@@ -3019,7 +3019,7 @@ decklink_outdev_deps="decklink threads"
decklink_outdev_extralibs="-lstdc++"
libndi_newtek_indev_deps="libndi_newtek"
libndi_newtek_indev_extralibs="-lndi"
-libndi_newtek_outdev_deps="libndi_newtek"
+libndi_newtek_outdev_deps="libndi_newtek threads"
libndi_newtek_outdev_extralibs="-lndi"
dshow_indev_deps="IBaseFilter"
dshow_indev_extralibs="-lpsapi -lole32 -lstrmiids -luuid -loleaut32 -lshlwapi"
@@ -213,6 +213,14 @@ Defaults to @option{false}.
These specify whether audio "clock" themselves.
Defaults to @option{false}.
+@item video_queue
+Enable video packets output in separate thread. Specify video packets queue length.
+Defaults to @option{0}.
+
+@item audio_queue
+Enable audio packets output in separate thread. Specify audio packets queue length.
+Defaults to @option{0}.
+
@end table
@subsection Examples
@@ -23,9 +23,16 @@
#include "libavformat/internal.h"
#include "libavutil/opt.h"
#include "libavutil/imgutils.h"
+#include "libavutil/threadmessage.h"
#include "libndi_newtek_common.h"
+#include <pthread.h>
+
+#define THREAD_VIDEO 0
+#define THREAD_AUDIO 1
+#define THREAD_LAST 2
+
struct NDIContext {
const AVClass *cclass;
@@ -37,12 +44,104 @@ struct NDIContext {
NDIlib_audio_frame_interleaved_16s_t *audio;
NDIlib_send_instance_t ndi_send;
AVFrame *last_avframe;
+
+ /* threaded operations */
+ AVFormatContext *avctx;
+ struct
+ {
+ int length;
+ AVThreadMessageQueue *queue;
+ pthread_t thread;
+ } threads[THREAD_LAST];
};
+static int ndi_write_video_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+
+static void* ndi_thread_audio(void* p)
+{
+ int ret;
+ AVPacket pkt;
+ struct NDIContext *ctx = p;
+
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+ while (1) {
+ ret = av_thread_message_queue_recv(ctx->threads[THREAD_AUDIO].queue, &pkt, 0);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of audio queue.\n");
+ break;
+ }
+
+ ret = ndi_write_audio_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+ av_packet_unref(&pkt);
+ if (ret) {
+ av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_audio_packet.\n");
+ break;
+ }
+ }
+
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+ return NULL;
+}
+
+static void* ndi_thread_video(void* p)
+{
+ int ret;
+ AVPacket pkt;
+ struct NDIContext *ctx = p;
+
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+ while (1) {
+ ret = av_thread_message_queue_recv(ctx->threads[THREAD_VIDEO].queue, &pkt, 0);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of video queue.\n");
+ break;
+ }
+
+ ret = ndi_write_video_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+ av_packet_unref(&pkt);
+ if (ret) {
+ av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_video_packet.\n");
+ break;
+ }
+ }
+
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+ return NULL;
+}
+
static int ndi_write_trailer(AVFormatContext *avctx)
{
+ int i;
struct NDIContext *ctx = avctx->priv_data;
+ for (i = 0; i < THREAD_LAST; i++)
+ {
+ AVPacket pkt;
+
+ if (!ctx->threads[i].queue)
+ continue;
+
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing queue %d\n", __func__, i);
+
+ av_thread_message_queue_set_err_recv(ctx->threads[i].queue, AVERROR_EOF);
+
+ pthread_join(ctx->threads[i].thread, NULL);
+
+ while (av_thread_message_queue_recv(ctx->threads[i].queue, &pkt, 0) >= 0) {
+ av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing packet queue %d\n", __func__, i);
+ av_packet_unref(&pkt);
+ }
+
+ av_thread_message_queue_free(&ctx->threads[i].queue);
+ }
+
if (ctx->ndi_send) {
NDIlib_send_destroy(ctx->ndi_send);
av_frame_free(&ctx->last_avframe);
@@ -119,18 +218,47 @@ static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket
static int ndi_write_packet(AVFormatContext *avctx, AVPacket *pkt)
{
+ struct NDIContext *ctx = avctx->priv_data;
AVStream *st = avctx->streams[pkt->stream_index];
+ AVThreadMessageQueue *queue;
if (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
- return ndi_write_video_packet(avctx, st, pkt);
+ queue = ctx->threads[THREAD_VIDEO].queue;
else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
- return ndi_write_audio_packet(avctx, st, pkt);
+ queue = ctx->threads[THREAD_AUDIO].queue;
+
+ if (queue) {
+
+ int ret;
+ AVPacket enq;
+
+ av_init_packet(&enq);
+
+ ret = av_packet_ref(&enq, pkt);
+ if (ret)
+ return ret;
+
+ ret = av_thread_message_queue_send(queue, &enq, 0);
+ if (ret) {
+ av_packet_unref(&enq);
+ return ret;
+ }
+
+ return 0;
+
+ } else {
+ if (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
+ return ndi_write_video_packet(avctx, st, pkt);
+ else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
+ return ndi_write_audio_packet(avctx, st, pkt);
+ }
return AVERROR_BUG;
}
static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
{
+ int ret;
struct NDIContext *ctx = avctx->priv_data;
AVCodecParameters *c = st->codecpar;
@@ -149,11 +277,30 @@ static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
+ if (ctx->threads[THREAD_AUDIO].length) {
+ ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_AUDIO].queue,
+ ctx->threads[THREAD_AUDIO].length, sizeof(AVPacket));
+ if (ret) {
+ av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+ return ret;
+ }
+
+ ret = pthread_create(&ctx->threads[THREAD_AUDIO].thread, NULL, ndi_thread_audio, ctx);
+ if (ret) {
+ av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+ av_thread_message_queue_free(&ctx->threads[THREAD_AUDIO].queue);
+ return AVERROR(ret);
+ }
+
+ ctx->avctx = avctx;
+ }
+
return 0;
}
static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
{
+ int ret;
struct NDIContext *ctx = avctx->priv_data;
AVCodecParameters *c = st->codecpar;
@@ -225,6 +372,24 @@ static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
+ if (ctx->threads[THREAD_VIDEO].length) {
+ ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_VIDEO].queue,
+ ctx->threads[THREAD_VIDEO].length, sizeof(AVPacket));
+ if (ret) {
+ av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+ return ret;
+ }
+
+ ret = pthread_create(&ctx->threads[THREAD_VIDEO].thread, NULL, ndi_thread_video, ctx);
+ if (ret) {
+ av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+ av_thread_message_queue_free(&ctx->threads[THREAD_VIDEO].queue);
+ return AVERROR(ret);
+ }
+
+ ctx->avctx = avctx;
+ }
+
return 0;
}
@@ -273,6 +438,8 @@ static const AVOption options[] = {
{ "reference_level", "The audio reference level in dB" , OFFSET(reference_level), AV_OPT_TYPE_INT, { .i64 = 0 }, -20, 20, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
{ "clock_video", "These specify whether video 'clock' themselves" , OFFSET(clock_video), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM },
{ "clock_audio", "These specify whether audio 'clock' themselves" , OFFSET(clock_audio), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM },
+ { "video_queue", "Video queue length", OFFSET(threads[THREAD_VIDEO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 1, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM},
+ { "audio_queue", "Audio queue length", OFFSET(threads[THREAD_AUDIO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 1, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
{ NULL },
};