@@ -1750,6 +1750,22 @@ Default value is 0.
Enable bitexact mode for (de)muxer and (de/en)coder
@item -shortest (@emph{output})
Finish encoding when the shortest input stream ends.
+
+Note that this option may require buffering frames, which introduces extra
+latency. The maximum amount of this latency may be controlled with the
+@code{-shortest_buf_duration} option.
+
+@item -shortest_buf_duration @var{duration} (@emph{output})
+The @code{-shortest} option may require buffering potentially large amounts
+of data when at least one of the streams is "sparse" (i.e. has large gaps
+between frames – this is typically the case for subtitles).
+
+This option controls the maximum duration of buffered frames in seconds.
+Larger values may allow the @code{-shortest} option to produce more accurate
+results, but increase memory use and latency.
+
+The default value is 10 seconds.
+
@item -dts_delta_threshold
Timestamp discontinuity delta threshold.
@item -dts_error_threshold @var{seconds}
@@ -14,6 +14,8 @@ OBJS-ffmpeg += \
fftools/ffmpeg_hw.o \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_opt.o \
+ fftools/objpool.o \
+ fftools/sync_queue.o \
define DOFFTOOL
OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes)
@@ -104,6 +104,7 @@
#include "ffmpeg.h"
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavutil/avassert.h"
@@ -569,6 +570,7 @@ static void ffmpeg_cleanup(int ret)
av_bsf_free(&ost->bsf_ctx);
av_frame_free(&ost->filtered_frame);
+ av_frame_free(&ost->sq_frame);
av_frame_free(&ost->last_frame);
av_packet_free(&ost->pkt);
av_dict_free(&ost->encoder_opts);
@@ -691,13 +693,10 @@ static void update_benchmark(const char *fmt, ...)
static void close_output_stream(OutputStream *ost)
{
OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
ost->finished |= ENCODER_FINISHED;
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
+
+ if (ost->sq_idx_encode >= 0)
+ sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
}
/*
@@ -726,10 +725,15 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
goto finish;
while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0)
of_submit_packet(of, pkt, ost);
+ if (ret == AVERROR_EOF)
+ of_submit_packet(of, NULL, ost);
if (ret == AVERROR(EAGAIN))
ret = 0;
- } else if (!eof)
- of_submit_packet(of, pkt, ost);
+ } else
+ of_submit_packet(of, eof ? NULL : pkt, ost);
+
+ if (eof)
+ ost->finished |= MUXER_FINISHED;
finish:
if (ret < 0 && ret != AVERROR_EOF) {
@@ -963,6 +967,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
av_assert0(0);
}
+static int submit_encode_frame(OutputFile *of, OutputStream *ost,
+ AVFrame *frame)
+{
+ int ret;
+
+ if (ost->sq_idx_encode < 0)
+ return encode_frame(of, ost, frame);
+
+ if (frame) {
+ ret = av_frame_ref(ost->sq_frame, frame);
+ if (ret < 0)
+ return ret;
+ frame = ost->sq_frame;
+ }
+
+ ret = sq_send(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(frame));
+ if (ret < 0) {
+ if (frame)
+ av_frame_unref(frame);
+ if (ret != AVERROR_EOF)
+ return ret;
+ }
+
+ while (1) {
+ AVFrame *enc_frame = ost->sq_frame;
+
+ ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
+ SQFRAME(enc_frame));
+ if (ret == AVERROR_EOF) {
+ enc_frame = NULL;
+ } else if (ret < 0) {
+ return (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
+
+ ret = encode_frame(of, ost, enc_frame);
+ if (enc_frame)
+ av_frame_unref(enc_frame);
+ if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ close_output_stream(ost);
+ return ret;
+ }
+ }
+}
+
static void do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame)
{
@@ -978,8 +1028,8 @@ static void do_audio_out(OutputFile *of, OutputStream *ost,
ost->sync_opts = frame->pts + frame->nb_samples;
ost->samples_encoded += frame->nb_samples;
- ret = encode_frame(of, ost, frame);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, frame);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
}
@@ -1265,8 +1315,8 @@ static void do_video_out(OutputFile *of,
av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time);
}
- ret = encode_frame(of, ost, in_picture);
- if (ret < 0)
+ ret = submit_encode_frame(of, ost, in_picture);
+ if (ret < 0 && ret != AVERROR_EOF)
exit_program(1);
ost->sync_opts++;
@@ -1278,19 +1328,6 @@ static void do_video_out(OutputFile *of,
av_frame_move_ref(ost->last_frame, next_picture);
}
-static void finish_output_stream(OutputStream *ost)
-{
- OutputFile *of = output_files[ost->file_index];
- AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
- ost->finished = ENCODER_FINISHED | MUXER_FINISHED;
-
- if (of->shortest) {
- int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
- of->recording_time = FFMIN(of->recording_time, end);
- }
-}
-
/**
* Get and encode new output from any of the filtergraphs, without causing
* activity.
@@ -1758,7 +1795,7 @@ static void flush_encoders(void)
exit_program(1);
}
- finish_output_stream(ost);
+ output_packet(of, ost->pkt, ost, 1);
}
init_output_stream_wrapper(ost, NULL, 1);
@@ -1767,7 +1804,7 @@ static void flush_encoders(void)
if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)
continue;
- ret = encode_frame(of, ost, NULL);
+ ret = submit_encode_frame(of, ost, NULL);
if (ret != AVERROR_EOF)
exit_program(1);
}
@@ -3078,6 +3115,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
break;
}
+ if (ost->sq_idx_encode >= 0)
+ sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base);
+
ost->mux_timebase = enc_ctx->time_base;
return 0;
@@ -3086,6 +3126,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
static int init_output_stream(OutputStream *ost, AVFrame *frame,
char *error, int error_len)
{
+ OutputFile *of = output_files[ost->file_index];
int ret = 0;
if (ost->encoding_needed) {
@@ -3218,6 +3259,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame,
if (ret < 0)
return ret;
+ if (ost->sq_idx_mux >= 0)
+ sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase);
+
ost->initialized = 1;
ret = of_check_init(output_files[ost->file_index]);
@@ -3923,8 +3967,10 @@ static int process_input(int file_index)
OutputStream *ost = output_streams[j];
if (ost->source_index == ifile->ist_index + i &&
- (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE))
- finish_output_stream(ost);
+ (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) {
+ OutputFile *of = output_files[ost->file_index];
+ output_packet(of, ost->pkt, ost, 1);
+ }
}
}
@@ -26,6 +26,7 @@
#include <signal.h>
#include "cmdutils.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
#include "libavformat/avio.h"
@@ -150,6 +151,7 @@ typedef struct OptionsContext {
int64_t limit_filesize;
float mux_preload;
float mux_max_delay;
+ float shortest_buf_duration;
int shortest;
int bitexact;
@@ -482,6 +484,7 @@ typedef struct OutputStream {
int64_t max_frames;
AVFrame *filtered_frame;
AVFrame *last_frame;
+ AVFrame *sq_frame;
AVPacket *pkt;
int last_dropped;
int last_nb0_frames[3];
@@ -573,6 +576,9 @@ typedef struct OutputStream {
/* frame encode sum of squared error values */
int64_t error[4];
+
+ int sq_idx_encode;
+ int sq_idx_mux;
} OutputStream;
typedef struct Muxer Muxer;
@@ -583,6 +589,9 @@ typedef struct OutputFile {
Muxer *mux;
const AVOutputFormat *format;
+ SyncQueue *sq_encode;
+ SyncQueue *sq_mux;
+
AVFormatContext *ctx;
int ost_index; /* index of the first stream in output_streams */
int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units
@@ -20,6 +20,7 @@
#include <string.h>
#include "ffmpeg.h"
+#include "sync_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
@@ -56,6 +57,8 @@ struct Muxer {
int64_t limit_filesize;
int64_t final_filesize;
int header_written;
+
+ AVPacket *sq_pkt;
};
static int want_sdp = 1;
@@ -72,13 +75,14 @@ static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream,
static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
- AVPacket *tmp_pkt;
+ AVPacket *tmp_pkt = NULL;
int ret;
if (!av_fifo_can_write(ms->muxing_queue)) {
size_t cur_size = av_fifo_can_read(ms->muxing_queue);
+ size_t pkt_size = pkt ? pkt->size : 0;
unsigned int are_we_over_size =
- (ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
+ (ms->muxing_queue_data_size + pkt_size) > ost->muxing_queue_data_threshold;
size_t limit = are_we_over_size ? ost->max_muxing_queue_size : SIZE_MAX;
size_t new_size = FFMIN(2 * cur_size, limit);
@@ -93,6 +97,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return ret;
}
+ if (pkt) {
ret = av_packet_make_refcounted(pkt);
if (ret < 0)
return ret;
@@ -103,6 +108,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
av_packet_move_ref(tmp_pkt, pkt);
ms->muxing_queue_data_size += tmp_pkt->size;
+ }
av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
return 0;
@@ -192,11 +198,44 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
}
}
+static void submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+ if (ost->sq_idx_mux >= 0) {
+ int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
+ if (ret < 0) {
+ if (pkt)
+ av_packet_unref(pkt);
+ if (ret == AVERROR_EOF) {
+ ost->finished |= MUXER_FINISHED;
+ return;
+ } else
+ exit_program(1);
+ }
+
+ while (1) {
+ ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
+ return;
+ else if (ret < 0)
+ exit_program(1);
+
+ write_packet(of, output_streams[of->ost_index + ret],
+ of->mux->sq_pkt);
+ }
+ } else {
+ if (pkt)
+ write_packet(of, ost, pkt);
+ else
+ ost->finished |= MUXER_FINISHED;
+ }
+}
+
void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
{
AVStream *st = ost->st;
int ret;
+ if (pkt) {
/*
* Audio encoders may split the packets -- #frames in != #packets out.
* But there is no reordering, so we can limit the number of output packets
@@ -211,9 +250,10 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
}
ost->frame_number++;
}
+ }
if (of->mux->header_written) {
- write_packet(of, ost, pkt);
+ submit_packet(of, ost, pkt);
} else {
/* the muxer is not initialized yet, buffer the packet */
ret = queue_packet(of, ost, pkt);
@@ -321,9 +361,11 @@ int of_check_init(OutputFile *of)
ost->mux_timebase = ost->st->time_base;
while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ms->muxing_queue_data_size -= pkt->size;
- write_packet(of, ost, pkt);
- av_packet_free(&pkt);
+ submit_packet(of, ost, pkt);
+ if (pkt) {
+ ms->muxing_queue_data_size -= pkt->size;
+ av_packet_free(&pkt);
+ }
}
}
@@ -383,6 +425,8 @@ static void mux_free(Muxer **pmux, int nb_streams)
av_freep(&mux->streams);
av_dict_free(&mux->opts);
+ av_packet_free(&mux->sq_pkt);
+
av_freep(pmux);
}
@@ -394,6 +438,9 @@ void of_close(OutputFile **pof)
if (!of)
return;
+ sq_free(&of->sq_encode);
+ sq_free(&of->sq_mux);
+
s = of->ctx;
mux_free(&of->mux, s ? s->nb_streams : 0);
@@ -437,6 +484,14 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize)
if (strcmp(of->format->name, "rtp"))
want_sdp = 0;
+ if (of->sq_mux) {
+ mux->sq_pkt = av_packet_alloc();
+ if (!mux->sq_pkt) {
+ ret = AVERROR(ENOMEM);
+ goto fail;
+ }
+ }
+
/* write the header for files with no streams */
if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) {
ret = of_check_init(of);
@@ -31,6 +31,7 @@
#include "fopen_utf8.h"
#include "cmdutils.h"
#include "opt_common.h"
+#include "sync_queue.h"
#include "libavformat/avformat.h"
@@ -234,6 +235,7 @@ static void init_options(OptionsContext *o)
o->chapters_input_file = INT_MAX;
o->accurate_seek = 1;
o->thread_queue_size = -1;
+ o->shortest_buf_duration = 10.f;
}
static int show_hwaccels(void *optctx, const char *opt, const char *arg)
@@ -2326,6 +2328,78 @@ static int init_complex_filters(void)
return 0;
}
+static int setup_sync_queues(OutputFile *of, AVFormatContext *oc, int64_t buf_size_us)
+{
+ int nb_av_enc = 0, nb_interleaved = 0;
+
+#define IS_AV_ENC(ost, type) \
+ (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO))
+#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT)
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ ost->sq_idx_encode = -1;
+ ost->sq_idx_mux = -1;
+
+ nb_interleaved += IS_INTERLEAVED(type);
+ nb_av_enc += IS_AV_ENC(ost, type);
+ }
+
+ if (!(nb_interleaved > 1 && of->shortest))
+ return 0;
+
+ /* if we have more than one encoded audio/video streams, then we
+ * synchronize them before encoding */
+ if (nb_av_enc > 1) {
+ of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us);
+ if (!of->sq_encode)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_AV_ENC(ost, type))
+ continue;
+
+ ost->sq_idx_encode = sq_add_stream(of->sq_encode);
+ if (ost->sq_idx_encode < 0)
+ return ost->sq_idx_encode;
+
+ ost->sq_frame = av_frame_alloc();
+ if (!ost->sq_frame)
+ return AVERROR(ENOMEM);
+ }
+ }
+
+ /* if there are any additional interleaved streams, then ALL the streams
+ * are also synchronized before sending them to the muxer */
+ if (nb_interleaved > nb_av_enc) {
+ of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS, buf_size_us);
+ if (!of->sq_mux)
+ return AVERROR(ENOMEM);
+
+ for (int i = 0; i < oc->nb_streams; i++) {
+ OutputStream *ost = output_streams[of->ost_index + i];
+ enum AVMediaType type = ost->st->codecpar->codec_type;
+
+ if (!IS_INTERLEAVED(type))
+ continue;
+
+ ost->sq_idx_mux = sq_add_stream(of->sq_mux);
+ if (ost->sq_idx_mux < 0)
+ return ost->sq_idx_mux;
+ }
+ }
+
+#undef IS_AV_ENC
+#undef IS_INTERLEAVED
+
+ return 0;
+}
+
static int open_output_file(OptionsContext *o, const char *filename)
{
AVFormatContext *oc;
@@ -2963,6 +3037,12 @@ loop_end:
exit_program(1);
}
+ err = setup_sync_queues(of, oc, o->shortest_buf_duration * AV_TIME_BASE);
+ if (err < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n");
+ exit_program(1);
+ }
+
err = of_muxer_init(of, format_opts, o->limit_filesize);
if (err < 0) {
av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
@@ -3675,6 +3755,8 @@ const OptionDef options[] = {
{ "shortest", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT, { .off = OFFSET(shortest) },
"finish encoding within shortest input" },
+ { "shortest_buf_duration", HAS_ARG | OPT_FLOAT | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest_buf_duration) },
+ "maximum buffering duration (in seconds) for the -shortest option" },
{ "bitexact", OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
OPT_OUTPUT | OPT_INPUT, { .off = OFFSET(bitexact) },
"bitexact mode" },
new file mode 100644
@@ -0,0 +1,427 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdint.h>
+#include <string.h>
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/fifo.h"
+#include "libavutil/mathematics.h"
+#include "libavutil/mem.h"
+
+#include "objpool.h"
+#include "sync_queue.h"
+
+typedef struct SyncQueueStream {
+ AVFifo *fifo;
+ AVRational tb;
+
+ /* stream head: largest timestamp seen */
+ int64_t head_ts;
+ /* no more frames will be sent for this stream */
+ int finished;
+} SyncQueueStream;
+
+struct SyncQueue {
+ enum SyncQueueType type;
+
+ /* no more frames will be sent for any stream */
+ int finished;
+ /* sync head: the stream with the _smallest_ head timestamp
+ * this stream determines which frames can be output */
+ int head_stream;
+ /* the finished stream with the smallest finish timestamp or -1 */
+ int head_finished_stream;
+
+ // maximum buffering duration in microseconds
+ int64_t buf_size_us;
+
+ SyncQueueStream *streams;
+ unsigned int nb_streams;
+
+ // pool of preallocated frames to avoid constant allocations
+ ObjPool *pool;
+ SyncQueueFrame free_frames[32];
+ unsigned int nb_free_frames;
+};
+
+static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
+ SyncQueueFrame src)
+{
+ if (sq->type == SYNC_QUEUE_PACKETS)
+ av_packet_move_ref(dst.p, src.p);
+ else
+ av_frame_move_ref(dst.f, src.f);
+}
+
+static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ?
+ frame.p->pts + frame.p->duration :
+ frame.f->pts + frame.f->pkt_duration;
+}
+
+static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
+{
+ return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
+}
+
+static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
+{
+ SyncQueueStream *st = &sq->streams[stream_idx];
+
+ st->finished = 1;
+
+ if (st->head_ts != AV_NOPTS_VALUE) {
+ /* check if this stream is the new finished head */
+ if (sq->head_finished_stream < 0 ||
+ av_compare_ts(st->head_ts, st->tb,
+ sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb) < 0) {
+ sq->head_finished_stream = stream_idx;
+ }
+
+ /* mark as finished all streams that should no longer receive new frames,
+ * due to them being ahead of some finished stream */
+ st = &sq->streams[sq->head_finished_stream];
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st1 = &sq->streams[i];
+ if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
+ st1->finished = 1;
+ }
+ }
+
+ /* mark the whole queue as finished if all streams are finished */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ if (!sq->streams[i].finished)
+ return;
+ }
+ sq->finished = 1;
+}
+
+static void queue_head_update(SyncQueue *sq)
+{
+ if (sq->head_stream < 0) {
+ /* wait for one timestamp in each stream before determining
+ * the queue head */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st = &sq->streams[i];
+ if (st->head_ts == AV_NOPTS_VALUE)
+ return;
+ }
+
+ // placeholder value, correct one will be found below
+ sq->head_stream = 0;
+ }
+
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st_head = &sq->streams[sq->head_stream];
+ SyncQueueStream *st_other = &sq->streams[i];
+ if (st_other->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(st_other->head_ts, st_other->tb,
+ st_head->head_ts, st_head->tb) < 0)
+ sq->head_stream = i;
+ }
+}
+
+/* update this stream's head timestamp */
+static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
+{
+ SyncQueueStream *st = &sq->streams[stream_idx];
+
+ if (ts == AV_NOPTS_VALUE ||
+ (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
+ return;
+
+ st->head_ts = ts;
+
+ /* if this stream is now ahead of some finished stream, then
+ * this stream is also finished */
+ if (sq->head_finished_stream >= 0 &&
+ av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
+ sq->streams[sq->head_finished_stream].tb,
+ ts, st->tb) <= 0)
+ finish_stream(sq, stream_idx);
+
+ /* update the overall head timestamp if it could have changed */
+ if (sq->head_stream < 0 || sq->head_stream == stream_idx)
+ queue_head_update(sq);
+}
+
+/* If the queue for the given stream (or all streams when stream_idx=-1)
+ * is overflowing, trigger a fake heartbeat on lagging streams.
+ *
+ * @return 1 if heartbeat triggered, 0 otherwise
+ */
+static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
+{
+ SyncQueueStream *st;
+ SyncQueueFrame frame;
+ int64_t tail_ts = AV_NOPTS_VALUE;
+
+ /* if no stream specified, pick the one that is most ahead */
+ if (stream_idx < 0) {
+ int64_t ts = AV_NOPTS_VALUE;
+
+ for (int i = 0; i < sq->nb_streams; i++) {
+ st = &sq->streams[i];
+ if (st->head_ts != AV_NOPTS_VALUE &&
+ (ts == AV_NOPTS_VALUE ||
+ av_compare_ts(ts, sq->streams[stream_idx].tb,
+ st->head_ts, st->tb) < 0)) {
+ ts = st->head_ts;
+ stream_idx = i;
+ }
+ }
+ /* no stream has a timestamp yet -> nothing to do */
+ if (stream_idx < 0)
+ return 0;
+ }
+
+ st = &sq->streams[stream_idx];
+
+ /* get the chosen stream's tail timestamp */
+ for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
+ av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
+ tail_ts = frame_ts(sq, frame);
+
+ /* overflow triggers when the tail is over 10 seconds behind the head */
+ if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
+ av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
+ return 0;
+
+ /* signal a fake timestamp for all streams that prevent tail_ts from being output */
+ tail_ts++;
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueStream *st1 = &sq->streams[i];
+ int64_t ts;
+
+ if (st == st1 || st1->finished ||
+ (st1->head_ts != AV_NOPTS_VALUE &&
+ av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
+ continue;
+
+ ts = av_rescale_q(tail_ts, st->tb, st1->tb);
+ if (st1->head_ts != AV_NOPTS_VALUE)
+ ts = FFMAX(st1->head_ts + 1, ts);
+
+ stream_update_ts(sq, i, ts);
+ }
+
+ return 1;
+}
+
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
+{
+ SyncQueueStream *st;
+ SyncQueueFrame dst;
+ int64_t ts;
+ int ret;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ av_assert0(st->tb.num > 0 && st->tb.den > 0);
+
+ if (frame_null(sq, frame)) {
+ finish_stream(sq, stream_idx);
+ return 0;
+ }
+ if (st->finished)
+ return AVERROR_EOF;
+
+ ret = objpool_get(sq->pool, (void**)&dst);
+ if (ret < 0)
+ return ret;
+
+ frame_move(sq, dst, frame);
+
+ ts = frame_ts(sq, dst);
+
+ ret = av_fifo_write(st->fifo, &dst, 1);
+ if (ret < 0) {
+ frame_move(sq, frame, dst);
+ objpool_release(sq->pool, (void**)&dst);
+ return ret;
+ }
+
+ stream_update_ts(sq, stream_idx, ts);
+
+ return 0;
+}
+
+static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
+ SyncQueueFrame frame)
+{
+ SyncQueueStream *st_head = sq->head_stream >= 0 ?
+ &sq->streams[sq->head_stream] : NULL;
+ SyncQueueStream *st;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ if (av_fifo_can_read(st->fifo)) {
+ SyncQueueFrame peek;
+ int64_t ts;
+ int cmp = 1;
+
+ av_fifo_peek(st->fifo, &peek, 1, 0);
+ ts = frame_ts(sq, peek);
+
+ /* check if this stream's tail timestamp does not overtake
+ * the overall queue head */
+ if (ts != AV_NOPTS_VALUE && st_head)
+ cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
+
+ /* We can release frames that do not end after the queue head.
+ * Frames with no timestamps are just passed through with no conditions.
+ */
+ if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
+ frame_move(sq, frame, peek);
+ objpool_release(sq->pool, (void**)&peek);
+ av_fifo_drain2(st->fifo, 1);
+ return 0;
+ }
+ }
+
+ return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
+ AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+ int nb_eof = 0;
+ int ret;
+
+ /* read a frame for a specific stream */
+ if (stream_idx >= 0) {
+ ret = receive_for_stream(sq, stream_idx, frame);
+ return (ret < 0) ? ret : stream_idx;
+ }
+
+ /* read a frame for any stream with available output */
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ ret = receive_for_stream(sq, i, frame);
+ if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
+ nb_eof += (ret == AVERROR_EOF);
+ continue;
+ }
+ return (ret < 0) ? ret : i;
+ }
+
+ return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+ int ret = receive_internal(sq, stream_idx, frame);
+
+ /* try again if the queue overflowed and triggered a fake heartbeat
+ * for lagging streams */
+ if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
+ ret = receive_internal(sq, stream_idx, frame);
+
+ return ret;
+}
+
+int sq_add_stream(SyncQueue *sq)
+{
+ SyncQueueStream *tmp, *st;
+
+ tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
+ if (!tmp)
+ return AVERROR(ENOMEM);
+ sq->streams = tmp;
+
+ st = &sq->streams[sq->nb_streams];
+ memset(st, 0, sizeof(*st));
+
+ st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
+ if (!st->fifo)
+ return AVERROR(ENOMEM);
+
+ /* we set a valid default, so that a pathological stream that never
+ * receives even a real timebase (and no frames) won't stall all other
+ * streams forever; cf. overflow_heartbeat() */
+ st->tb = (AVRational){ 1, 1 };
+ st->head_ts = AV_NOPTS_VALUE;
+
+ return sq->nb_streams++;
+}
+
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
+{
+ SyncQueueStream *st;
+
+ av_assert0(stream_idx < sq->nb_streams);
+ st = &sq->streams[stream_idx];
+
+ av_assert0(!av_fifo_can_read(st->fifo));
+
+ if (st->head_ts != AV_NOPTS_VALUE)
+ st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
+
+ st->tb = tb;
+}
+
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
+{
+ SyncQueue *sq = av_mallocz(sizeof(*sq));
+
+ if (!sq)
+ return NULL;
+
+ sq->type = type;
+ sq->buf_size_us = buf_size_us;
+
+ sq->head_stream = -1;
+ sq->head_finished_stream = -1;
+
+ sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
+ objpool_alloc_frames();
+ if (!sq->pool) {
+ av_freep(&sq);
+ return NULL;
+ }
+
+ return sq;
+}
+
+void sq_free(SyncQueue **psq)
+{
+ SyncQueue *sq = *psq;
+
+ if (!sq)
+ return;
+
+ for (unsigned int i = 0; i < sq->nb_streams; i++) {
+ SyncQueueFrame frame;
+ while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
+ objpool_release(sq->pool, (void**)&frame);
+
+ av_fifo_freep2(&sq->streams[i].fifo);
+ }
+
+ av_freep(&sq->streams);
+
+ objpool_free(&sq->pool);
+
+ av_freep(psq);
+}
new file mode 100644
@@ -0,0 +1,100 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_SYNC_QUEUE_H
+#define FFTOOLS_SYNC_QUEUE_H
+
+#include <stdint.h>
+
+#include "libavcodec/packet.h"
+
+#include "libavutil/frame.h"
+
+enum SyncQueueType {
+ SYNC_QUEUE_PACKETS,
+ SYNC_QUEUE_FRAMES,
+};
+
+typedef union SyncQueueFrame {
+ AVFrame *f;
+ AVPacket *p;
+} SyncQueueFrame;
+
+#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) })
+#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) })
+
+typedef struct SyncQueue SyncQueue;
+
+/**
+ * Allocate a sync queue of the given type.
+ *
+ * @param buf_size_us maximum duration that will be buffered in microseconds
+ */
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us);
+void sq_free(SyncQueue **sq);
+
+/**
+ * Add a new stream to the sync queue.
+ *
+ * @return
+ * - a non-negative stream index on success
+ * - a negative error code on error
+ */
+int sq_add_stream(SyncQueue *sq);
+
+/**
+ * Set the timebase for the stream with index stream_idx. Should be called
+ * before sending any frames for this stream.
+ */
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb);
+
+/**
+ * Submit a frame for the stream with index stream_idx.
+ *
+ * On success, the sync queue takes ownership of the frame and will reset the
+ * contents of the supplied frame. On failure, the frame remains owned by the
+ * caller.
+ *
+ * Sending a frame with NULL contents marks the stream as finished.
+ *
+ * @return
+ * - 0 on success
+ * - AVERROR_EOF when no more frames should be submitted for this stream
+ * - another a negative error code on failure
+ */
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame);
+
+/**
+ * Read a frame from the queue.
+ *
+ * @param stream_idx index of the stream to read a frame for. May be -1, then
+ * try to read a frame from any stream that is ready for
+ * output.
+ * @param frame output frame will be written here on success. The frame is owned
+ * by the caller.
+ *
+ * @return
+ * - a non-negative index of the stream to which the returned frame belongs
+ * - AVERROR(EAGAIN) when more frames need to be submitted to the queue
+ * - AVERROR_EOF when no more frames will be available for this stream (for any
+ * stream if stream_idx is -1)
+ * - another negative error code on failure
+ */
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame);
+
+#endif // FFTOOLS_SYNC_QUEUE_H
@@ -105,7 +105,7 @@ FATE_SAMPLES_FFMPEG-$(call ALLYES, COLOR_FILTER, VOBSUB_DEMUXER, MATROSKA_DEMUXE
fate-shortest-sub: CMD = enc_dec \
vobsub $(TARGET_SAMPLES)/sub/vobsub.idx matroska \
"-filter_complex 'color=s=1x1:rate=1:duration=400' -pix_fmt rgb24 -allow_raw_vfw 1 -c:s copy -c:v rawvideo" \
- framecrc "-map 0 -c copy -shortest"
+ framecrc "-map 0 -c copy -shortest -shortest_buf_duration 40"
# Basic test for fix_sub_duration, which calculates duration based on the
# following subtitle's pts.
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
-1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d
@@ -120,4 +120,3 @@
0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2
1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af
0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d
-1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d
@@ -1,4 +1,4 @@
52bc3d6a0c80e639095a2c28da4ef42c *tests/data/fate/shortest-sub.matroska
139246 tests/data/fate/shortest-sub.matroska
-d71f5d359ef788ea689415bc1e4a90df *tests/data/fate/shortest-sub.out.framecrc
-stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 26055
+876ac3fa52e467050ab843969d4cf343 *tests/data/fate/shortest-sub.out.framecrc
+stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 23735