diff mbox series

[FFmpeg-devel,23/24] fftools/ffmpeg_mux: convert to the scheduler

Message ID 20231104092125.10213-24-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/24] lavf/mux: do not apply max_interleave_delta to subtitles | expand

Commit Message

Anton Khirnov Nov. 4, 2023, 7:56 a.m. UTC
---
 fftools/ffmpeg.c          |  30 +---
 fftools/ffmpeg.h          |  11 +-
 fftools/ffmpeg_mux.c      | 290 ++++++--------------------------------
 fftools/ffmpeg_mux.h      |  24 +---
 fftools/ffmpeg_mux_init.c |  40 ++----
 fftools/ffmpeg_opt.c      |   6 +-
 6 files changed, 61 insertions(+), 340 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 122424a0e1..5d1560b891 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -117,7 +117,7 @@  typedef struct BenchmarkTimeStamps {
 static BenchmarkTimeStamps get_benchmark_time_stamps(void);
 static int64_t getmaxrss(void);
 
-unsigned nb_output_dumped = 0;
+atomic_uint nb_output_dumped = 0;
 
 static BenchmarkTimeStamps current_time;
 AVIOContext *progress_avio = NULL;
@@ -496,7 +496,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             last_time = cur_time;
         }
         if (((cur_time - last_time) < stats_period && !first_report) ||
-            (first_report && nb_output_dumped < nb_output_files))
+            (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
             return;
         last_time = cur_time;
     }
@@ -750,28 +750,12 @@  int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
 static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
 {
     InputFile *f = input_files[ist->file_index];
-    int64_t dts_est = AV_NOPTS_VALUE;
     int ret = 0;
     int eof_reached = 0;
 
     if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
         eof_reached = 1;
 
-    if (pkt && pkt->opaque_ref) {
-        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
-        dts_est = pd->dts_est;
-    }
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (ost->enc || (!pkt && no_eof))
-            continue;
-
-        ret = of_streamcopy(ost, pkt, dts_est);
-        if (ret < 0)
-            return ret;
-    }
-
     return !eof_reached;
 }
 
@@ -995,16 +979,6 @@  static int process_input(int file_index, AVPacket *pkt)
                 else if (ret < 0)
                     return ret;
             }
-
-            /* mark all outputs that don't go through lavfi as finished */
-            for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-                OutputStream *ost = ist->outputs[oidx];
-                OutputFile    *of = output_files[ost->file_index];
-
-                ret = of_output_packet(of, ost, NULL);
-                if (ret < 0)
-                    return ret;
-            }
         }
 
         ifile->eof_reached = 1;
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 20abd5e772..afc4496bd6 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -594,7 +594,6 @@  typedef struct OutputStream {
     /* packet quality factor */
     atomic_int quality;
 
-    int sq_idx_encode;
     int sq_idx_mux;
 
     EncStats enc_stats_pre;
@@ -646,7 +645,6 @@  extern FilterGraph **filtergraphs;
 extern int        nb_filtergraphs;
 
 extern char *vstats_filename;
-extern char *sdp_filename;
 
 extern float dts_delta_threshold;
 extern float dts_error_threshold;
@@ -679,7 +677,7 @@  extern const AVIOInterruptCB int_cb;
 extern const OptionDef options[];
 extern HWDevice *filter_hw_device;
 
-extern unsigned nb_output_dumped;
+extern atomic_uint nb_output_dumped;
 
 extern int ignore_unknown_streams;
 extern int copy_unknown_streams;
@@ -791,13 +789,6 @@  void of_free(OutputFile **pof);
 
 void of_enc_stats_close(void);
 
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
 int64_t of_filesize(OutputFile *of);
 
 int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 7dd8e8c848..815bc883ea 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -23,16 +23,13 @@ 
 #include "ffmpeg.h"
 #include "ffmpeg_mux.h"
 #include "ffmpeg_utils.h"
-#include "objpool.h"
 #include "sync_queue.h"
-#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -43,8 +40,6 @@  typedef struct MuxThreadContext {
     AVPacket *pkt;
 } MuxThreadContext;
 
