
[FFmpeg-devel] lavd: implement threaded NewTek NDI output

Message ID e9337f4b-1527-a0a0-80c6-677eb84ea244@m1stereo.tv
State New

Commit Message

Maksym Veremeyenko Sept. 5, 2017, 7:09 a.m. UTC
04.09.2017 17:10, Maksym Veremeyenko wrote:
> Hi,
> 
> The attached patch implements threaded NDI output: a separate output 
> thread for each stream. It makes audio preview smoother in my case.

The updated patch allows running the audio and video threads separately.

Please review.

Comments

Marton Balint Sept. 5, 2017, 8:50 p.m. UTC | #1
On Tue, 5 Sep 2017, Maksym Veremeyenko wrote:

> 04.09.2017 17:10, Maksym Veremeyenko wrote:
>> Hi,
>> 
>> The attached patch implements threaded NDI output: a separate output 
>> thread for each stream. It makes audio preview smoother in my case.
>
> The updated patch allows running the audio and video threads separately.

If I get this correctly, this patch is needed because you can only 
schedule 1 frame to the NDI API, therefore especially for shorter audio 
frames the buffer may underrun, right? If that is the case, I'd 
describe this in a bit more detail in the docs and/or the commit message.

Also, decklink uses a concept called preroll for a similar purpose: it is 
specified in time, and the video buffer is capable of storing preroll*2 
amount of video. (Also, at the start of the stream the code only 
starts draining the buffer after preroll amount of video has been 
received, hence the name preroll. This way the buffer won't underrun 
even at the start of the stream.)

I just mentioned this because you may want to support a similar concept, 
or specify buffer sizes in time, instead of in frames. But if not, that 
is fine as well.

As for the actual code, I see a lot of code duplication :). Maybe you 
can factor audio_thread and video_thread into the same function, and 
also the code which creates these threads.

But in general the code looks fine.

Regards,
Marton
Marton Balint Sept. 6, 2017, 8:42 a.m. UTC | #2
On Tue, 5 Sep 2017, Marton Balint wrote:

>
> On Tue, 5 Sep 2017, Maksym Veremeyenko wrote:
>
>> 04.09.2017 17:10, Maksym Veremeyenko wrote:
>>> Hi,
>>> 
>>> The attached patch implements threaded NDI output: a separate output 
>>> thread for each stream. It makes audio preview smoother in my case.
>>
>> The updated patch allows running the audio and video threads separately.
>
> If I get this correctly, this patch is needed because you can only 
> schedule 1 frame to the NDI API, therefore especially for shorter audio 
> frames the buffer may underrun, right? If that is the case, I'd 
> describe this in a bit more detail in the docs and/or the commit message.

I've given this some more thought, and this only makes sense if NDI is 
throttling the output (e.g. via clock_audio or clock_video). If not, then 
using threads should not make any difference, so something different might 
be going on. Do you have an idea what?

Thanks,
Marton
Maksym Veremeyenko Sept. 7, 2017, 3:57 p.m. UTC | #3
05.09.2017 23:50, Marton Balint wrote:
[...]
> If I get this correctly, this patch is needed because you can only 
> schedule 1 frame to the NDI API, therefore especially for shorter audio 
> frames the buffer may underrun, right? If that is the case, I'd 
> describe this in a bit more detail in the docs and/or the commit message.
> 
This patch was needed to make audio play smoothly. Sometimes I noticed 
audio issues with the /reference monitoring tool/, so this is rather a 
research effort to find a proper approach.

If I specify a 16-packet queue and use two queues, I get video/audio out 
of sync (all monitoring performed with the *Studio Monitor* software).

A *perfectly* working setup was reached with an audio queue of two packets 
(previously processed by the *asetnsamples* filter) and no threads for video.

When I talk about an audio issue, I mean what I *hear* from the NDI 
software, not logged output of a reference analyzer; I only have a 
visual/consumer method for estimating the quality of the audio/video 
packets being sent...

> Also, decklink uses a concept called preroll for a similar purpose: it 
> is specified in time, and the video buffer is capable of storing 
> preroll*2 amount of video. (Also, at the start of the stream the code 
> only starts draining the buffer after preroll amount of video has been 
> received, hence the name preroll. This way the buffer won't underrun 
> even at the start of the stream.)
decklink has the driver's DMA memory for prerolled frames, and decklink 
internally aligns audio/video samples to keep them synchronous... so it 
is hard to compare with a hardware-driven device.

> I just mentioned this because you may want to support a similar concept, 
> or specify buffer sizes in time, instead of in frames. But if not, that 
> is fine as well.
The queues are sized in packet-count units; AVPackets are queued.

> 
> As for the actual code, I see a lot of code duplication :). Maybe you 
> can factor audio_thread and video_thread into the same function, 
> and also the code which creates these threads.
If it does not reduce code readability, I can do that easily.

> 
> But in general the code looks fine.
thanks
Maksym Veremeyenko Sept. 7, 2017, 4:06 p.m. UTC | #4
06.09.2017 11:42, Marton Balint wrote:
[...]
> I've given this some more thought, and this only makes sense if NDI is 
> throttling the output (e.g. via clock_audio or clock_video). If not, then 
> using threads should not make any difference, so something different 
> might be going on. Do you have an idea what?

clock_audio and clock_video do exactly what you mean: they prevent sending 
packets faster than realtime.

In my case clock_audio=1, clock_video=1, video_queue=2, audio_queue=2 
gives a very smooth result. Extending the queue size makes the monitoring 
software's output go out of sync...
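For reference, that working configuration corresponds to an invocation along these lines. This is a sketch: the input file, sample count, and sender name are placeholders, and an FFmpeg build with --enable-libndi_newtek plus the NDI runtime is assumed.

```shell
# asetnsamples=1920 yields fixed 40 ms audio packets at 48 kHz
# (one audio packet per 25 fps video frame)
ffmpeg -re -i input.mov \
    -af asetnsamples=n=1920 \
    -f libndi_newtek \
    -clock_video 1 -clock_audio 1 \
    -video_queue 2 -audio_queue 2 \
    "NDI Output"
```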

Patch

diff --git a/configure b/configure
index d582705..7626901 100755
--- a/configure
+++ b/configure
@@ -3019,7 +3019,7 @@  decklink_outdev_deps="decklink threads"
 decklink_outdev_extralibs="-lstdc++"
 libndi_newtek_indev_deps="libndi_newtek"
 libndi_newtek_indev_extralibs="-lndi"
-libndi_newtek_outdev_deps="libndi_newtek"
+libndi_newtek_outdev_deps="libndi_newtek threads"
 libndi_newtek_outdev_extralibs="-lndi"
 dshow_indev_deps="IBaseFilter"
 dshow_indev_extralibs="-lpsapi -lole32 -lstrmiids -luuid -loleaut32 -lshlwapi"
diff --git a/doc/outdevs.texi b/doc/outdevs.texi
index 0012b0f..595864b 100644
--- a/doc/outdevs.texi
+++ b/doc/outdevs.texi
@@ -213,6 +213,14 @@  Defaults to @option{false}.
 These specify whether audio "clock" themselves.
 Defaults to @option{false}.
 
+@item video_queue
+Enable sending video packets from a separate thread, using a packet queue of the specified length.
+Defaults to @option{0}.
+
+@item audio_queue
+Enable sending audio packets from a separate thread, using a packet queue of the specified length.
+Defaults to @option{0}.
+
 @end table
 
 @subsection Examples
diff --git a/libavdevice/libndi_newtek_enc.c b/libavdevice/libndi_newtek_enc.c
index 6ca6f41..f8af851 100644
--- a/libavdevice/libndi_newtek_enc.c
+++ b/libavdevice/libndi_newtek_enc.c
@@ -23,9 +23,16 @@ 
 #include "libavformat/internal.h"
 #include "libavutil/opt.h"
 #include "libavutil/imgutils.h"
+#include "libavutil/threadmessage.h"
 
 #include "libndi_newtek_common.h"
 
+#include <pthread.h>
+
+#define THREAD_VIDEO    0
+#define THREAD_AUDIO    1
+#define THREAD_LAST     2
+
 struct NDIContext {
     const AVClass *cclass;
 
@@ -37,12 +44,104 @@  struct NDIContext {
     NDIlib_audio_frame_interleaved_16s_t *audio;
     NDIlib_send_instance_t ndi_send;
     AVFrame *last_avframe;
+
+    /* threaded operations */
+    AVFormatContext *avctx;
+    struct
+    {
+        int length;
+        AVThreadMessageQueue *queue;
+        pthread_t thread;
+    } threads[THREAD_LAST];
 };
 
+static int ndi_write_video_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+
+static void* ndi_thread_audio(void* p)
+{
+    int ret;
+    AVPacket pkt;
+    struct NDIContext *ctx = p;
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+    while (1) {
+        ret = av_thread_message_queue_recv(ctx->threads[THREAD_AUDIO].queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of audio queue.\n");
+            break;
+        }
+
+        ret = ndi_write_audio_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+        av_packet_unref(&pkt);
+        if (ret) {
+            av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_audio_packet.\n");
+            break;
+        }
+    }
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+    return NULL;
+}
+
+static void* ndi_thread_video(void* p)
+{
+    int ret;
+    AVPacket pkt;
+    struct NDIContext *ctx = p;
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+    while (1) {
+        ret = av_thread_message_queue_recv(ctx->threads[THREAD_VIDEO].queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of video queue.\n");
+            break;
+        }
+
+        ret = ndi_write_video_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+        av_packet_unref(&pkt);
+        if (ret) {
+            av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_video_packet.\n");
+            break;
+        }
+    }
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+    return NULL;
+}
+
 static int ndi_write_trailer(AVFormatContext *avctx)
 {
+    int i;
     struct NDIContext *ctx = avctx->priv_data;
 
+    for (i = 0; i < THREAD_LAST; i++)
+    {
+        AVPacket pkt;
+
+        if (!ctx->threads[i].queue)
+            continue;
+
+        av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing queue %d\n", __func__, i);
+
+        av_thread_message_queue_set_err_recv(ctx->threads[i].queue, AVERROR_EOF);
+
+        pthread_join(ctx->threads[i].thread, NULL);
+
+        while (av_thread_message_queue_recv(ctx->threads[i].queue, &pkt, 0) >= 0) {
+            av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing packet queue %d\n", __func__, i);
+            av_packet_unref(&pkt);
+        }
+
+        av_thread_message_queue_free(&ctx->threads[i].queue);
+    }
+
     if (ctx->ndi_send) {
         NDIlib_send_destroy(ctx->ndi_send);
         av_frame_free(&ctx->last_avframe);
@@ -119,18 +218,47 @@  static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket
 
 static int ndi_write_packet(AVFormatContext *avctx, AVPacket *pkt)
 {
+    struct NDIContext *ctx = avctx->priv_data;
     AVStream *st = avctx->streams[pkt->stream_index];
+    AVThreadMessageQueue *queue = NULL;
 
     if      (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
-        return ndi_write_video_packet(avctx, st, pkt);
+        queue = ctx->threads[THREAD_VIDEO].queue;
     else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
-        return ndi_write_audio_packet(avctx, st, pkt);
+        queue = ctx->threads[THREAD_AUDIO].queue;
+
+    if (queue) {
+
+        int ret;
+        AVPacket enq;
+
+        av_init_packet(&enq);
+
+        ret = av_packet_ref(&enq, pkt);
+        if (ret)
+            return ret;
+
+        ret = av_thread_message_queue_send(queue, &enq, 0);
+        if (ret) {
+            av_packet_unref(&enq);
+            return ret;
+        }
+
+        return 0;
+
+    } else {
+        if      (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
+            return ndi_write_video_packet(avctx, st, pkt);
+        else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
+            return ndi_write_audio_packet(avctx, st, pkt);
+    }
 
     return AVERROR_BUG;
 }
 
 static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
 {
+    int ret;
     struct NDIContext *ctx = avctx->priv_data;
     AVCodecParameters *c = st->codecpar;
 
@@ -149,11 +277,30 @@  static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
 
     avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
 
+    if (ctx->threads[THREAD_AUDIO].length) {
+        ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_AUDIO].queue,
+            ctx->threads[THREAD_AUDIO].length, sizeof(AVPacket));
+        if (ret) {
+            av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+            return ret;
+        }
+
+        ret = pthread_create(&ctx->threads[THREAD_AUDIO].thread, NULL, ndi_thread_audio, ctx);
+        if (ret) {
+            av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+            av_thread_message_queue_free(&ctx->threads[THREAD_AUDIO].queue);
+            return AVERROR(ret);
+        }
+
+        ctx->avctx = avctx;
+    }
+
     return 0;
 }
 
 static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
 {
+    int ret;
     struct NDIContext *ctx = avctx->priv_data;
     AVCodecParameters *c = st->codecpar;
 
@@ -225,6 +372,24 @@  static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
 
     avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
 
+    if (ctx->threads[THREAD_VIDEO].length) {
+        ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_VIDEO].queue,
+            ctx->threads[THREAD_VIDEO].length, sizeof(AVPacket));
+        if (ret) {
+            av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+            return ret;
+        }
+
+        ret = pthread_create(&ctx->threads[THREAD_VIDEO].thread, NULL, ndi_thread_video, ctx);
+        if (ret) {
+            av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+            av_thread_message_queue_free(&ctx->threads[THREAD_VIDEO].queue);
+            return AVERROR(ret);
+        }
+
+        ctx->avctx = avctx;
+    }
+
     return 0;
 }
 
@@ -273,6 +438,8 @@  static const AVOption options[] = {
     { "reference_level", "The audio reference level in dB"  , OFFSET(reference_level), AV_OPT_TYPE_INT, { .i64 = 0 }, -20, 20, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
     { "clock_video", "These specify whether video 'clock' themselves"  , OFFSET(clock_video), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM },
     { "clock_audio", "These specify whether audio 'clock' themselves"  , OFFSET(clock_audio), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM },
+    { "video_queue", "Video queue length", OFFSET(threads[THREAD_VIDEO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM},
+    { "audio_queue", "Audio queue length", OFFSET(threads[THREAD_AUDIO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
     { NULL },
 };