@@ -14,6 +14,7 @@ OBJS-ffmpeg += \
fftools/ffmpeg_hw.o \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_opt.o \
+ fftools/sync_queue.o \
define DOFFTOOL
OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes)
@@ -104,6 +104,7 @@
#include "ffmpeg.h"
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavutil/avassert.h"
@@ -570,6 +571,7 @@ static void ffmpeg_cleanup(int ret)
av_bsf_free(&ost->bsf_ctx);
av_frame_free(&ost->filtered_frame);
+ av_frame_free(&ost->sq_frame);
av_frame_free(&ost->last_frame);
av_packet_free(&ost->pkt);
av_dict_free(&ost->encoder_opts);
@@ -690,13 +692,10 @@ static void update_benchmark(const char *fmt, ...)
static void close_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
ost->finished |= ENCODER_FINISHED;
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
+
+ if (ost->sq_idx_encode >= 0)
+ sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
}
/*
@@ -715,17 +714,22 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
{
int ret = 0;
+ if (!eof && pkt->dts != AV_NOPTS_VALUE)
+ ost->last_mux_dts = av_rescale_q(pkt->dts, ost->mux_timebase, AV_TIME_BASE_Q);
+
/* apply the output bitstream filters */
if (ost->bsf_ctx) {
ret = av_bsf_send_packet(ost->bsf_ctx, eof ? NULL : pkt);
if (ret < 0)
goto finish;
while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0)
- of_submit_packet(of, pkt, ost);
+ of_submit_packet(of, pkt, ost, 0);
+ if (ret == AVERROR_EOF)
+ of_submit_packet(of, pkt, ost, 1);
if (ret == AVERROR(EAGAIN))
ret = 0;
- } else if (!eof)
- of_submit_packet(of, pkt, ost);
+ } else
+ of_submit_packet(of, pkt, ost, eof);
finish:
if (ret < 0 && ret != AVERROR_EOF) {
@@ -895,6 +899,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
av_assert0(0);
}
+static int submit_encode_frame(OutputFile *of, OutputStream *ost,
+ AVFrame *frame)
+{
+ int ret;
+
+ if (ost->sq_idx_encode < 0)
+ return encode_frame(of, ost, frame);
+
+ if (frame) {
+ ret = av_frame_ref(ost->sq_frame, frame);
+ if (ret < 0)
+ return ret;
+ frame = ost->sq_frame;
+ }
+
+ ret = sq_send(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(frame));
+ if (ret < 0) {
+ if (frame)
+ av_frame_unref(frame);
+ if (ret != AVERROR_EOF)
+ return ret;
+ }
+
+ while (1) {
+ AVFrame *enc_frame = ost->sq_frame;
+
+ ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(enc_frame));
+ if (ret == AVERROR_EOF) {
+ enc_frame = NULL;
+ } else if (ret < 0) {
+ return (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
+
+ ret = encode_frame(of, ost, enc_frame);
+ if (enc_frame)
+ av_frame_unref(enc_frame);
+ if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ close_output_stream(ost);
+ return ret;
+ }
+ }
+}
+
static void do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame)
{
@@ -910,8 +960,8 @@ static void do_audio_out(OutputFile *of, OutputStream *ost,
ost->sync_opts = frame->pts + frame->nb_samples;
ost->samples_encoded += frame->nb_samples;
- ret = encode_frame(of, ost, frame);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, frame);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
}
@@ -1197,8 +1247,8 @@ static void do_video_out(OutputFile *of,
av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time);
}
- ret = encode_frame(of, ost, in_picture);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, in_picture);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
// Make sure Closed Captions will not be duplicated
@@ -1216,14 +1266,12 @@ static void do_video_out(OutputFile *of,
static void finish_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
- ost->finished = ENCODER_FINISHED | MUXER_FINISHED;
+ ost->finished = ENCODER_FINISHED;
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
+ if (ost->sq_idx_mux >= 0)
+ sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
+ else
+ ost->finished |= MUXER_FINISHED;
}
/**
@@ -1281,6 +1329,12 @@ static int reap_filters(int flush)
continue;
}
+ if (filtered_frame->pts != AV_NOPTS_VALUE) {
+ AVRational tb = av_buffersink_get_time_base(filter);
+ ost->last_filter_pts = av_rescale_q(filtered_frame->pts, tb,
+ AV_TIME_BASE_Q);
+ }
+
switch (av_buffersink_get_type(filter)) {
case AVMEDIA_TYPE_VIDEO:
if (!ost->frame_aspect_ratio.num)
@@ -1696,7 +1750,7 @@ static void flush_encoders(void)
if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)
continue;
- ret = encode_frame(of, ost, NULL);
+ ret = submit_encode_frame(of, ost, NULL);
if (ret != AVERROR_EOF)
exit_program(1);
}
@@ -3006,6 +3060,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
break;
}
+ if (ost->sq_idx_encode >= 0)
+ sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base);
+
ost->mux_timebase = enc_ctx->time_base;
return 0;
@@ -3014,6 +3071,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
static int init_output_stream(OutputStream *ost, AVFrame *frame,
char *error, int error_len)
{
+ OutputFile *of = output_files[ost->file_index];
int ret = 0;
if (ost->encoding_needed) {
@@ -3146,6 +3204,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame,
if (ret < 0)
return ret;
+ if (ost->sq_idx_mux >= 0)
+ sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase);
+
ost->initialized = 1;
ret = of_check_init(output_files[ost->file_index]);
@@ -3377,13 +3438,19 @@ static OutputStream *choose_output(void)
for (i = 0; i < nb_output_streams; i++) {
OutputStream *ost = output_streams[i];
- int64_t opts = ost->last_mux_dts == AV_NOPTS_VALUE ? INT64_MIN :
- av_rescale_q(ost->last_mux_dts, ost->st->time_base,
- AV_TIME_BASE_Q);
- if (ost->last_mux_dts == AV_NOPTS_VALUE)
- av_log(NULL, AV_LOG_DEBUG,
- "cur_dts is invalid st:%d (%d) [init:%d i_done:%d finish:%d] (this is harmless if it occurs once at the start per stream)\n",
- ost->st->index, ost->st->id, ost->initialized, ost->inputs_done, ost->finished);
+ int64_t opts;
+
+ if (ost->filter) {
+ opts = ost->last_filter_pts == AV_NOPTS_VALUE ?
+ INT64_MIN : ost->last_filter_pts;
+ } else {
+ opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
+ INT64_MIN : ost->last_mux_dts;
+ if (ost->last_mux_dts == AV_NOPTS_VALUE)
+ av_log(NULL, AV_LOG_DEBUG,
+ "cur_dts is invalid st:%d (%d) [init:%d i_done:%d finish:%d] (this is harmless if it occurs once at the start per stream)\n",
+ ost->st->index, ost->st->id, ost->initialized, ost->inputs_done, ost->finished);
+ }
if (!ost->initialized && !ost->inputs_done)
return ost->unavailable ? NULL : ost;
@@ -4205,6 +4272,26 @@ static int transcode_step(void)
return reap_filters(0);
}
+static void flush_sync_queues_mux(void)
+{
+ /* mark all queue inputs as done */
+ for (int i = 0; i < nb_output_streams; i++) {
+ OutputStream *ost = output_streams[i];
+ OutputFile *of = output_files[ost->file_index];
+ if (ost->sq_idx_mux >= 0)
+ sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
+ }
+
+ /* encode all packets remaining in the sync queues */
+ for (int i = 0; i < nb_output_streams; i++) {
+ OutputStream *ost = output_streams[i];
+ OutputFile *of = output_files[ost->file_index];
+
+ if (!(ost->finished & MUXER_FINISHED))
+ output_packet(of, ost->pkt, ost, 1);
+ }
+}
+
/*
* The following code is the main loop of the file converter
*/
@@ -4266,6 +4353,7 @@ static int transcode(void)
}
}
flush_encoders();
+ flush_sync_queues_mux();
term_exit();
@@ -26,6 +26,7 @@
#include <signal.h>
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
#include "libavformat/avio.h"
@@ -464,8 +465,10 @@ typedef struct OutputStream {
/* pts of the first frame encoded for this stream, used for limiting
* recording time */
int64_t first_pts;
- /* dts of the last packet sent to the muxer */
+ /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
int64_t last_mux_dts;
+ /* pts of the last frame received from the filters, in AV_TIME_BASE_Q */
+ int64_t last_filter_pts;
// the timebase of the packets sent to the muxer
AVRational mux_timebase;
AVRational enc_timebase;
@@ -478,6 +481,7 @@ typedef struct OutputStream {
int64_t max_frames;
AVFrame *filtered_frame;
AVFrame *last_frame;
+ AVFrame *sq_frame;
AVPacket *pkt;
int last_dropped;
int last_nb0_frames[3];
@@ -566,6 +570,9 @@ typedef struct OutputStream {
/* frame encode sum of squared error values */
int64_t error[4];
+
+ int sq_idx_encode;
+ int sq_idx_mux;
} OutputStream;
typedef struct Muxer Muxer;
@@ -576,6 +583,9 @@ typedef struct OutputFile {
Muxer *mux;
const AVOutputFormat *format;
+ SyncQueue *sq_encode;
+ SyncQueue *sq_mux;
+
AVFormatContext *ctx;
int ost_index; /* index of the first stream in output_streams */
int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units
@@ -691,7 +701,7 @@ int of_check_init(OutputFile *of);
int of_write_trailer(OutputFile *of);
void of_close(OutputFile **pof);
-void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost);
+void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof);
int of_finished(OutputFile *of);
int64_t of_filesize(OutputFile *of);
AVChapter * const *
@@ -20,6 +20,7 @@
#include <string.h>
#include "ffmpeg.h"
+#include "sync_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
@@ -41,6 +42,10 @@ typedef struct MuxStream {
* Updated when a packet is either pushed or pulled from the queue.
*/
size_t muxing_queue_data_size;
+
+ /* dts of the last packet sent to the muxer, in the stream timebase
+ * used for making up missing dts values */
+ int64_t last_mux_dts;
} MuxStream;
struct Muxer {
@@ -161,6 +166,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
+ MuxStream *ms = &of->mux->streams[ost->index];
AVFormatContext *s = of->ctx;
AVStream *st = ost->st;
int ret;
@@ -188,21 +194,21 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
pkt->dts, pkt->pts,
ost->file_index, ost->st->index);
pkt->pts =
- pkt->dts = pkt->pts + pkt->dts + ost->last_mux_dts + 1
- - FFMIN3(pkt->pts, pkt->dts, ost->last_mux_dts + 1)
- - FFMAX3(pkt->pts, pkt->dts, ost->last_mux_dts + 1);
+ pkt->dts = pkt->pts + pkt->dts + ms->last_mux_dts + 1
+ - FFMIN3(pkt->pts, pkt->dts, ms->last_mux_dts + 1)
+ - FFMAX3(pkt->pts, pkt->dts, ms->last_mux_dts + 1);
}
if ((st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO || st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO || st->codecpar->codec_type == AVMEDIA_TYPE_SUBTITLE) &&
pkt->dts != AV_NOPTS_VALUE &&
- ost->last_mux_dts != AV_NOPTS_VALUE) {
- int64_t max = ost->last_mux_dts + !(s->oformat->flags & AVFMT_TS_NONSTRICT);
+ ms->last_mux_dts != AV_NOPTS_VALUE) {
+ int64_t max = ms->last_mux_dts + !(s->oformat->flags & AVFMT_TS_NONSTRICT);
if (pkt->dts < max) {
int loglevel = max - pkt->dts > 2 || st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO ? AV_LOG_WARNING : AV_LOG_DEBUG;
if (exit_on_error)
loglevel = AV_LOG_ERROR;
av_log(s, loglevel, "Non-monotonous DTS in output stream "
"%d:%d; previous: %"PRId64", current: %"PRId64"; ",
- ost->file_index, ost->st->index, ost->last_mux_dts, pkt->dts);
+ ost->file_index, ost->st->index, ms->last_mux_dts, pkt->dts);
if (exit_on_error) {
av_log(NULL, AV_LOG_FATAL, "aborting.\n");
exit_program(1);
@@ -216,7 +222,7 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
}
}
}
- ost->last_mux_dts = pkt->dts;
+ ms->last_mux_dts = pkt->dts;
ost->data_size += pkt->size;
ost->packets_written++;
@@ -245,26 +251,10 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
}
}
-void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+static void submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
{
- AVStream *st = ost->st;
int ret;
- /*
- * Audio encoders may split the packets -- #frames in != #packets out.
- * But there is no reordering, so we can limit the number of output packets
- * by simply dropping them here.
- * Counting encoded video frames needs to be done separately because of
- * reordering, see do_video_out().
- */
- if (!(st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->encoding_needed)) {
- if (ost->frame_number >= ost->max_frames) {
- av_packet_unref(pkt);
- return;
- }
- ost->frame_number++;
- }
-
if (of->mux->header_written) {
write_packet(of, ost, pkt);
} else {
@@ -277,6 +267,52 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
}
}
+void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+{
+ AVStream *st = ost->st;
+
+ if (!eof) {
+ /*
+ * Audio encoders may split the packets -- #frames in != #packets out.
+ * But there is no reordering, so we can limit the number of output packets
+ * by simply dropping them here.
+ * Counting encoded video frames needs to be done separately because of
+ * reordering, see do_video_out().
+ */
+ if (!(st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->encoding_needed)) {
+ if (ost->frame_number >= ost->max_frames) {
+ av_packet_unref(pkt);
+ return;
+ }
+ ost->frame_number++;
+ }
+ }
+
+ if (ost->sq_idx_mux >= 0) {
+ int ret = sq_send(of->sq_mux, ost->sq_idx_mux,
+ SQPKT(eof ? NULL: pkt));
+ if (ret < 0) {
+ av_packet_unref(pkt);
+ if (ret == AVERROR_EOF) {
+ ost->finished |= MUXER_FINISHED;
+ return;
+ } else
+ exit_program(1);
+ }
+
+ while (1) {
+ ret = sq_receive(of->sq_mux, -1, SQPKT(pkt));
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
+ return;
+ else if (ret < 0)
+ exit_program(1);
+
+ submit_packet(of, pkt, output_streams[of->ost_index + ret]);
+ }
+ } else if (!eof)
+ submit_packet(of, pkt, ost);
+}
+
static int print_sdp(void)
{
char sdp[16384];
@@ -447,6 +483,9 @@ void of_close(OutputFile **pof)
if (!of)
return;
+ sq_free(&of->sq_encode);
+ sq_free(&of->sq_mux);
+
s = of->ctx;
mux_free(&of->mux, s ? s->nb_streams : 0);
@@ -481,6 +520,7 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize)
ret = AVERROR(ENOMEM);
goto fail;
}
+ ms->last_mux_dts = AV_NOPTS_VALUE;
}
mux->limit_filesize = limit_filesize;
@@ -30,6 +30,7 @@
#include "ffmpeg.h"
#include "cmdutils.h"
#include "opt_common.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
@@ -1640,6 +1641,7 @@ static OutputStream *new_output_stream(OptionsContext *o, AVFormatContext *oc, e
input_streams[source_index]->st->discard = input_streams[source_index]->user_set_discard;
}
ost->last_mux_dts = AV_NOPTS_VALUE;
+ ost->last_filter_pts = AV_NOPTS_VALUE;
return ost;
}
@@ -2299,6 +2301,78 @@ static int init_complex_filters(void)
return 0;
}
+static int setup_sync_queues(OutputFile *of, AVFormatContext *oc)
+{
+ int nb_av_enc = 0, nb_interleaved = 0;
+
+#define IS_AV_ENC(ost, type) \
+ (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO))
+#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT)
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ ost->sq_idx_encode = -1;
+ ost->sq_idx_mux = -1;
+
+ nb_interleaved += IS_INTERLEAVED(type);
+ nb_av_enc += IS_AV_ENC(ost, type);
+ }
+
+ if (!(nb_interleaved > 1 && of->shortest))
+ return 0;
+
+ /* if we have more than one encoded audio/video streams, then we
+ * synchronize them before encoding */
+ if (nb_av_enc > 1) {
+ of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES);
+ if (!of->sq_encode)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_AV_ENC(ost, type))
+ continue;
+
+ ost->sq_idx_encode = sq_add_stream(of->sq_encode);
+ if (ost->sq_idx_encode < 0)
+ return ost->sq_idx_encode;
+
+ ost->sq_frame = av_frame_alloc();
+ if (!ost->sq_frame)
+ return AVERROR(ENOMEM);
+ }
+ }
+
+ /* if there any additional interleaved streams, then ALL the streams
+ * are also synchronized before sending them to the muxer */
+ if (nb_interleaved > nb_av_enc) {
+ of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS);
+ if (!of->sq_mux)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_INTERLEAVED(type))
+ continue;
+
+ ost->sq_idx_mux = sq_add_stream(of->sq_mux);
+ if (ost->sq_idx_mux < 0)
+ return ost->sq_idx_mux;
+ }
+ }
+
+#undef IS_AV_ENC
+#undef IS_INTERLEAVED
+
+ return 0;
+}
+
static int open_output_file(OptionsContext *o, const char *filename)
{
AVFormatContext *oc;
@@ -2936,6 +3010,12 @@ loop_end:
exit_program(1);
}
+ err = setup_sync_queues(of, oc);
+ if (err < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n");
+ exit_program(1);
+ }
+
err = of_muxer_init(of, format_opts, o->limit_filesize);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
new file mode 100644
@@ -0,0 +1,346 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdint.h>
+#include <string.h>
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/fifo.h"
+#include "libavutil/mathematics.h"
+#include "libavutil/mem.h"
+
+#include "sync_queue.h"
+
+typedef struct SyncQueueStream {
+ AVFifo *fifo;
+ AVRational tb;
+ int64_t head_ts;
+ int finished;
+} SyncQueueStream;
+
+struct SyncQueue {
+ enum SyncQueueType type;
+
+ SyncQueueStream *streams;
+ unsigned int nb_streams;
+
+ // pool of preallocated frames to avoid constant allocations
+ SyncQueueFrame free_frames[32];
+ unsigned int nb_free_frames;
+
+ /* sync head: the stream with the smallest last timestamp */
+ int head_stream;
+ /* the finished stream with the smallest finish timestamp or -1 */
+ int head_finished_stream;
+ int finished;
+};
+
+static void frame_free(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ if (sq->type == SYNC_QUEUE_PACKETS)
+ av_packet_free(&frame.p);
+ else
+ av_frame_free(&frame.f);
+}
+
+static void frame_clear(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ if (sq->type == SYNC_QUEUE_PACKETS)
+ av_packet_unref(frame.p);
+ else
+ av_frame_unref(frame.f);
+}
+
+static void frame_release(SyncQueue *sq, SyncQueueFrame frame)
+{
+ if (sq->nb_free_frames < FF_ARRAY_ELEMS(sq->free_frames)) {
+ frame_clear(sq, frame);
+ sq->free_frames[sq->nb_free_frames++] = frame;
+ } else
+ frame_free(sq, frame);
+}
+
+static int frame_get(SyncQueue *sq, SyncQueueFrame *frame)
+{
+ if (sq->nb_free_frames) {
+ *frame = sq->free_frames[--sq->nb_free_frames];
+ memset(sq->free_frames + sq->nb_free_frames, 0, sizeof(sq->free_frames[0]));
+ return 0;
+ }
+ if (sq->type == SYNC_QUEUE_PACKETS) {
+ frame->p = av_packet_alloc();
+ return frame->p ? 0 : AVERROR(ENOMEM);
+ }
+ frame->f = av_frame_alloc();
+ return frame->f ? 0 : AVERROR(ENOMEM);
+}
+
+static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
+ SyncQueueFrame src)
+{
+ if (sq->type == SYNC_QUEUE_PACKETS)
+ av_packet_move_ref(dst.p, src.p);
+ else
+ av_frame_move_ref(dst.f, src.f);
+}
+
+static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ?
+ frame.p->pts + frame.p->duration :
+ frame.f->pts + frame.f->pkt_duration;
+}
+
+static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
+}
+
+static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
+{
+ SyncQueueStream *st = &sq->streams[stream_idx];
+
+ st->finished = 1;
+
+ if (st->head_ts != AV_NOPTS_VALUE) {
+ /* check if this stream is the new finished head */
+ if (sq->head_finished_stream < 0 ||
+ av_compare_ts(st->head_ts, st->tb,
+ sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb) < 0) {
+ sq->head_finished_stream = stream_idx;
+ }
+
+ /* mark as finished all streams that should no longer receive new frames,
+ * due to them being ahead of some finished stream */
+ st = &sq->streams[sq->head_finished_stream];
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st1 = &sq->streams[i];
+ if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
+ st1->finished = 1;
+ }
+ }
+
+ /* mark the whole queue as finished if all streams are finished */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ if (!sq->streams[i].finished)
+ return;
+ }
+ sq->finished = 1;
+}
+
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
+{
+ SyncQueueStream *st;
+ SyncQueueFrame dst;
+ int64_t ts;
+ int ret;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ if (frame_null(sq, frame)) {
+ finish_stream(sq, stream_idx);
+ return 0;
+ }
+ if (st->finished)
+ return AVERROR_EOF;
+
+ ret = frame_get(sq, &dst);
+ if (ret < 0)
+ return ret;
+
+ frame_move(sq, dst, frame);
+
+ ts = frame_ts(sq, dst);
+
+ ret = av_fifo_write(st->fifo, &dst, 1);
+ if (ret < 0) {
+ frame_move(sq, frame, dst);
+ frame_release(sq, dst);
+ return ret;
+ }
+
+ /* update this stream's head timestamp */
+ if (ts != AV_NOPTS_VALUE &&
+ (st->head_ts == AV_NOPTS_VALUE || st->head_ts < ts)) {
+ st->head_ts = ts;
+
+ /* if this stream is now ahead of some finished stream, then
+ * this stream is also finished */
+ if (sq->head_finished_stream >= 0 &&
+ av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb,
+ ts, st->tb) <= 0)
+ st->finished = 1;
+
+ /* update the overall head timestamp if it could have changed */
+ if (sq->head_stream < 0 || sq->head_stream == stream_idx) {
+ sq->head_stream = stream_idx;
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st_head = &sq->streams[sq->head_stream];
+ SyncQueueStream *st_other = &sq->streams[i];
+ if (st_other->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st_other->head_ts, st_other->tb,
+ st_head->head_ts, st_head->tb) < 0)
+ sq->head_stream = i;
+ }
+ }
+ }
+
+
+ return 0;
+}
+
+static int frame_receive(SyncQueue *sq, unsigned int stream_idx,
+ SyncQueueFrame frame)
+{
+ SyncQueueStream *st_head = sq->head_stream >= 0 ?
+ &sq->streams[sq->head_stream] : NULL;
+ SyncQueueStream *st;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ if (av_fifo_can_read(st->fifo)) {
+ SyncQueueFrame peek;
+ int64_t ts;
+ int cmp = 0;
+ int overflow = 0;
+
+ av_fifo_peek(st->fifo, &peek, 1, 0);
+ ts = frame_ts(sq, peek);
+
+ /* check if this stream's tail timestamp is before
+ * the overall queue head */
+ if (ts != AV_NOPTS_VALUE && st_head)
+ cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
+
+ /* also release the tail frame if this stream's head timestamp
+ * is over 10 seconds after the overall queue head */
+ if (st->head_ts != AV_NOPTS_VALUE && st_head) {
+ int64_t head_st = av_rescale_q(st->head_ts, st->tb, AV_TIME_BASE_Q);
+ int64_t head_queue = av_rescale_q(st_head->head_ts, st_head->tb,
+ AV_TIME_BASE_Q);
+ overflow = head_st - head_queue > 10 * AV_TIME_BASE;
+ }
+
+ /* return frames that are before the head;
+ * after all inputs are finished we can also return the head itself */
+ if (ts == AV_NOPTS_VALUE || cmp < 0 || overflow ||
+ (sq->finished && cmp == 0)) {
+ frame_move(sq, frame, peek);
+ frame_release(sq, peek);
+ av_fifo_drain2(st->fifo, 1);
+ return 0;
+ }
+ }
+
+ return sq->finished ? AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+ int nb_eof = 0;
+ int ret;
+
+ /* read a frame for a specific stream */
+ if (stream_idx >= 0) {
+ ret = frame_receive(sq, stream_idx, frame);
+ return (ret < 0) ? ret : stream_idx;
+ }
+
+ /* read a frame for any stream with available output */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ ret = frame_receive(sq, i, frame);
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
+ nb_eof += ret == AVERROR_EOF;
+ continue;
+ }
+ return (ret < 0) ? ret : i;
+ }
+
+ return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+int sq_add_stream(SyncQueue *sq)
+{
+ SyncQueueStream *tmp, *st;
+
+ tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
+ if (!tmp)
+ return AVERROR(ENOMEM);
+ sq->streams = tmp;
+
+ st = &sq->streams[sq->nb_streams];
+ memset(st, 0, sizeof(*st));
+
+ st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
+ if (!st->fifo)
+ return AVERROR(ENOMEM);
+
+ st->head_ts = AV_NOPTS_VALUE;
+
+ return sq->nb_streams++;
+}
+
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
+{
+ av_assert0(stream_idx < sq->nb_streams);
+ sq->streams[stream_idx].tb = tb;
+}
+
+SyncQueue *sq_alloc(enum SyncQueueType type)
+{
+ SyncQueue *sq = av_mallocz(sizeof(*sq));
+
+ if (!sq)
+ return NULL;
+
+ sq->type = type;
+
+ sq->head_stream = -1;
+ sq->head_finished_stream = -1;
+
+ return sq;
+}
+
+void sq_free(SyncQueue **psq)
+{
+ SyncQueue *sq = *psq;
+
+ if (!sq)
+ return;
+
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueFrame frame;
+ while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
+ frame_free(sq, frame);
+
+ av_fifo_freep2(&sq->streams[i].fifo);
+ }
+
+ av_freep(&sq->streams);
+
+ for (unsigned int i = 0; i < sq->nb_free_frames; i++)
+ frame_free(sq, sq->free_frames[i]);
+
+ av_freep(psq);
+}
new file mode 100644
@@ -0,0 +1,93 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_SYNC_QUEUE_H
+#define FFTOOLS_SYNC_QUEUE_H
+
+#include "libavcodec/packet.h"
+
+#include "libavutil/frame.h"
+
+enum SyncQueueType {
+ SYNC_QUEUE_PACKETS,
+ SYNC_QUEUE_FRAMES,
+};
+
+typedef union SyncQueueFrame {
+ AVFrame *f;
+ AVPacket *p;
+} SyncQueueFrame;
+
+#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) })
+#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) })
+
+typedef struct SyncQueue SyncQueue;
+
+SyncQueue *sq_alloc(enum SyncQueueType type);
+void sq_free(SyncQueue **sq);
+
+/**
+ * Add a new stream to the sync queue.
+ *
+ * @return
+ * - a non-negative stream index on success
+ * - a negative error code on error
+ */
+int sq_add_stream(SyncQueue *sq);
+
+/**
+ * Set the timebase for the stream with index stream_idx. Should be called
+ * before sending any frames for this stream.
+ */
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb);
+
+/**
+ * Submit a frame for the stream with index stream_idx.
+ *
+ * On success, the sync queue takes ownership of the frame and will reset the
+ * contents of the supplied frame. On failure, the frame remains owned by the
+ * caller.
+ *
+ * Sending a frame with NULL contents marks the stream as finished.
+ *
+ * @return
+ * - 0 on success
+ * - AVERROR_EOF when no more frames should be submitted for this stream
+ * - another a negative error code on failure
+ */
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame);
+
+/**
+ * Read a frame from the queue.
+ *
+ * @param stream_idx index of the stream to read a frame for. May be -1, then
+ * try to read a frame from any stream that is ready for
+ * output.
+ * @param frame output frame will be written here on success. The frame is owned
+ * by the caller.
+ *
+ * @return
+ * - a non-negative index of the stream to which the returned frame belongs
+ * - AVERROR(EAGAIN) when more frames need to be submitted to the queue
+ * - AVERROR_EOF when no more frames will be available for this stream (for any
+ * stream if stream_idx is -1)
+ * - another negative error code on failure
+ */
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame);
+
+#endif // FFTOOLS_SYNC_QUEUE_H
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
-1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
-1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d