-int want_sdp = 1;
-
 static Muxer *mux_from_of(OutputFile *of)
 {
     return (Muxer*)of;
@@ -207,6 +202,8 @@  static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
     return 0;
 }
 
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
+
 /* apply the output bitstream filters */
 static int mux_packet_filter(Muxer *mux, OutputStream *ost,
                              AVPacket *pkt, int *stream_eof)
@@ -215,6 +212,18 @@  static int mux_packet_filter(Muxer *mux, OutputStream *ost,
     const char *err_msg;
     int ret = 0;
 
+    if (pkt && !ost->enc) {
+        ret = of_streamcopy(ost, pkt);
+        if (ret == AVERROR(EAGAIN))
+            return 0;
+        else if (ret == AVERROR_EOF) {
+            av_packet_unref(pkt);
+            pkt = NULL;
+            ret = 0;
+        } else if (ret < 0)
+            goto fail;
+    }
+
     if (ms->bsf_ctx) {
         int bsf_eof = 0;
 
@@ -316,19 +325,22 @@  void *muxer_thread(void *arg)
         OutputStream *ost;
         int stream_idx, stream_eof = 0;
 
-        ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+        ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+        stream_idx = mt.pkt->stream_index;
         if (stream_idx < 0) {
             av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
             ret = 0;
             break;
         }
 
-        ost = of->streams[stream_idx];
+        ost = of->streams[mux->sch_stream_idx[stream_idx]];
+        mt.pkt->stream_index = ost->index;
+
         ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
         av_packet_unref(mt.pkt);
         if (ret == AVERROR_EOF) {
             if (stream_eof) {
-                tq_receive_finish(mux->tq, stream_idx);
+                sch_mux_receive_finish(mux->sch, of->index, stream_idx);
             } else {
                 av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
                 ret = 0;
@@ -343,233 +355,54 @@  void *muxer_thread(void *arg)
 finish:
     mux_thread_uninit(&mt);
 
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_receive_finish(mux->tq, i);
-
-    av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
-{
-    int ret = 0;
-
-    if (!pkt || ost->finished & MUXER_FINISHED)
-        goto finish;
-
-    ret = tq_send(mux->tq, ost->index, pkt);
-    if (ret < 0)
-        goto finish;
-
-    return 0;
-
-finish:
-    if (pkt)
-        av_packet_unref(pkt);
-
-    ost->finished |= MUXER_FINISHED;
-    tq_send_finish(mux->tq, ost->index);
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-static int queue_packet(OutputStream *ost, AVPacket *pkt)
-{
-    MuxStream *ms = ms_from_ost(ost);
-    AVPacket *tmp_pkt = NULL;
-    int ret;
-
-    if (!av_fifo_can_write(ms->muxing_queue)) {
-        size_t cur_size = av_fifo_can_read(ms->muxing_queue);
-        size_t pkt_size = pkt ? pkt->size : 0;
-        unsigned int are_we_over_size =
-            (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold;
-        size_t limit    = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX;
-        size_t new_size = FFMIN(2 * cur_size, limit);
-
-        if (new_size <= cur_size) {
-            av_log(ost, AV_LOG_ERROR,
-                   "Too many packets buffered for output stream %d:%d.\n",
-                   ost->file_index, ost->st->index);
-            return AVERROR(ENOSPC);
-        }
-        ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
-        if (ret < 0)
-            return ret;
-    }
-
-    if (pkt) {
-        ret = av_packet_make_refcounted(pkt);
-        if (ret < 0)
-            return ret;
-
-        tmp_pkt = av_packet_alloc();
-        if (!tmp_pkt)
-            return AVERROR(ENOMEM);
-
-        av_packet_move_ref(tmp_pkt, pkt);
-        ms->muxing_queue_data_size += tmp_pkt->size;
-    }
-    av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
-
-    return 0;
-}
-
-static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
-{
-    int ret;
-
-    if (mux->tq) {
-        return thread_submit_packet(mux, ost, pkt);
-    } else {
-        /* the muxer is not initialized yet, buffer the packet */
-        ret = queue_packet(ost, pkt);
-        if (ret < 0) {
-            if (pkt)
-                av_packet_unref(pkt);
-            return ret;
-        }
-    }
-
-    return 0;
-}
-
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
-{
-    Muxer *mux = mux_from_of(of);
-    int ret = 0;
-
-    if (pkt && pkt->dts != AV_NOPTS_VALUE)
-        ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
-
-    ret = submit_packet(mux, pkt, ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
-               av_err2str(ret));
-        return ret;
-    }
-
-    return 0;
-}
-
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
 {
     OutputFile *of = output_files[ost->file_index];
     MuxStream  *ms = ms_from_ost(ost);
+    DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL;
+    int64_t      dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
     int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
     int64_t ts_offset;
-    AVPacket *opkt = ms->pkt;
-    int ret;
-
-    av_packet_unref(opkt);
 
     if (of->recording_time != INT64_MAX &&
         dts >= of->recording_time + start_time)
-        pkt = NULL;
-
-    // EOF: flush output bitstream filters.
-    if (!pkt)
-        return of_output_packet(of, ost, NULL);
+        return AVERROR_EOF;
 
     if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
         !ms->copy_initial_nonkeyframes)
-        return 0;
+        return AVERROR(EAGAIN);
 
     if (!ms->streamcopy_started) {
         if (!ms->copy_prior_start &&
             (pkt->pts == AV_NOPTS_VALUE ?
              dts < ms->ts_copy_start :
              pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
-            return 0;
+            return AVERROR(EAGAIN);
 
         if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
-            return 0;
+            return AVERROR(EAGAIN);
     }
 
-    ret = av_packet_ref(opkt, pkt);
-    if (ret < 0)
-        return ret;
-
-    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
 
     if (pkt->pts != AV_NOPTS_VALUE)
-        opkt->pts -= ts_offset;
+        pkt->pts -= ts_offset;
 
     if (pkt->dts == AV_NOPTS_VALUE) {
-        opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+        pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
     } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
-        opkt->pts = opkt->dts - ts_offset;
+        pkt->pts = pkt->dts - ts_offset;
     }
-    opkt->dts -= ts_offset;
-
-    ret = of_output_packet(of, ost, opkt);
-    if (ret < 0)
-        return ret;
+    pkt->dts -= ts_offset;
 
     ms->streamcopy_started = 1;
 
     return 0;
 }
 
-static int thread_stop(Muxer *mux)
-{
-    void *ret;
-
-    if (!mux || !mux->tq)
-        return 0;
-
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_send_finish(mux->tq, i);
-
-    pthread_join(mux->thread, &ret);
-
-    tq_free(&mux->tq);
-
-    return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
-    AVFormatContext *fc = mux->fc;
-    ObjPool *op;
-    int ret;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
-    if (!mux->tq) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
-    if (ret) {
-        tq_free(&mux->tq);
-        return AVERROR(ret);
-    }
-
-    /* flush the muxing queues */
-    for (int i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = mux->of.streams[i];
-        MuxStream     *ms = ms_from_ost(ost);
-        AVPacket *pkt;
-
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ret = thread_submit_packet(mux, ost, pkt);
-            if (pkt) {
-                ms->muxing_queue_data_size -= pkt->size;
-                av_packet_free(&pkt);
-            }
-            if (ret < 0)
-                return ret;
-        }
-    }
-
-    return 0;
-}
-
 int print_sdp(const char *filename);
 
 int print_sdp(const char *filename)
@@ -580,11 +413,6 @@  int print_sdp(const char *filename)
     AVIOContext *sdp_pb;
     AVFormatContext **avc;
 
-    for (i = 0; i < nb_output_files; i++) {
-        if (!mux_from_of(output_files[i])->header_written)
-            return 0;
-    }
-
     avc = av_malloc_array(nb_output_files, sizeof(*avc));
     if (!avc)
         return AVERROR(ENOMEM);
@@ -619,25 +447,17 @@  int print_sdp(const char *filename)
         avio_closep(&sdp_pb);
     }
 
-    // SDP successfully written, allow muxer threads to start
-    ret = 1;
-
 fail:
     av_freep(&avc);
     return ret;
 }
 
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
 {
+    Muxer     *mux = arg;
     OutputFile *of = &mux->of;
     AVFormatContext *fc = mux->fc;
-    int ret, i;
-
-    for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = of->streams[i];
-        if (!ost->initialized)
-            return 0;
-    }
+    int ret;
 
     ret = avformat_write_header(fc, &mux->opts);
     if (ret < 0) {
@@ -649,27 +469,7 @@  int mux_check_init(Muxer *mux)
     mux->header_written = 1;
 
     av_dump_format(fc, of->index, fc->url, 1);
-    nb_output_dumped++;
-
-    if (sdp_filename || want_sdp) {
-        ret = print_sdp(sdp_filename);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
-            return ret;
-        } else if (ret == 1) {
-            /* SDP is written only after all the muxers are ready, so now we
-             * start ALL the threads */
-            for (i = 0; i < nb_output_files; i++) {
-                ret = thread_start(mux_from_of(output_files[i]));
-                if (ret < 0)
-                    return ret;
-            }
-        }
-    } else {
-        ret = thread_start(mux_from_of(of));
-        if (ret < 0)
-            return ret;
-    }
+    atomic_fetch_add(&nb_output_dumped, 1);
 
     return 0;
 }
@@ -726,9 +526,10 @@  int of_stream_init(OutputFile *of, OutputStream *ost)
                                          ost->st->time_base);
     }
 
-    ost->initialized = 1;
+    if (ms->sch_idx >= 0)
+        return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
 
-    return mux_check_init(mux);
+    return 0;
 }
 
 static int check_written(OutputFile *of)
@@ -842,15 +643,13 @@  int of_write_trailer(OutputFile *of)
     AVFormatContext *fc = mux->fc;
     int ret, mux_result = 0;
 
-    if (!mux->tq) {
+    if (!mux->header_written) {
         av_log(mux, AV_LOG_ERROR,
                "Nothing was written into output file, because "
                "at least one of its streams received no packets.\n");
         return AVERROR(EINVAL);
     }
 
-    mux_result = thread_stop(mux);
-
     ret = av_write_trailer(fc);
     if (ret < 0) {
         av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -895,13 +694,6 @@  static void ost_free(OutputStream **post)
         ost->logfile = NULL;
     }
 
-    if (ms->muxing_queue) {
-        AVPacket *pkt;
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
-            av_packet_free(&pkt);
-        av_fifo_freep2(&ms->muxing_queue);
-    }
-
     avcodec_parameters_free(&ost->par_in);
 
     av_bsf_free(&ms->bsf_ctx);
@@ -966,8 +758,6 @@  void of_free(OutputFile **pof)
         return;
     mux = mux_from_of(of);
 
-    thread_stop(mux);
-
     sq_free(&of->sq_encode);
     sq_free(&mux->sq_mux);
 
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index aaf81eaa8d..82045d84b8 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg_sched.h"
-#include "thread_queue.h"
 
 #include "libavformat/avformat.h"
 
@@ -33,7 +32,6 @@ 
 
 #include "libavutil/dict.h"
 #include "libavutil/fifo.h"
-#include "libavutil/thread.h"
 
 typedef struct MuxStream {
     OutputStream ost;
@@ -41,9 +39,6 @@  typedef struct MuxStream {
     // name used for logging
     char log_name[32];
 
-    /* the packets are buffered here until the muxer is ready to be initialized */
-    AVFifo *muxing_queue;
-
     AVBSFContext *bsf_ctx;
     AVPacket     *bsf_pkt;
 
@@ -56,17 +51,6 @@  typedef struct MuxStream {
 
     int64_t max_frames;
 
-    /*
-     * The size of the AVPackets' buffers in queue.
-     * Updated when a packet is either pushed or pulled from the queue.
-     */
-    size_t muxing_queue_data_size;
-
-    int max_muxing_queue_size;
-
-    /* Threshold after which max_muxing_queue_size will be in effect */
-    size_t muxing_queue_data_threshold;
-
     // timestamp from which the streamcopied streams should start,
     // in AV_TIME_BASE_Q;
     // everything before it should be discarded
@@ -105,9 +89,6 @@  typedef struct Muxer {
     int         *sch_stream_idx;
     int       nb_sch_stream_idx;
 
-    pthread_t    thread;
-    ThreadQueue *tq;
-
     AVDictionary *opts;
 
     int thread_queue_size;
@@ -121,10 +102,7 @@  typedef struct Muxer {
     AVPacket *sq_pkt;
 } Muxer;
 
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
 
 static MuxStream *ms_from_ost(OutputStream *ost)
 {
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index bc3d3b9902..2699485908 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -923,13 +923,6 @@  static int new_stream_audio(Muxer *mux, const OptionsContext *o,
     return 0;
 }
 
-static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
-                                 OutputStream *ost)
-{
-    ost->finished    = 1;
-    return 0;
-}
-
 static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
                                OutputStream *ost)
 {
@@ -1167,9 +1160,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (!ost->par_in)
         return AVERROR(ENOMEM);
 
-    ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
-    if (!ms->muxing_queue)
-        return AVERROR(ENOMEM);
     ms->last_mux_dts = AV_NOPTS_VALUE;
 
     ost->st         = st;
@@ -1437,7 +1427,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     case AVMEDIA_TYPE_VIDEO:      ret = new_stream_video     (mux, o, ost); break;
     case AVMEDIA_TYPE_AUDIO:      ret = new_stream_audio     (mux, o, ost); break;
     case AVMEDIA_TYPE_SUBTITLE:   ret = new_stream_subtitle  (mux, o, ost); break;
-    case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break;
     }
     if (ret < 0)
         return ret;
@@ -1911,7 +1900,6 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
         MuxStream     *ms = ms_from_ost(ost);
         enum AVMediaType type = ost->type;
 
-        ost->sq_idx_encode = -1;
         ost->sq_idx_mux    = -1;
 
         nb_interleaved += IS_INTERLEAVED(type);
@@ -1934,11 +1922,17 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
      * - at least one encoded audio/video stream is frame-limited, since
      *   that has similar semantics to 'shortest'
      * - at least one audio encoder requires constant frame sizes
+     *
+     * Note that encoding sync queues are handled in the scheduler, because
+     * different encoders run in different threads and need external
+     * synchronization, while muxer sync queues can be handled inside the muxer
      */
     if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
-        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
-        if (!of->sq_encode)
-            return AVERROR(ENOMEM);
+        int sq_idx, ret;
+
+        sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+        if (sq_idx < 0)
+            return sq_idx;
 
         for (int i = 0; i < oc->nb_streams; i++) {
             OutputStream *ost = of->streams[i];
@@ -1948,13 +1942,11 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
             if (!IS_AV_ENC(ost, type))
                 continue;
 
-            ost->sq_idx_encode = sq_add_stream(of->sq_encode,
-                                               of->shortest || ms->max_frames < INT64_MAX);
-            if (ost->sq_idx_encode < 0)
-                return ost->sq_idx_encode;
-
-            if (ms->max_frames != INT64_MAX)
-                sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+            ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
+                                 of->shortest || ms->max_frames < INT64_MAX,
+                                 ms->max_frames);
+            if (ret < 0)
+                return ret;
         }
     }
 
@@ -2707,8 +2699,6 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     av_strlcat(mux->log_name, "/",               sizeof(mux->log_name));
     av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
 
-    if (strcmp(oc->oformat->name, "rtp"))
-        want_sdp = 0;
 
     of->format = oc->oformat;
     if (recording_time != INT64_MAX)
@@ -2724,7 +2714,7 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                                            AVFMT_FLAG_BITEXACT);
     }
 
-    err = sch_add_mux(sch, muxer_thread, NULL, mux,
+    err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
                       !strcmp(oc->oformat->name, "rtp"));
     if (err < 0)
         return err;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index e1680ebe0e..92756d03e4 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@  const char *const opt_name_top_field_first[]                  = {"top", NULL};
 HWDevice *filter_hw_device;
 
 char *vstats_filename;
-char *sdp_filename;
 
 float audio_drift_threshold = 0.1;
 float dts_delta_threshold   = 10;
@@ -580,9 +579,8 @@  fail:
 
 static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
 {
-    av_free(sdp_filename);
-    sdp_filename = av_strdup(arg);
-    return 0;
+    Scheduler *sch = optctx;
+    return sch_sdp_filename(sch, arg);
 }
 
 #if CONFIG_VAAPI