diff mbox series

[FFmpeg-devel,13/13] fftools/ffmpeg: convert to a threaded architecture

Message ID 20231123191524.11296-15-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/13] lavfi/buffersink: avoid leaking peeked_frame on uninit | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Anton Khirnov Nov. 23, 2023, 7:15 p.m. UTC
Change the main loop and every component (demuxers, decoders, filters,
encoders, muxers) to use the previously added transcode scheduler. Every
instance of every such component was already running in a separate
thread, but now they can actually run in parallel.

Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
JEEB to be more correct and deterministic.
---
 fftools/ffmpeg.c                              | 374 +--------
 fftools/ffmpeg.h                              |  97 +--
 fftools/ffmpeg_dec.c                          | 321 ++------
 fftools/ffmpeg_demux.c                        | 268 ++++---
 fftools/ffmpeg_enc.c                          | 368 ++-------
 fftools/ffmpeg_filter.c                       | 720 +++++-------------
 fftools/ffmpeg_mux.c                          | 324 ++------
 fftools/ffmpeg_mux.h                          |  24 +-
 fftools/ffmpeg_mux_init.c                     |  88 +--
 fftools/ffmpeg_opt.c                          |   6 +-
 .../fate/ffmpeg-fix_sub_duration_heartbeat    |  36 +-
 11 files changed, 598 insertions(+), 2028 deletions(-)

Comments

Michael Niedermayer Nov. 24, 2023, 10:26 p.m. UTC | #1
On Thu, Nov 23, 2023 at 08:15:08PM +0100, Anton Khirnov wrote:
> Change the main loop and every component (demuxers, decoders, filters,
> encoders, muxers) to use the previously added transcode scheduler. Every
> instance of every such component was already running in a separate
> thread, but now they can actually run in parallel.
> 
> Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> JEEB to be more correct and deterministic.
> ---
>  fftools/ffmpeg.c                              | 374 +--------
>  fftools/ffmpeg.h                              |  97 +--
>  fftools/ffmpeg_dec.c                          | 321 ++------
>  fftools/ffmpeg_demux.c                        | 268 ++++---
>  fftools/ffmpeg_enc.c                          | 368 ++-------
>  fftools/ffmpeg_filter.c                       | 720 +++++-------------
>  fftools/ffmpeg_mux.c                          | 324 ++------
>  fftools/ffmpeg_mux.h                          |  24 +-
>  fftools/ffmpeg_mux_init.c                     |  88 +--
>  fftools/ffmpeg_opt.c                          |   6 +-
>  .../fate/ffmpeg-fix_sub_duration_heartbeat    |  36 +-
>  11 files changed, 598 insertions(+), 2028 deletions(-)

this (and many other things) infinite loops
./ffmpeg -f lavfi -i testsrc2 -bsf:v noise -bitexact -t 2 -y /tmp/y.y4m

thx

[...]
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index b8a97258a0..30b594fd97 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -117,7 +117,7 @@  typedef struct BenchmarkTimeStamps {
 static BenchmarkTimeStamps get_benchmark_time_stamps(void);
 static int64_t getmaxrss(void);
 
-unsigned nb_output_dumped = 0;
+atomic_uint nb_output_dumped = 0;
 
 static BenchmarkTimeStamps current_time;
 AVIOContext *progress_avio = NULL;
@@ -138,30 +138,6 @@  static struct termios oldtty;
 static int restore_tty;
 #endif
 
-/* sub2video hack:
-   Convert subtitles to video with alpha to insert them in filter graphs.
-   This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
-    /* When a frame is read from a file, examine all sub2video streams in
-       the same file and send the sub2video frame again. Otherwise, decoded
-       video frames could be accumulating in the filter graph while a filter
-       (possibly overlay) is desperately waiting for a subtitle frame. */
-    for (int i = 0; i < infile->nb_streams; i++) {
-        InputStream *ist = infile->streams[i];
-
-        if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        for (int j = 0; j < ist->nb_filters; j++)
-            ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
-    }
-}
-
-/* end of sub2video hack */
-
 static void term_exit_sigsafe(void)
 {
 #if HAVE_TERMIOS_H
@@ -499,23 +475,13 @@  void update_benchmark(const char *fmt, ...)
     }
 }
 
-void close_output_stream(OutputStream *ost)
-{
-    OutputFile *of = output_files[ost->file_index];
-    ost->finished |= ENCODER_FINISHED;
-
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-}
-
-static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
+static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
 {
     AVBPrint buf, buf_script;
     int64_t total_size = of_filesize(output_files[0]);
     int vid;
     double bitrate;
     double speed;
-    int64_t pts = AV_NOPTS_VALUE;
     static int64_t last_time = -1;
     static int first_report = 1;
     uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
@@ -533,7 +499,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             last_time = cur_time;
         }
         if (((cur_time - last_time) < stats_period && !first_report) ||
-            (first_report && nb_output_dumped < nb_output_files))
+            (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
             return;
         last_time = cur_time;
     }
@@ -544,7 +510,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
     av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
     av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
+        const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
 
         if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
             av_bprintf(&buf, "q=%2.1f ", q);
@@ -565,22 +531,18 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             if (is_last_report)
                 av_bprintf(&buf, "L");
 
-            nb_frames_dup  = ost->filter->nb_frames_dup;
-            nb_frames_drop = ost->filter->nb_frames_drop;
+            nb_frames_dup  = atomic_load(&ost->filter->nb_frames_dup);
+            nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
 
             vid = 1;
         }
-        /* compute min output value */
-        if (ost->last_mux_dts != AV_NOPTS_VALUE) {
-            if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
-                pts = ost->last_mux_dts;
-            if (copy_ts) {
-                if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
-                    copy_ts_first_pts = pts;
-                if (copy_ts_first_pts != AV_NOPTS_VALUE)
-                    pts -= copy_ts_first_pts;
-            }
-        }
+    }
+
+    if (copy_ts) {
+        if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
+            copy_ts_first_pts = pts;
+        if (copy_ts_first_pts != AV_NOPTS_VALUE)
+            pts -= copy_ts_first_pts;
     }
 
     us    = FFABS64U(pts) % AV_TIME_BASE;
@@ -783,81 +745,6 @@  int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
     return 0;
 }
 
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
-{
-    OutputFile *of = output_files[ost->file_index];
-    int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
-                                      AV_TIME_BASE_Q);
-
-    if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY))
-        // we are only interested in heartbeats on streams configured, and
-        // only on random access points.
-        return 0;
-
-    for (int i = 0; i < of->nb_streams; i++) {
-        OutputStream *iter_ost = of->streams[i];
-        InputStream  *ist      = iter_ost->ist;
-        int ret = AVERROR_BUG;
-
-        if (iter_ost == ost || !ist || !ist->decoding_needed ||
-            ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
-            // We wish to skip the stream that causes the heartbeat,
-            // output streams without an input stream, streams not decoded
-            // (as fix_sub_duration is only done for decoded subtitles) as
-            // well as non-subtitle streams.
-            continue;
-
-        if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
-            return ret;
-    }
-
-    return 0;
-}
-
-/* pkt = NULL means EOF (needed to flush decoder buffers) */
-static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    InputFile *f = input_files[ist->file_index];
-    int64_t dts_est = AV_NOPTS_VALUE;
-    int ret = 0;
-    int eof_reached = 0;
-
-    if (ist->decoding_needed) {
-        ret = dec_packet(ist, pkt, no_eof);
-        if (ret < 0 && ret != AVERROR_EOF)
-            return ret;
-    }
-    if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
-        eof_reached = 1;
-
-    if (pkt && pkt->opaque_ref) {
-        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
-        dts_est = pd->dts_est;
-    }
-
-    if (f->recording_time != INT64_MAX) {
-        int64_t start_time = 0;
-        if (copy_ts) {
-            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
-            start_time += start_at_zero ? 0 : f->start_time_effective;
-        }
-        if (dts_est >= f->recording_time + start_time)
-            pkt = NULL;
-    }
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (ost->enc || (!pkt && no_eof))
-            continue;
-
-        ret = of_streamcopy(ost, pkt, dts_est);
-        if (ret < 0)
-            return ret;
-    }
-
-    return !eof_reached;
-}
-
 static void print_stream_maps(void)
 {
     av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -934,43 +821,6 @@  static void print_stream_maps(void)
     }
 }
 
-/**
- * Select the output stream to process.
- *
- * @retval 0 an output stream was selected
- * @retval AVERROR(EAGAIN) need to wait until more input is available
- * @retval AVERROR_EOF no more streams need output
- */
-static int choose_output(OutputStream **post)
-{
-    int64_t opts_min = INT64_MAX;
-    OutputStream *ost_min = NULL;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        int64_t opts;
-
-        if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
-            opts = ost->filter->last_pts;
-        } else {
-            opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
-                   INT64_MIN : ost->last_mux_dts;
-        }
-
-        if (!ost->initialized && !ost->finished) {
-            ost_min = ost;
-            break;
-        }
-        if (!ost->finished && opts < opts_min) {
-            opts_min = opts;
-            ost_min  = ost;
-        }
-    }
-    if (!ost_min)
-        return AVERROR_EOF;
-    *post = ost_min;
-    return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
-}
-
 static void set_tty_echo(int on)
 {
 #if HAVE_TERMIOS_H
@@ -1042,149 +892,21 @@  static int check_keyboard_interaction(int64_t cur_time)
     return 0;
 }
 
-static void reset_eagain(void)
-{
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
-        ost->unavailable = 0;
-}
-
-static void decode_flush(InputFile *ifile)
-{
-    for (int i = 0; i < ifile->nb_streams; i++) {
-        InputStream *ist = ifile->streams[i];
-
-        if (ist->discard || !ist->decoding_needed)
-            continue;
-
-        dec_packet(ist, NULL, 1);
-    }
-}
-
-/*
- * Return
- * - 0 -- one packet was read and processed
- * - AVERROR(EAGAIN) -- no packets were available for selected file,
- *   this function should be called again
- * - AVERROR_EOF -- this function should not be called again
- */
-static int process_input(int file_index, AVPacket *pkt)
-{
-    InputFile *ifile = input_files[file_index];
-    InputStream *ist;
-    int ret, i;
-
-    ret = ifile_get_packet(ifile, pkt);
-
-    if (ret == 1) {
-        /* the input file is looped: flush the decoders */
-        decode_flush(ifile);
-        return AVERROR(EAGAIN);
-    }
-    if (ret < 0) {
-        if (ret != AVERROR_EOF) {
-            av_log(ifile, AV_LOG_ERROR,
-                   "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
-            if (exit_on_error)
-                return ret;
-        }
-
-        for (i = 0; i < ifile->nb_streams; i++) {
-            ist = ifile->streams[i];
-            if (!ist->discard) {
-                ret = process_input_packet(ist, NULL, 0);
-                if (ret>0)
-                    return 0;
-                else if (ret < 0)
-                    return ret;
-            }
-
-            /* mark all outputs that don't go through lavfi as finished */
-            for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-                OutputStream *ost = ist->outputs[oidx];
-                OutputFile    *of = output_files[ost->file_index];
-
-                ret = of_output_packet(of, ost, NULL);
-                if (ret < 0)
-                    return ret;
-            }
-        }
-
-        ifile->eof_reached = 1;
-        return AVERROR(EAGAIN);
-    }
-
-    reset_eagain();
-
-    ist = ifile->streams[pkt->stream_index];
-
-    sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
-    ret = process_input_packet(ist, pkt, 0);
-
-    av_packet_unref(pkt);
-
-    return ret < 0 ? ret : 0;
-}
-
-/**
- * Run a single step of transcoding.
- *
- * @return  0 for success, <0 for error
- */
-static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
-{
-    InputStream  *ist = NULL;
-    int ret;
-
-    if (ost->filter) {
-        if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
-            return ret;
-        if (!ist)
-            return 0;
-    } else {
-        ist = ost->ist;
-        av_assert0(ist);
-    }
-
-    ret = process_input(ist->file_index, demux_pkt);
-    if (ret == AVERROR(EAGAIN)) {
-        return 0;
-    }
-
-    if (ret < 0)
-        return ret == AVERROR_EOF ? 0 : ret;
-
-    // process_input() above might have caused output to become available
-    // in multiple filtergraphs, so we process all of them
-    for (int i = 0; i < nb_filtergraphs; i++) {
-        ret = reap_filters(filtergraphs[i], 0);
-        if (ret < 0)
-            return ret;
-    }
-
-    return 0;
-}
-
 /*
  * The following code is the main loop of the file converter
  */
-static int transcode(Scheduler *sch, int *err_rate_exceeded)
+static int transcode(Scheduler *sch)
 {
     int ret = 0, i;
-    InputStream *ist;
-    int64_t timer_start;
-    AVPacket *demux_pkt = NULL;
+    int64_t timer_start, transcode_ts = 0;
 
     print_stream_maps();
 
-    *err_rate_exceeded = 0;
     atomic_store(&transcode_init_done, 1);
 
-    demux_pkt = av_packet_alloc();
-    if (!demux_pkt) {
-        ret = AVERROR(ENOMEM);
-        goto fail;
-    }
+    ret = sch_start(sch);
+    if (ret < 0)
+        return ret;
 
     if (stdin_interaction) {
         av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
@@ -1192,8 +914,7 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
 
     timer_start = av_gettime_relative();
 
-    while (!received_sigterm) {
-        OutputStream *ost;
+    while (!sch_wait(sch, stats_period, &transcode_ts)) {
         int64_t cur_time= av_gettime_relative();
 
         /* if 'q' pressed, exits */
@@ -1201,49 +922,11 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
             if (check_keyboard_interaction(cur_time) < 0)
                 break;
 
-        ret = choose_output(&ost);
-        if (ret == AVERROR(EAGAIN)) {
-            reset_eagain();
-            av_usleep(10000);
-            ret = 0;
-            continue;
-        } else if (ret < 0) {
-            av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
-            ret = 0;
-            break;
-        }
-
-        ret = transcode_step(ost, demux_pkt);
-        if (ret < 0 && ret != AVERROR_EOF) {
-            av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
-            break;
-        }
-
         /* dump report by using the output first video and audio streams */
-        print_report(0, timer_start, cur_time);
+        print_report(0, timer_start, cur_time, transcode_ts);
     }
 
-    /* at the end of stream, we must flush the decoder buffers */
-    for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
-        float err_rate;
-
-        if (!input_files[ist->file_index]->eof_reached) {
-            int err = process_input_packet(ist, NULL, 0);
-            ret = err_merge(ret, err);
-        }
-
-        err_rate = (ist->frames_decoded || ist->decode_errors) ?
-                   ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
-        if (err_rate > max_error_rate) {
-            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
-                   err_rate, max_error_rate);
-            *err_rate_exceeded = 1;
-        } else if (err_rate)
-            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
-    }
-    ret = err_merge(ret, enc_flush());
-
-    term_exit();
+    ret = sch_stop(sch);
 
     /* write the trailer if needed */
     for (i = 0; i < nb_output_files; i++) {
@@ -1251,11 +934,10 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
         ret = err_merge(ret, err);
     }
 
-    /* dump report by using the first video and audio streams */
-    print_report(1, timer_start, av_gettime_relative());
+    term_exit();
 
-fail:
-    av_packet_free(&demux_pkt);
+    /* dump report by using the first video and audio streams */
+    print_report(1, timer_start, av_gettime_relative(), transcode_ts);
 
     return ret;
 }
@@ -1308,7 +990,7 @@  int main(int argc, char **argv)
 {
     Scheduler *sch = NULL;
 
-    int ret, err_rate_exceeded;
+    int ret;
     BenchmarkTimeStamps ti;
 
     init_dynload();
@@ -1350,7 +1032,7 @@  int main(int argc, char **argv)
     }
 
     current_time = ti = get_benchmark_time_stamps();
-    ret = transcode(sch, &err_rate_exceeded);
+    ret = transcode(sch);
     if (ret >= 0 && do_benchmark) {
         int64_t utime, stime, rtime;
         current_time = get_benchmark_time_stamps();
@@ -1362,8 +1044,8 @@  int main(int argc, char **argv)
                utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
     }
 
-    ret = received_nb_signals ? 255 :
-          err_rate_exceeded   ?  69 : ret;
+    ret = received_nb_signals                 ? 255 :
+          (ret == FFMPEG_ERROR_RATE_EXCEEDED) ?  69 : ret;
 
 finish:
     if (ret == AVERROR_EXIT)
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index a89038b765..ba82b7490d 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -61,6 +61,8 @@ 
 #define FFMPEG_OPT_TOP 1
 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
 
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
 enum VideoSyncMethod {
     VSYNC_AUTO = -1,
     VSYNC_PASSTHROUGH,
@@ -82,13 +84,16 @@  enum HWAccelID {
 };
 
 enum FrameOpaque {
-    FRAME_OPAQUE_REAP_FILTERS = 1,
-    FRAME_OPAQUE_CHOOSE_INPUT,
-    FRAME_OPAQUE_SUB_HEARTBEAT,
+    FRAME_OPAQUE_SUB_HEARTBEAT = 1,
     FRAME_OPAQUE_EOF,
     FRAME_OPAQUE_SEND_COMMAND,
 };
 
+enum PacketOpaque {
+    PKT_OPAQUE_SUB_HEARTBEAT = 1,
+    PKT_OPAQUE_FIX_SUB_DURATION,
+};
+
 typedef struct HWDevice {
     const char *name;
     enum AVHWDeviceType type;
@@ -309,11 +314,8 @@  typedef struct OutputFilter {
 
     enum AVMediaType     type;
 
-    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
-    int64_t last_pts;
-
-    uint64_t nb_frames_dup;
-    uint64_t nb_frames_drop;
+    atomic_uint_least64_t nb_frames_dup;
+    atomic_uint_least64_t nb_frames_drop;
 } OutputFilter;
 
 typedef struct FilterGraph {
@@ -426,11 +428,6 @@  typedef struct InputFile {
 
     float readrate;
     int accurate_seek;
-
-    /* when looping the input file, this queue is used by decoders to report
-     * the last frame timestamp back to the demuxer thread */
-    AVThreadMessageQueue *audio_ts_queue;
-    int                   audio_ts_queue_size;
 } InputFile;
 
 enum forced_keyframes_const {
@@ -532,8 +529,6 @@  typedef struct OutputStream {
     InputStream *ist;
 
     AVStream *st;            /* stream in the output file */
-    /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
-    int64_t last_mux_dts;
 
     AVRational enc_timebase;
 
@@ -578,13 +573,6 @@  typedef struct OutputStream {
     AVDictionary *sws_dict;
     AVDictionary *swr_opts;
     char *apad;
-    OSTFinished finished;        /* no more packets should be written for this stream */
-    int unavailable;                     /* true if the steram is unavailable (possibly temporarily) */
-
-    // init_output_stream() has been called for this stream
-    // The encoder and the bitstream filters have been initialized and the stream
-    // parameters are set in the AVStream.
-    int initialized;
 
     const char *attachment_filename;
 
@@ -598,9 +586,8 @@  typedef struct OutputStream {
     uint64_t samples_encoded;
 
     /* packet quality factor */
-    int quality;
+    atomic_int quality;
 
-    int sq_idx_encode;
     int sq_idx_mux;
 
     EncStats enc_stats_pre;
@@ -658,7 +645,6 @@  extern FilterGraph **filtergraphs;
 extern int        nb_filtergraphs;
 
 extern char *vstats_filename;
-extern char *sdp_filename;
 
 extern float dts_delta_threshold;
 extern float dts_error_threshold;
@@ -691,7 +677,7 @@  extern const AVIOInterruptCB int_cb;
 extern const OptionDef options[];
 extern HWDevice *filter_hw_device;
 
-extern unsigned nb_output_dumped;
+extern atomic_uint nb_output_dumped;
 
 extern int ignore_unknown_streams;
 extern int copy_unknown_streams;
@@ -737,10 +723,6 @@  FrameData *frame_data(AVFrame *frame);
 
 const FrameData *frame_data_c(AVFrame *frame);
 
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
 /**
  * Set up fallback filtering parameters from a decoder context. They will only
  * be used if no frames are ever sent on this input, otherwise the actual
@@ -761,26 +743,9 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
 
 void fg_free(FilterGraph **pfg);
 
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in]  graph     filter graph to consider
- * @param[out] best_ist  input stream where a frame would allow to continue
- * @return  0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters);
 
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return  0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
 int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
 
 void enc_stats_write(OutputStream *ost, EncStats *es,
@@ -807,25 +772,11 @@  int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
 int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
 void dec_free(Decoder **pdec);
 
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
 int enc_alloc(Encoder **penc, const AVCodec *codec,
               Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
 
-int enc_open(OutputStream *ost, const AVFrame *frame);
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub);
-int enc_frame(OutputStream *ost, AVFrame *frame);
-int enc_flush(void);
+int enc_open(void *opaque, const AVFrame *frame);
 
 /*
  * Initialize muxing state for the given stream, should be called
@@ -840,30 +791,11 @@  void of_free(OutputFile **pof);
 
 void of_enc_stats_close(void);
 
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
 int64_t of_filesize(OutputFile *of);
 
 int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
 void ifile_close(InputFile **f);
 
-/**
- * Get next input packet from the demuxer.
- *
- * @param pkt the packet is written here when this function returns 0
- * @return
- * - 0 when a packet has been read successfully
- * - 1 when stream end was reached, but the stream is looped;
- *     caller should flush decoders and read from this demuxer again
- * - a negative error code on failure
- */
-int ifile_get_packet(InputFile *f, AVPacket *pkt);
-
 int ist_output_add(InputStream *ist, OutputStream *ost);
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
 
@@ -880,9 +812,6 @@  InputStream *ist_iter(InputStream *prev);
  * pass NULL to start iteration */
 OutputStream *ost_iter(OutputStream *prev);
 
-void close_output_stream(OutputStream *ost);
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt);
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
 void update_benchmark(const char *fmt, ...);
 
 #define SPECIFIER_OPT_FMT_str  "%s"
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 90ea0d6d93..5dde82a276 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -54,24 +54,6 @@  struct Decoder {
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending coded packets from the main thread to
-     * the decoder thread.
-     *
-     * An empty packet is sent to flush the decoder without terminating
-     * decoding.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending decoded frames from the decoder thread
-     * to the main thread.
-     *
-     * An empty frame is sent to signal that a single packet has been fully
-     * processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@  typedef struct DecThreadContext {
     AVPacket        *pkt;
 } DecThreadContext;
 
-static int dec_thread_stop(Decoder *d)
-{
-    void *ret;
-
-    if (!d->queue_in)
-        return 0;
-
-    tq_send_finish(d->queue_in, 0);
-    tq_receive_finish(d->queue_out, 0);
-
-    pthread_join(d->thread, &ret);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-
-    return (intptr_t)ret;
-}
-
 void dec_free(Decoder **pdec)
 {
     Decoder *dec = *pdec;
@@ -105,8 +69,6 @@  void dec_free(Decoder **pdec)
     if (!dec)
         return;
 
-    dec_thread_stop(dec);
-
     av_frame_free(&dec->frame);
     av_packet_free(&dec->pkt);
 
@@ -148,25 +110,6 @@  fail:
     return AVERROR(ENOMEM);
 }
 
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
-    int i, ret = 0;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
-                                 i < ist->nb_filters - 1 ||
-                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
-        }
-    }
-    return ret;
-}
-
 static AVRational audio_samplerate_update(void *logctx, Decoder *d,
                                           const AVFrame *frame)
 {
@@ -421,28 +364,14 @@  static int process_subtitle(InputStream *ist, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    ret = send_frame_to_filters(ist, frame);
+    ret = sch_dec_send(d->sch, d->sch_idx, frame);
     if (ret < 0)
-        return ret;
+        av_frame_unref(frame);
 
-    subtitle = (AVSubtitle*)frame->buf[0]->data;
-    if (!subtitle->num_rects)
-        return 0;
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
-        if (ret < 0)
-            return ret;
-    }
-
-    return 0;
+    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
 }
 
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
+static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
 {
     Decoder *d = ist->decoder;
     int ret = AVERROR_BUG;
@@ -468,12 +397,24 @@  int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
 static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
                                AVFrame *frame)
 {
-    Decoder          *d = ist->decoder;
+    Decoder *d = ist->decoder;
     AVPacket *flush_pkt = NULL;
     AVSubtitle subtitle;
     int got_output;
     int ret;
 
+    if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+        frame->pts       = pkt->pts;
+        frame->time_base = pkt->time_base;
+        frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+    } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
+        return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
+                                                            AV_TIME_BASE_Q));
+    }
+
     if (!pkt) {
         flush_pkt = av_packet_alloc();
         if (!flush_pkt)
@@ -496,7 +437,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
 
     ist->frames_decoded++;
 
-    // XXX the queue for transferring data back to the main thread runs
+    // XXX the queue for transferring data to consumers runs
     // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
     // inside the frame
     // eventually, subtitles should be switched to use AVFrames natively
@@ -509,26 +450,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
     frame->width  = ist->dec_ctx->width;
     frame->height = ist->dec_ctx->height;
 
-    ret = tq_send(d->queue_out, 0, frame);
-    if (ret < 0)
-        av_frame_unref(frame);
-
-    return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    int i, ret;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
-                          d->last_frame_pts + d->last_frame_duration_est;
-        ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
-        if (ret < 0)
-            return ret;
-    }
-    return 0;
+    return process_subtitle(ist, frame);
 }
 
 static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -635,9 +557,11 @@  static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
 
         ist->frames_decoded++;
 
-        ret = tq_send(d->queue_out, 0, frame);
-        if (ret < 0)
-            return ret;
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+        }
     }
 }
 
@@ -679,7 +603,6 @@  fail:
 void *decoder_thread(void *arg)
 {
     InputStream *ist = arg;
-    InputFile *ifile = input_files[ist->file_index];
     Decoder       *d = ist->decoder;
     DecThreadContext dt;
     int ret = 0, input_status = 0;
@@ -691,19 +614,31 @@  void *decoder_thread(void *arg)
     dec_thread_set_name(ist);
 
     while (!input_status) {
-        int dummy, flush_buffers;
+        int flush_buffers, have_data;
 
-        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
-        flush_buffers = input_status >= 0 && !dt.pkt->buf;
-        if (!dt.pkt->buf)
+        input_status  = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+        have_data     = input_status >= 0 &&
+            (dt.pkt->buf || dt.pkt->side_data_elems ||
+             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
+             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
+        flush_buffers = input_status >= 0 && !have_data;
+        if (!have_data)
             av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
                    flush_buffers ? "flush" : "EOF");
 
-        ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+        ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
 
         av_packet_unref(dt.pkt);
         av_frame_unref(dt.frame);
 
+        // AVERROR_EOF  - EOF from the decoder
+        // AVERROR_EXIT - EOF from the scheduler
+        // we treat them differently when flushing
+        if (ret == AVERROR_EXIT) {
+            ret = AVERROR_EOF;
+            flush_buffers = 0;
+        }
+
         if (ret == AVERROR_EOF) {
             av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
                    flush_buffers ? "resetting" : "finishing");
@@ -711,11 +646,10 @@  void *decoder_thread(void *arg)
             if (!flush_buffers)
                 break;
 
-            /* report last frame duration to the demuxer thread */
+            /* report last frame duration to the scheduler */
             if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
-                Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
-                                 .tb = d->last_frame_tb };
-                av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+                dt.pkt->pts       = d->last_frame_pts + d->last_frame_duration_est;
+                dt.pkt->time_base = d->last_frame_tb;
             }
 
             avcodec_flush_buffers(ist->dec_ctx);
@@ -724,149 +658,47 @@  void *decoder_thread(void *arg)
                    av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the entire packet was processed
-        ret = tq_send(d->queue_out, 0, dt.frame);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
+    // on success send EOF timestamp to our downstreams
+    if (ret >= 0) {
+        float err_rate;
+
+        av_frame_unref(dt.frame);
+
+        dt.frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+        dt.frame->pts       = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+                              d->last_frame_pts + d->last_frame_duration_est;
+        dt.frame->time_base = d->last_frame_tb;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+        if (ret < 0 && ret != AVERROR_EOF) {
+            av_log(NULL, AV_LOG_FATAL,
+                   "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+            goto finish;
+        }
+        ret = 0;
+
+        err_rate = (ist->frames_decoded || ist->decode_errors) ?
+                   ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+        if (err_rate > max_error_rate) {
+            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+                   err_rate, max_error_rate);
+            ret = FFMPEG_ERROR_RATE_EXCEEDED;
+        } else if (err_rate)
+            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+    }
+
 finish:
-    tq_receive_finish(d->queue_in,  0);
-    tq_send_finish   (d->queue_out, 0);
-
-    // make sure the demuxer does not get stuck waiting for audio durations
-    // that will never arrive
-    if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
-        av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
     dec_thread_uninit(&dt);
 
-    av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    Decoder *d = ist->decoder;
-    int ret = 0, thread_ret;
-
-    // thread already joined
-    if (!d->queue_in)
-        return AVERROR_EOF;
-
-    // send the packet/flush request/EOF to the decoder thread
-    if (pkt || no_eof) {
-        av_packet_unref(d->pkt);
-
-        if (pkt) {
-            ret = av_packet_ref(d->pkt, pkt);
-            if (ret < 0)
-                goto finish;
-        }
-
-        ret = tq_send(d->queue_in, 0, d->pkt);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(d->queue_in, 0);
-
-    // retrieve all decoded data for the packet
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(d->queue_out, &dummy, d->frame);
-        if (ret < 0)
-            goto finish;
-
-        // packet fully processed
-        if (!d->frame->buf[0])
-            return 0;
-
-        // process the decoded frame
-        if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
-            ret = process_subtitle(ist, d->frame);
-        } else {
-            ret = send_frame_to_filters(ist, d->frame);
-        }
-        av_frame_unref(d->frame);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = dec_thread_stop(d);
-    if (thread_ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-    // non-EOF errors here are all fatal
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to our downstreams
-    ret = send_filter_eof(ist);
-    if (ret < 0) {
-        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
-        return ret;
-    }
-
-    return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->queue_in = tq_alloc(1, 1, op, pkt_move);
-    if (!d->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    d->queue_out = tq_alloc(1, 4, op, frame_move);
-    if (!d->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-    return ret;
-}
-
 static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
 {
     InputStream *ist = s->opaque;
@@ -1118,12 +950,5 @@  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
     if (ret < 0)
         return ret;
 
-    ret = dec_thread_start(ist);
-    if (ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     return 0;
 }
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 2234dbe076..91cd7a1125 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -22,8 +22,6 @@ 
 #include "ffmpeg.h"
 #include "ffmpeg_sched.h"
 #include "ffmpeg_utils.h"
-#include "objpool.h"
-#include "thread_queue.h"
 
 #include "libavutil/avassert.h"
 #include "libavutil/avstring.h"
@@ -35,7 +33,6 @@ 
 #include "libavutil/pixdesc.h"
 #include "libavutil/time.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -66,7 +63,11 @@  typedef struct DemuxStream {
 
     double ts_scale;
 
+    // scheduler returned EOF for this stream
+    int finished;
+
     int streamcopy_needed;
+    int have_sub2video;
 
     int wrap_correction_done;
     int saw_first_ts;
@@ -101,6 +102,7 @@  typedef struct Demuxer {
 
     /* number of times input stream should be looped */
     int loop;
+    int have_audio_dec;
     /* duration of the looped segment of the input file */
     Timestamp duration;
     /* pts with the smallest/largest values ever seen */
@@ -113,11 +115,12 @@  typedef struct Demuxer {
     double readrate_initial_burst;
 
     Scheduler            *sch;
-    ThreadQueue          *thread_queue;
-    int                   thread_queue_size;
-    pthread_t             thread;
+
+    AVPacket             *pkt_heartbeat;
 
     int                   read_started;
+    int                   nb_streams_used;
+    int                   nb_streams_finished;
 } Demuxer;
 
 static DemuxStream *ds_from_ist(InputStream *ist)
@@ -153,7 +156,7 @@  static void report_new_stream(Demuxer *d, const AVPacket *pkt)
     d->nb_streams_warn = pkt->stream_index + 1;
 }
 
-static int seek_to_start(Demuxer *d)
+static int seek_to_start(Demuxer *d, Timestamp end_pts)
 {
     InputFile    *ifile = &d->f;
     AVFormatContext *is = ifile->ctx;
@@ -163,21 +166,10 @@  static int seek_to_start(Demuxer *d)
     if (ret < 0)
         return ret;
 
-    if (ifile->audio_ts_queue_size) {
-        int got_ts = 0;
-
-        while (got_ts < ifile->audio_ts_queue_size) {
-            Timestamp ts;
-            ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0);
-            if (ret < 0)
-                return ret;
-            got_ts++;
-
-            if (d->max_pts.ts == AV_NOPTS_VALUE ||
-                av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0)
-                d->max_pts = ts;
-        }
-    }
+    if (end_pts.ts != AV_NOPTS_VALUE &&
+        (d->max_pts.ts == AV_NOPTS_VALUE ||
+         av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0))
+        d->max_pts = end_pts;
 
     if (d->max_pts.ts != AV_NOPTS_VALUE) {
         int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : d->min_pts.ts;
@@ -404,7 +396,7 @@  static int ts_fixup(Demuxer *d, AVPacket *pkt)
     duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base);
     if (pkt->pts != AV_NOPTS_VALUE) {
         // audio decoders take precedence for estimating total file duration
-        int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration;
+        int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
 
         pkt->pts += duration;
 
@@ -440,7 +432,7 @@  static int ts_fixup(Demuxer *d, AVPacket *pkt)
     return 0;
 }
 
-static int input_packet_process(Demuxer *d, AVPacket *pkt)
+static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags)
 {
     InputFile     *f = &d->f;
     InputStream *ist = f->streams[pkt->stream_index];
@@ -451,6 +443,16 @@  static int input_packet_process(Demuxer *d, AVPacket *pkt)
     if (ret < 0)
         return ret;
 
+    if (f->recording_time != INT64_MAX) {
+        int64_t start_time = 0;
+        if (copy_ts) {
+            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+            start_time += start_at_zero ? 0 : f->start_time_effective;
+        }
+        if (ds->dts >= f->recording_time + start_time)
+            *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
+    }
+
     ds->data_size += pkt->size;
     ds->nb_packets++;
 
@@ -465,6 +467,8 @@  static int input_packet_process(Demuxer *d, AVPacket *pkt)
                av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
     }
 
+    pkt->stream_index = ds->sch_idx_stream;
+
     return 0;
 }
 
@@ -488,6 +492,65 @@  static void readrate_sleep(Demuxer *d)
     }
 }
 
+static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags,
+                   const char *pkt_desc)
+{
+    int ret;
+
+    ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
+    if (ret == AVERROR_EOF) {
+        av_packet_unref(pkt);
+
+        av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n");
+        ds->finished = 1;
+
+        if (++d->nb_streams_finished == d->nb_streams_used) {
+            av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
+            return AVERROR_EOF;
+        }
+    } else if (ret < 0) {
+        if (ret != AVERROR_EXIT)
+            av_log(d, AV_LOG_ERROR,
+                   "Unable to send %s packet to consumers: %s\n",
+                   pkt_desc, av_err2str(ret));
+        return ret;
+    }
+
+    return 0;
+}
+
+static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags)
+{
+    InputFile  *f = &d->f;
+    int ret;
+
+    // send heartbeat for sub2video streams
+    if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
+        for (int i = 0; i < f->nb_streams; i++) {
+            DemuxStream *ds1 = ds_from_ist(f->streams[i]);
+
+            if (ds1->finished || !ds1->have_sub2video)
+                continue;
+
+            d->pkt_heartbeat->pts          = pkt->pts;
+            d->pkt_heartbeat->time_base    = pkt->time_base;
+            d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
+            d->pkt_heartbeat->opaque       = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
+
+            ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
+            if (ret < 0)
+                return ret;
+        }
+    }
+
+    ret = do_send(d, ds, pkt, flags, "demuxed");
+    if (ret < 0)
+        return ret;
+
+
+    return 0;
+}
+
 static void discard_unused_programs(InputFile *ifile)
 {
     for (int j = 0; j < ifile->ctx->nb_programs; j++) {
@@ -527,9 +590,13 @@  static void *input_thread(void *arg)
 
     discard_unused_programs(f);
 
+    d->read_started    = 1;
     d->wallclock_start = av_gettime_relative();
 
     while (1) {
+        DemuxStream *ds;
+        unsigned send_flags = 0;
+
         ret = av_read_frame(f->ctx, pkt);
 
         if (ret == AVERROR(EAGAIN)) {
@@ -538,11 +605,13 @@  static void *input_thread(void *arg)
         }
         if (ret < 0) {
             if (d->loop) {
-                /* signal looping to the consumer thread */
+                /* signal looping to our consumers */
                 pkt->stream_index = -1;
-                ret = tq_send(d->thread_queue, 0, pkt);
+
+                ret = sch_demux_send(d->sch, f->index, pkt, 0);
                 if (ret >= 0)
-                    ret = seek_to_start(d);
+                    ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
+                                                        .tb = pkt->time_base });
                 if (ret >= 0)
                     continue;
 
@@ -551,9 +620,11 @@  static void *input_thread(void *arg)
 
             if (ret == AVERROR_EOF)
                 av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
-            else
+            else {
                 av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
                        av_err2str(ret));
+                ret = exit_on_error ? ret : 0;
+            }
 
             break;
         }
@@ -565,8 +636,9 @@  static void *input_thread(void *arg)
 
         /* the following test is needed in case new streams appear
            dynamically in stream : we ignore them */
-        if (pkt->stream_index >= f->nb_streams ||
-            f->streams[pkt->stream_index]->discard) {
+        ds = pkt->stream_index < f->nb_streams ?
+             ds_from_ist(f->streams[pkt->stream_index]) : NULL;
+        if (!ds || ds->ist.discard || ds->finished) {
             report_new_stream(d, pkt);
             av_packet_unref(pkt);
             continue;
@@ -583,122 +655,26 @@  static void *input_thread(void *arg)
             }
         }
 
-        ret = input_packet_process(d, pkt);
+        ret = input_packet_process(d, pkt, &send_flags);
         if (ret < 0)
             break;
 
         if (f->readrate)
             readrate_sleep(d);
 
-        ret = tq_send(d->thread_queue, 0, pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(f, AV_LOG_ERROR,
-                       "Unable to send packet to main thread: %s\n",
-                       av_err2str(ret));
+        ret = demux_send(d, ds, pkt, send_flags);
+        if (ret < 0)
             break;
-        }
     }
 
+    // EOF/EXIT is normal termination
+    if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
+        ret = 0;
+
 finish:
-    av_assert0(ret < 0);
-    tq_send_finish(d->thread_queue, 0);
-
     av_packet_free(&pkt);
 
-    av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
-
-    return NULL;
-}
-
-static void thread_stop(Demuxer *d)
-{
-    InputFile *f = &d->f;
-
-    if (!d->thread_queue)
-        return;
-
-    tq_receive_finish(d->thread_queue, 0);
-
-    pthread_join(d->thread, NULL);
-
-    tq_free(&d->thread_queue);
-
-    av_thread_message_queue_free(&f->audio_ts_queue);
-}
-
-static int thread_start(Demuxer *d)
-{
-    int ret;
-    InputFile *f = &d->f;
-    ObjPool *op;
-
-    if (d->thread_queue_size <= 0)
-        d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
-    if (!d->thread_queue) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    if (d->loop) {
-        int nb_audio_dec = 0;
-
-        for (int i = 0; i < f->nb_streams; i++) {
-            InputStream *ist = f->streams[i];
-            nb_audio_dec += !!(ist->decoding_needed &&
-                               ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO);
-        }
-
-        if (nb_audio_dec) {
-            ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
-                                                nb_audio_dec, sizeof(Timestamp));
-            if (ret < 0)
-                goto fail;
-            f->audio_ts_queue_size = nb_audio_dec;
-        }
-    }
-
-    if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
-        av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
-        ret = AVERROR(ret);
-        goto fail;
-    }
-
-    d->read_started = 1;
-
-    return 0;
-fail:
-    tq_free(&d->thread_queue);
-    return ret;
-}
-
-int ifile_get_packet(InputFile *f, AVPacket *pkt)
-{
-    Demuxer *d = demuxer_from_ifile(f);
-    int ret, dummy;
-
-    if (!d->thread_queue) {
-        ret = thread_start(d);
-        if (ret < 0)
-            return ret;
-    }
-
-    ret = tq_receive(d->thread_queue, &dummy, pkt);
-    if (ret < 0)
-        return ret;
-
-    if (pkt->stream_index == -1) {
-        av_assert0(!pkt->data && !pkt->side_data_elems);
-        return 1;
-    }
-
-    return 0;
+    return (void*)(intptr_t)ret;
 }
 
 static void demux_final_stats(Demuxer *d)
@@ -769,8 +745,6 @@  void ifile_close(InputFile **pf)
     if (!f)
         return;
 
-    thread_stop(d);
-
     if (d->read_started)
         demux_final_stats(d);
 
@@ -780,6 +754,8 @@  void ifile_close(InputFile **pf)
 
     avformat_close_input(&f->ctx);
 
+    av_packet_free(&d->pkt_heartbeat);
+
     av_freep(pf);
 }
 
@@ -802,7 +778,11 @@  static int ist_use(InputStream *ist, int decoding_needed)
         ds->sch_idx_stream = ret;
     }
 
-    ist->discard          = 0;
+    if (ist->discard) {
+        ist->discard = 0;
+        d->nb_streams_used++;
+    }
+
     ist->st->discard      = ist->user_set_discard;
     ist->decoding_needed |= decoding_needed;
     ds->streamcopy_needed |= !decoding_needed;
@@ -823,6 +803,8 @@  static int ist_use(InputStream *ist, int decoding_needed)
         ret = dec_open(ist, d->sch, ds->sch_idx_dec);
         if (ret < 0)
             return ret;
+
+        d->have_audio_dec |= is_audio;
     }
 
     return 0;
@@ -848,6 +830,7 @@  int ist_output_add(InputStream *ist, OutputStream *ost)
 
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
 {
+    Demuxer      *d = demuxer_from_ifile(input_files[ist->file_index]);
     DemuxStream *ds = ds_from_ist(ist);
     int ret;
 
@@ -866,6 +849,15 @@  int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
     if (ret < 0)
         return ret;
 
+    if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
+        if (!d->pkt_heartbeat) {
+            d->pkt_heartbeat = av_packet_alloc();
+            if (!d->pkt_heartbeat)
+                return AVERROR(ENOMEM);
+        }
+        ds->have_sub2video = 1;
+    }
+
     return ds->sch_idx_dec;
 }
 
@@ -1607,8 +1599,6 @@  int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                "since neither -readrate nor -re were given\n");
     }
 
-    d->thread_queue_size = o->thread_queue_size;
-
     /* Add all the streams from the given input file to the demuxer */
     for (int i = 0; i < ic->nb_streams; i++) {
         ret = ist_add(o, d, ic->streams[i]);
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index 9871381c0e..9383b167f7 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -41,12 +41,6 @@ 
 #include "libavformat/avformat.h"
 
 struct Encoder {
-    AVFrame *sq_frame;
-
-    // packet for receiving encoded output
-    AVPacket *pkt;
-    AVFrame  *sub_frame;
-
     // combined size of all the packets received from the encoder
     uint64_t data_size;
 
@@ -54,25 +48,9 @@  struct Encoder {
     uint64_t packets_encoded;
 
     int opened;
-    int finished;
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending frames from the main thread to
-     * the encoder thread.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending encoded packets from the encoder thread
-     * to the main thread.
-     *
-     * An empty packet is sent to signal that a previously sent
-     * frame has been fully processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -81,24 +59,6 @@  typedef struct EncoderThread {
     AVPacket  *pkt;
 } EncoderThread;
 
-static int enc_thread_stop(Encoder *e)
-{
-    void *ret;
-
-    if (!e->queue_in)
-        return 0;
-
-    tq_send_finish(e->queue_in, 0);
-    tq_receive_finish(e->queue_out, 0);
-
-    pthread_join(e->thread, &ret);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void enc_free(Encoder **penc)
 {
     Encoder *enc = *penc;
@@ -106,13 +66,6 @@  void enc_free(Encoder **penc)
     if (!enc)
         return;
 
-    enc_thread_stop(enc);
-
-    av_frame_free(&enc->sq_frame);
-    av_frame_free(&enc->sub_frame);
-
-    av_packet_free(&enc->pkt);
-
     av_freep(penc);
 }
 
@@ -127,25 +80,12 @@  int enc_alloc(Encoder **penc, const AVCodec *codec,
     if (!enc)
         return AVERROR(ENOMEM);
 
-    if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
-        enc->sub_frame = av_frame_alloc();
-        if (!enc->sub_frame)
-            goto fail;
-    }
-
-    enc->pkt = av_packet_alloc();
-    if (!enc->pkt)
-        goto fail;
-
     enc->sch     = sch;
     enc->sch_idx = sch_idx;
 
     *penc = enc;
 
     return 0;
-fail:
-    enc_free(&enc);
-    return AVERROR(ENOMEM);
 }
 
 static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref)
@@ -224,52 +164,9 @@  static int set_encoder_id(OutputFile *of, OutputStream *ost)
     return 0;
 }
 
-static int enc_thread_start(OutputStream *ost)
-{
-    Encoder *e = ost->enc;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    e->queue_in = tq_alloc(1, 1, op, frame_move);
-    if (!e->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_packets();
-    if (!op)
-        goto fail;
-
-    e->queue_out = tq_alloc(1, 4, op, pkt_move);
-    if (!e->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-    return ret;
-}
-
-int enc_open(OutputStream *ost, const AVFrame *frame)
+int enc_open(void *opaque, const AVFrame *frame)
 {
+    OutputStream *ost = opaque;
     InputStream *ist = ost->ist;
     Encoder              *e = ost->enc;
     AVCodecContext *enc_ctx = ost->enc_ctx;
@@ -277,6 +174,7 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     const AVCodec      *enc = enc_ctx->codec;
     OutputFile      *of = output_files[ost->file_index];
     FrameData *fd;
+    int frame_samples = 0;
     int ret;
 
     if (e->opened)
@@ -420,17 +318,8 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
 
     e->opened = 1;
 
-    if (ost->sq_idx_encode >= 0) {
-        e->sq_frame = av_frame_alloc();
-        if (!e->sq_frame)
-            return AVERROR(ENOMEM);
-    }
-
-    if (ost->enc_ctx->frame_size) {
-        av_assert0(ost->sq_idx_encode >= 0);
-        sq_frame_samples(output_files[ost->file_index]->sq_encode,
-                         ost->sq_idx_encode, ost->enc_ctx->frame_size);
-    }
+    if (ost->enc_ctx->frame_size)
+        frame_samples = ost->enc_ctx->frame_size;
 
     ret = check_avoptions(ost->encoder_opts);
     if (ret < 0)
@@ -476,18 +365,11 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
         ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
 
-    ret = enc_thread_start(ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     ret = of_stream_init(of, ost);
     if (ret < 0)
         return ret;
 
-    return 0;
+    return frame_samples;
 }
 
 static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
@@ -514,8 +396,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
         return exit_on_error ? AVERROR(EINVAL) : 0;
     }
-    if (ost->finished ||
-        (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
+    if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
         return 0;
 
     enc = ost->enc_ctx;
@@ -579,7 +460,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         }
         pkt->dts = pkt->pts;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -671,10 +552,13 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     int64_t frame_number;
     double ti1, bitrate, avg_bitrate;
     double psnr_val = -1;
+    int quality;
 
-    ost->quality   = sd ? AV_RL32(sd) : -1;
+    quality        = sd ? AV_RL32(sd) : -1;
     pict_type      = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
 
+    atomic_store(&ost->quality, quality);
+
     if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
         // FIXME the scaling assumes 8bit
         double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0);
@@ -697,10 +581,10 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     frame_number = e->packets_encoded;
     if (vstats_version <= 1) {
         fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     } else  {
         fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     }
 
     if (psnr_val >= 0)
@@ -801,18 +685,11 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
                    av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
         }
 
-        if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Subtitle heartbeat logic failed in %s! (%s)\n",
-                   __func__, av_err2str(ret));
-            return ret;
-        }
-
         e->data_size += pkt->size;
 
         e->packets_encoded++;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -822,50 +699,6 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
     av_assert0(0);
 }
 
-static int submit_encode_frame(OutputFile *of, OutputStream *ost,
-                               AVFrame *frame, AVPacket *pkt)
-{
-    Encoder *e = ost->enc;
-    int ret;
-
-    if (ost->sq_idx_encode < 0)
-        return encode_frame(of, ost, frame, pkt);
-
-    if (frame) {
-        ret = av_frame_ref(e->sq_frame, frame);
-        if (ret < 0)
-            return ret;
-        frame = e->sq_frame;
-    }
-
-    ret = sq_send(of->sq_encode, ost->sq_idx_encode,
-                  SQFRAME(frame));
-    if (ret < 0) {
-        if (frame)
-            av_frame_unref(frame);
-        if (ret != AVERROR_EOF)
-            return ret;
-    }
-
-    while (1) {
-        AVFrame *enc_frame = e->sq_frame;
-
-        ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
-                               SQFRAME(enc_frame));
-        if (ret == AVERROR_EOF) {
-            enc_frame = NULL;
-        } else if (ret < 0) {
-            return (ret == AVERROR(EAGAIN)) ? 0 : ret;
-        }
-
-        ret = encode_frame(of, ost, enc_frame, pkt);
-        if (enc_frame)
-            av_frame_unref(enc_frame);
-        if (ret < 0)
-            return ret;
-    }
-}
-
 static int do_audio_out(OutputFile *of, OutputStream *ost,
                         AVFrame *frame, AVPacket *pkt)
 {
@@ -881,7 +714,7 @@  static int do_audio_out(OutputFile *of, OutputStream *ost,
     if (!check_recording_time(ost, frame->pts, frame->time_base))
         return AVERROR_EOF;
 
-    return submit_encode_frame(of, ost, frame, pkt);
+    return encode_frame(of, ost, frame, pkt);
 }
 
 static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -949,7 +782,7 @@  static int do_video_out(OutputFile *of, OutputStream *ost,
     }
 #endif
 
-    return submit_encode_frame(of, ost, in_picture, pkt);
+    return encode_frame(of, ost, in_picture, pkt);
 }
 
 static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
@@ -958,9 +791,12 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
     enum AVMediaType type = ost->type;
 
     if (type == AVMEDIA_TYPE_SUBTITLE) {
+        const AVSubtitle *subtitle = frame && frame->buf[0] ?
+                                     (AVSubtitle*)frame->buf[0]->data : NULL;
+
         // no flushing for subtitles
-        return frame ?
-               do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+        return subtitle && subtitle->num_rects ?
+               do_subtitle_out(of, ost, subtitle, pkt) : 0;
     }
 
     if (frame) {
@@ -968,7 +804,7 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
                                               do_audio_out(of, ost, frame, pkt);
     }
 
-    return submit_encode_frame(of, ost, NULL, pkt);
+    return  encode_frame(of, ost, NULL, pkt);
 }
 
 static void enc_thread_set_name(const OutputStream *ost)
@@ -1009,24 +845,50 @@  fail:
 void *encoder_thread(void *arg)
 {
     OutputStream *ost = arg;
-    OutputFile    *of = output_files[ost->file_index];
     Encoder        *e = ost->enc;
     EncoderThread et;
     int ret = 0, input_status = 0;
+    int name_set = 0;
 
     ret = enc_thread_init(&et);
     if (ret < 0)
         goto finish;
 
-    enc_thread_set_name(ost);
+    /* Open the subtitle encoders immediately. AVFrame-based encoders
+     * are opened through a callback from the scheduler once they get
+     * their first frame
+     *
+     * N.B.: because the callback is called from a different thread,
+     * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
+     * for the first time for audio/video. */
+    if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
+        ret = enc_open(ost, NULL);
+        if (ret < 0)
+            goto finish;
+    }
 
     while (!input_status) {
-        int dummy;
-
-        input_status = tq_receive(e->queue_in, &dummy, et.frame);
-        if (input_status < 0)
+        input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
 
+            if (!e->opened) {
+                av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
+                ret = AVERROR(EINVAL);
+                goto finish;
+            }
+        } else if (input_status < 0) {
+            ret = input_status;
+            av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
+                   av_err2str(ret));
+            goto finish;
+        }
+
+        if (!name_set) {
+            enc_thread_set_name(ost);
+            name_set = 1;
+        }
+
         ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
 
         av_packet_unref(et.pkt);
@@ -1040,15 +902,6 @@  void *encoder_thread(void *arg)
                        av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the frame was encoded
-        ret = tq_send(e->queue_out, 0, et.pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ost, AV_LOG_ERROR,
-                       "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
@@ -1056,118 +909,7 @@  void *encoder_thread(void *arg)
         ret = 0;
 
 finish:
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-
-    tq_receive_finish(e->queue_in,  0);
-    tq_send_finish   (e->queue_out, 0);
-
     enc_thread_uninit(&et);
 
-    av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
-
-int enc_frame(OutputStream *ost, AVFrame *frame)
-{
-    OutputFile *of = output_files[ost->file_index];
-    Encoder     *e = ost->enc;
-    int ret, thread_ret;
-
-    ret = enc_open(ost, frame);
-    if (ret < 0)
-        return ret;
-
-    if (!e->queue_in)
-        return AVERROR_EOF;
-
-    // send the frame/EOF to the encoder thread
-    if (frame) {
-        ret = tq_send(e->queue_in, 0, frame);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(e->queue_in, 0);
-
-    // retrieve all encoded data for the frame
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(e->queue_out, &dummy, e->pkt);
-        if (ret < 0)
-            break;
-
-        // frame fully encoded
-        if (!e->pkt->data && !e->pkt->side_data_elems)
-            return 0;
-
-        // process the encoded packet
-        ret = of_output_packet(of, ost, e->pkt);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = enc_thread_stop(e);
-    if (thread_ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to the muxer
-    return of_output_packet(of, ost, NULL);
-}
-
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
-{
-    Encoder *e = ost->enc;
-    AVFrame *f = e->sub_frame;
-    int ret;
-
-    // XXX the queue for transferring data to the encoder thread runs
-    // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
-    // that inside the frame
-    // eventually, subtitles should be switched to use AVFrames natively
-    ret = subtitle_wrap_frame(f, sub, 1);
-    if (ret < 0)
-        return ret;
-
-    ret = enc_frame(ost, f);
-    av_frame_unref(f);
-
-    return ret;
-}
-
-int enc_flush(void)
-{
-    int ret = 0;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        OutputFile      *of = output_files[ost->file_index];
-        if (ost->sq_idx_encode >= 0)
-            sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-    }
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        Encoder          *e = ost->enc;
-        AVCodecContext *enc = ost->enc_ctx;
-        int err;
-
-        if (!enc || !e->opened ||
-            (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
-            continue;
-
-        err = enc_frame(ost, NULL);
-        if (err != AVERROR_EOF && ret < 0)
-            ret = err_merge(ret, err);
-
-        av_assert0(!e->queue_in);
-    }
-
-    return ret;
-}
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 1b41d32540..fe8787cacb 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,8 +21,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
 
 #include "libavfilter/avfilter.h"
 #include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@  typedef struct FilterGraphPriv {
     // true when the filtergraph contains only meta filters
     // that do not modify the frame data
     int is_meta;
+    // source filters are present in the graph
+    int have_sources;
     int disable_conversions;
 
-    int nb_inputs_bound;
-    int nb_outputs_bound;
+    unsigned nb_outputs_done;
 
     const char *graph_desc;
 
@@ -67,41 +66,6 @@  typedef struct FilterGraphPriv {
 
     Scheduler       *sch;
     unsigned         sch_idx;
-
-    pthread_t        thread;
-    /**
-     * Queue for sending frames from the main thread to the filtergraph. Has
-     * nb_inputs+1 streams - the first nb_inputs stream correspond to
-     * filtergraph inputs. Frames on those streams may have their opaque set to
-     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
-     *   EOF event for the correspondint stream. Will be immediately followed by
-     *   this stream being send-closed.
-     * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
-     *   a subtitle heartbeat event. Will only be sent for sub2video streams.
-     *
-     * The last stream is "control" - the main thread sends empty AVFrames with
-     * opaque set to
-     * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
-     *   from filtergraph outputs. These frames are sent to corresponding
-     *   streams in queue_out. Finally an empty frame is sent to the control
-     *   stream in queue_out.
-     * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
-     *   available the terminating empty frame's opaque will contain the index+1
-     *   of the filtergraph input to which more input frames should be supplied.
-     */
-    ThreadQueue     *queue_in;
-    /**
-     * Queue for sending frames from the filtergraph back to the main thread.
-     * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
-     * filtergraph outputs.
-     *
-     * The last stream is "control" - see documentation for queue_in for more
-     * details.
-     */
-    ThreadQueue     *queue_out;
-    // submitting frames to filter thread returned EOF
-    // this only happens on thread exit, so is not per-input
-    int              eof_in;
 } FilterGraphPriv;
 
 static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@  typedef struct FilterGraphThread {
     // The output index is stored in frame opaque.
     AVFifo  *frame_queue_out;
 
+    // index of the next input to request from the scheduler
+    unsigned next_in;
+    // set to 1 after at least one frame passed through this output
     int      got_frame;
 
     // EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@  typedef struct OutputFilterPriv {
     int64_t ts_offset;
     int64_t next_pts;
     FPSConvContext fps;
-
-    // set to 1 after at least one frame passed through this output
-    int got_frame;
 } OutputFilterPriv;
 
 static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@  static int ifilter_has_all_input_formats(FilterGraph *fg)
 
 static void *filter_thread(void *arg);
 
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
-    FilterGraph *fg = &fgp->fg;
-    ObjPool *op;
-    int ret = 0;
-
-    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
-        fgp->nb_outputs_bound < fg->nb_outputs)
-        return 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
-    if (!fgp->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    // at least one output is mandatory
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
-    if (!fgp->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
-               fg->index, av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return ret;
-}
-
 static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
 {
     AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@  static OutputFilter *ofilter_alloc(FilterGraph *fg)
     ofilter->graph    = fg;
     ofp->format       = -1;
     ofp->index        = fg->nb_outputs - 1;
-    ofilter->last_pts = AV_NOPTS_VALUE;
 
     return ofilter;
 }
@@ -760,10 +672,7 @@  static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
             return AVERROR(ENOMEM);
     }
 
-    fgp->nb_inputs_bound++;
-    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@  int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
     if (ret < 0)
         return ret;
 
-    fgp->nb_outputs_bound++;
-    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@  static InputFilter *ifilter_alloc(FilterGraph *fg)
     return ifilter;
 }
 
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
-    void *ret;
-
-    if (!fgp->queue_in)
-        return 0;
-
-    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
-        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
-                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
-        if (ifp)
-            ifp->eof = 1;
-
-        tq_send_finish(fgp->queue_in, i);
-    }
-
-    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
-        tq_receive_finish(fgp->queue_out, i);
-
-    pthread_join(fgp->thread, &ret);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void fg_free(FilterGraph **pfg)
 {
     FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@  void fg_free(FilterGraph **pfg)
         return;
     fgp = fgp_from_fg(fg);
 
-    fg_thread_stop(fgp);
-
     avfilter_graph_free(&fg->graph);
     for (int j = 0; j < fg->nb_inputs; j++) {
         InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
     if (ret < 0)
         goto fail;
 
+    for (unsigned i = 0; i < graph->nb_filters; i++) {
+        const AVFilter *f = graph->filters[i]->filter;
+        if (!avfilter_filter_pad_count(f, 0) &&
+            !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+            fgp->have_sources = 1;
+            break;
+        }
+    }
+
     for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
         InputFilter *const ifilter = ifilter_alloc(fg);
         InputFilterPriv       *ifp;
@@ -1800,6 +1685,7 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
     AVBufferRef *hw_device;
     AVFilterInOut *inputs, *outputs, *cur;
     int ret, i, simple = filtergraph_is_simple(fg);
+    int have_input_eof = 0;
     const char *graph_desc = fgp->graph_desc;
 
     cleanup_filtergraph(fg);
@@ -1922,11 +1808,18 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
             ret = av_buffersrc_add_frame(ifp->filter, NULL);
             if (ret < 0)
                 goto fail;
+            have_input_eof = 1;
         }
     }
 
-    return 0;
+    if (have_input_eof) {
+        // make sure the EOF propagates to the end of the graph
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+            goto fail;
+    }
 
+    return 0;
 fail:
     cleanup_filtergraph(fg);
     return ret;
@@ -2182,7 +2075,7 @@  static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
                                                 fps->frames_prev_hist[2]);
 
         if (!*nb_frames && fps->last_dropped) {
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             fps->last_dropped++;
         }
 
@@ -2260,21 +2153,23 @@  finish:
     fps->frames_prev_hist[0] = *nb_frames_prev;
 
     if (*nb_frames_prev == 0 && fps->last_dropped) {
-        ofilter->nb_frames_drop++;
+        atomic_fetch_add(&ofilter->nb_frames_drop, 1);
         av_log(ost, AV_LOG_VERBOSE,
                "*** dropping frame %"PRId64" at ts %"PRId64"\n",
                fps->frame_number, fps->last_frame->pts);
     }
     if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+        uint64_t nb_frames_dup;
         if (*nb_frames > dts_error_threshold * 30) {
             av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             *nb_frames = 0;
             return;
         }
-        ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+        nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+                                         *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
         av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
-        if (ofilter->nb_frames_dup > fps->dup_warning) {
+        if (nb_frames_dup > fps->dup_warning) {
             av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
             fps->dup_warning *= 10;
         }
@@ -2284,8 +2179,57 @@  finish:
     fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
 }
 
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+    int ret;
+
+    // we are finished and no frames were ever seen at this output,
+    // at least initialize the encoder with a dummy frame
+    if (!fgt->got_frame) {
+        AVFrame *frame = fgt->frame;
+        FrameData *fd;
+
+        frame->time_base   = ofp->tb_out;
+        frame->format      = ofp->format;
+
+        frame->width               = ofp->width;
+        frame->height              = ofp->height;
+        frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+        frame->sample_rate = ofp->sample_rate;
+        if (ofp->ch_layout.nb_channels) {
+            ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+            if (ret < 0)
+                return ret;
+        }
+
+        fd = frame_data(frame);
+        if (!fd)
+            return AVERROR(ENOMEM);
+
+        fd->frame_rate_filter = ofp->fps.framerate;
+
+        av_assert0(!frame->buf[0]);
+
+        av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+               "No filtered frames for output stream, trying to "
+               "initialize anyway.\n");
+
+        ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret;
+        }
+    }
+
+    fgt->eof_out[ofp->index] = 1;
+
+    return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
 static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                           AVFrame *frame, int buffer)
+                           AVFrame *frame)
 {
     FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
     AVFrame   *frame_prev = ofp->fps.last_frame;
@@ -2332,28 +2276,17 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
             frame_out = frame;
         }
 
-        if (buffer) {
-            AVFrame *f = av_frame_alloc();
-
-            if (!f) {
-                av_frame_unref(frame_out);
-                return AVERROR(ENOMEM);
-            }
-
-            av_frame_move_ref(f, frame_out);
-            f->opaque = (void*)(intptr_t)ofp->index;
-
-            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
-            if (ret < 0) {
-                av_frame_free(&f);
-                return AVERROR(ENOMEM);
-            }
-        } else {
-            // return the frame to the main thread
-            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+        {
+            // send the frame to consumers
+            ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
             if (ret < 0) {
                 av_frame_unref(frame_out);
-                fgt->eof_out[ofp->index] = 1;
+
+                if (!fgt->eof_out[ofp->index]) {
+                    fgt->eof_out[ofp->index] = 1;
+                    fgp->nb_outputs_done++;
+                }
+
                 return ret == AVERROR_EOF ? 0 : ret;
             }
         }
@@ -2374,16 +2307,14 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         av_frame_move_ref(frame_prev, frame);
     }
 
-    if (!frame) {
-        tq_send_finish(fgp->queue_out, ofp->index);
-        fgt->eof_out[ofp->index] = 1;
-    }
+    if (!frame)
+        return close_output(ofp, fgt);
 
     return 0;
 }
 
 static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                          AVFrame *frame,  int buffer)
+                          AVFrame *frame)
 {
     FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
     OutputStream       *ost = ofp->ofilter.ost;
@@ -2393,8 +2324,8 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
 
     ret = av_buffersink_get_frame_flags(filter, frame,
                                         AV_BUFFERSINK_FLAG_NO_REQUEST);
-    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
-        ret = fg_output_frame(ofp, fgt, NULL, buffer);
+    if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+        ret = fg_output_frame(ofp, fgt, NULL);
         return (ret < 0) ? ret : 1;
     } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
         return 1;
@@ -2448,7 +2379,7 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         fd->frame_rate_filter = ofp->fps.framerate;
     }
 
-    ret = fg_output_frame(ofp, fgt, frame, buffer);
+    ret = fg_output_frame(ofp, fgt, frame);
     av_frame_unref(frame);
     if (ret < 0)
         return ret;
@@ -2456,44 +2387,68 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
     return 0;
 }
 
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
 static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
-                       AVFrame *frame, int buffer)
+                       AVFrame *frame)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
-    int ret = 0;
+    int did_step = 0;
 
-    if (!fg->graph)
-        return 0;
-
-    // process buffered frames
-    if (!buffer) {
-        AVFrame *f;
-
-        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
-            int out_idx = (intptr_t)f->opaque;
-            f->opaque = NULL;
-            ret = tq_send(fgp->queue_out, out_idx, f);
-            av_frame_free(&f);
-            if (ret < 0 && ret != AVERROR_EOF)
-                return ret;
+    // graph not configured, just select the input to request
+    if (!fg->graph) {
+        for (int i = 0; i < fg->nb_inputs; i++) {
+            InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+            if (ifp->format < 0 && !fgt->eof_in[i]) {
+                fgt->next_in = i;
+                return 0;
+            }
         }
+
+        // This state - graph is not configured, but all inputs are either
+        // initialized or EOF - should be unreachable because sending EOF to a
+        // filter without even a fallback format should fail
+        av_assert0(0);
+        return AVERROR_BUG;
     }
 
-    /* Reap all buffers present in the buffer sinks */
-    for (int i = 0; i < fg->nb_outputs; i++) {
-        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
-        int ret = 0;
+    while (1) {
+        int ret;
 
-        while (!ret) {
-            ret = fg_output_step(ofp, fgt, frame, buffer);
-            if (ret < 0)
-                return ret;
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret == AVERROR(EAGAIN)) {
+            fgt->next_in = choose_input(fg, fgt);
+            break;
+        } else if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+            else
+                av_log(fg, AV_LOG_ERROR,
+                       "Error requesting a frame from the filtergraph: %s\n",
+                       av_err2str(ret));
+            return ret;
         }
-    }
+        fgt->next_in = fg->nb_inputs;
 
-    return 0;
+        // return after one iteration, so that scheduler can rate-control us
+        if (did_step && fgp->have_sources)
+            return 0;
+
+        /* Reap all buffers present in the buffer sinks */
+        for (int i = 0; i < fg->nb_outputs; i++) {
+            OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+            ret = 0;
+            while (!ret) {
+                ret = fg_output_step(ofp, fgt, frame);
+                if (ret < 0)
+                    return ret;
+            }
+        }
+        did_step = 1;
+    };
+
+    return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
 }
 
 static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2571,6 +2526,9 @@  static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
+    if (fgt->eof_in[ifp->index])
+       return 0;
+
     fgt->eof_in[ifp->index] = 1;
 
     if (ifp->filter) {
@@ -2672,7 +2630,7 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
             return ret;
         }
 
-        ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+        ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
         av_frame_free(&tmp);
         if (ret < 0)
             return ret;
@@ -2705,80 +2663,6 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
     return 0;
 }
 
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
-                       AVFrame *frame)
-{
-    const enum FrameOpaque msg = (intptr_t)frame->opaque;
-    FilterGraph            *fg = &fgp->fg;
-    int              graph_eof = 0;
-    int ret;
-
-    frame->opaque = NULL;
-    av_assert0(msg > 0);
-    av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
-    if (!fg->graph) {
-        // graph not configured yet, ignore all messages other than choosing
-        // the input to read from
-        if (msg != FRAME_OPAQUE_CHOOSE_INPUT)
-            goto done;
-
-        for (int i = 0; i < fg->nb_inputs; i++) {
-            InputFilter *ifilter = fg->inputs[i];
-            InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-            if (ifp->format < 0 && !fgt->eof_in[i]) {
-                frame->opaque = (void*)(intptr_t)(i + 1);
-                goto done;
-            }
-        }
-
-        // This state - graph is not configured, but all inputs are either
-        // initialized or EOF - should be unreachable because sending EOF to a
-        // filter without even a fallback format should fail
-        av_assert0(0);
-        return AVERROR_BUG;
-    }
-
-    if (msg == FRAME_OPAQUE_SEND_COMMAND) {
-        FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
-        send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
-        av_frame_unref(frame);
-        goto done;
-    }
-
-    if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
-        ret = avfilter_graph_request_oldest(fg->graph);
-
-        graph_eof = ret == AVERROR_EOF;
-
-        if (ret == AVERROR(EAGAIN)) {
-            frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
-            goto done;
-        } else if (ret < 0 && !graph_eof)
-            return ret;
-    }
-
-    ret = read_frames(fg, fgt, frame, 0);
-    if (ret < 0) {
-        av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
-        return ret;
-    }
-
-    if (graph_eof)
-        return AVERROR_EOF;
-
-    // signal to the main thread that we are done processing the message
-done:
-    ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
-    if (ret < 0) {
-        if (ret != AVERROR_EOF)
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
-        return ret;
-    }
-
-    return 0;
-}
-
 static void fg_thread_set_name(const FilterGraph *fg)
 {
     char name[16];
@@ -2865,294 +2749,94 @@  static void *filter_thread(void *arg)
         InputFilter *ifilter;
         InputFilterPriv *ifp;
         enum FrameOpaque o;
-        int input_idx, eof_frame;
+        unsigned input_idx = fgt.next_in;
 
-        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-        if (input_idx < 0 ||
-            (input_idx == fg->nb_inputs && input_status < 0)) {
+        input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+                                          &input_idx, fgt.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
             break;
+        } else if (input_status == AVERROR(EAGAIN)) {
+            // should only happen when we didn't request any input
+            av_assert0(input_idx == fg->nb_inputs);
+            goto read_frames;
         }
+        av_assert0(input_status >= 0);
+
+        o = (intptr_t)fgt.frame->opaque;
 
         o = (intptr_t)fgt.frame->opaque;
 
         // message on the control stream
         if (input_idx == fg->nb_inputs) {
-            ret = msg_process(fgp, &fgt, fgt.frame);
-            if (ret < 0)
-                goto finish;
+            FilterCommand *fc;
 
+            av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+
+            fc = (FilterCommand*)fgt.frame->buf[0]->data;
+            send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+                         fc->all_filters);
+            av_frame_unref(fgt.frame);
             continue;
         }
 
         // we received an input frame or EOF
         ifilter   = fg->inputs[input_idx];
         ifp       = ifp_from_ifilter(ifilter);
-        eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
         if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
             int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
             ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
-        } else if (input_status >= 0 && fgt.frame->buf[0]) {
+        } else if (fgt.frame->buf[0]) {
             ret = send_frame(fg, &fgt, ifilter, fgt.frame);
         } else {
-            int64_t   pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
-            AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
-            ret = send_eof(&fgt, ifilter, pts, tb);
+            av_assert1(o == FRAME_OPAQUE_EOF);
+            ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
         }
         av_frame_unref(fgt.frame);
         if (ret < 0)
+            goto finish;
+
+read_frames:
+        // retrieve all newly avalable frames
+        ret = read_frames(fg, &fgt, fgt.frame);
+        if (ret == AVERROR_EOF) {
+            av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
             break;
-
-        if (eof_frame) {
-            // an EOF frame is immediately followed by sender closing
-            // the corresponding stream, so retrieve that event
-            input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-            av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
-        }
-
-        // signal to the main thread that we are done
-        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
-        if (ret < 0) {
-            if (ret == AVERROR_EOF)
-                break;
-
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+        } else if (ret < 0) {
+            av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+                   av_err2str(ret));
             goto finish;
         }
     }
 
+    for (unsigned i = 0; i < fg->nb_outputs; i++) {
+        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+        if (fgt.eof_out[i])
+            continue;
+
+        ret = fg_output_frame(ofp, &fgt, NULL);
+        if (ret < 0)
+            goto finish;
+    }
+
 finish:
     // EOF is normal termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
-    for (int i = 0; i <= fg->nb_inputs; i++)
-        tq_receive_finish(fgp->queue_in, i);
-    for (int i = 0; i <= fg->nb_outputs; i++)
-        tq_send_finish(fgp->queue_out, i);
-
     fg_thread_uninit(&fgt);
 
-    av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
-                             AVFrame *frame, enum FrameOpaque type)
-{
-    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-    int output_idx, ret;
-
-    if (ifp->eof) {
-        av_frame_unref(frame);
-        return AVERROR_EOF;
-    }
-
-    frame->opaque = (void*)(intptr_t)type;
-
-    ret = tq_send(fgp->queue_in, ifp->index, frame);
-    if (ret < 0) {
-        ifp->eof = 1;
-        av_frame_unref(frame);
-        return ret;
-    }
-
-    if (type == FRAME_OPAQUE_EOF)
-        tq_send_finish(fgp->queue_in, ifp->index);
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
-    return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    if (keep_reference) {
-        ret = av_frame_ref(fgp->frame, frame);
-        if (ret < 0)
-            return ret;
-    } else
-        av_frame_move_ref(fgp->frame, frame);
-
-    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(graph);
-    int ret, got_frames = 0;
-
-    if (fgp->eof_in)
-        return AVERROR_EOF;
-
-    // signal to the filtering thread to return all frames it can
-    av_assert0(!fgp->frame->buf[0]);
-    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
-                                           FRAME_OPAQUE_CHOOSE_INPUT :
-                                           FRAME_OPAQUE_REAP_FILTERS);
-
-    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        goto finish;
-    }
-
-    while (1) {
-        OutputFilter *ofilter;
-        OutputFilterPriv *ofp;
-        OutputStream *ost;
-        int output_idx;
-
-        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
-        // EOF on the whole queue or the control stream
-        if (output_idx < 0 ||
-            (ret < 0 && output_idx == graph->nb_outputs))
-            goto finish;
-
-        // EOF for a specific stream
-        if (ret < 0) {
-            ofilter = graph->outputs[output_idx];
-            ofp     = ofp_from_ofilter(ofilter);
-
-            // we are finished and no frames were ever seen at this output,
-            // at least initialize the encoder with a dummy frame
-            if (!ofp->got_frame) {
-                AVFrame *frame = fgp->frame;
-                FrameData *fd;
-
-                frame->time_base   = ofp->tb_out;
-                frame->format      = ofp->format;
-
-                frame->width               = ofp->width;
-                frame->height              = ofp->height;
-                frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
-                frame->sample_rate = ofp->sample_rate;
-                if (ofp->ch_layout.nb_channels) {
-                    ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
-                    if (ret < 0)
-                        return ret;
-                }
-
-                fd = frame_data(frame);
-                if (!fd)
-                    return AVERROR(ENOMEM);
-
-                fd->frame_rate_filter = ofp->fps.framerate;
-
-                av_assert0(!frame->buf[0]);
-
-                av_log(ofilter->ost, AV_LOG_WARNING,
-                       "No filtered frames for output stream, trying to "
-                       "initialize anyway.\n");
-
-                enc_open(ofilter->ost, frame);
-                av_frame_unref(frame);
-            }
-
-            close_output_stream(graph->outputs[output_idx]->ost);
-            continue;
-        }
-
-        // request was fully processed by the filtering thread,
-        // return the input stream to read from, if needed
-        if (output_idx == graph->nb_outputs) {
-            int input_idx = (intptr_t)fgp->frame->opaque - 1;
-            av_assert0(input_idx <= graph->nb_inputs);
-
-            if (best_ist) {
-                *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
-                            ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
-                if (input_idx < 0 && !got_frames) {
-                    for (int i = 0; i < graph->nb_outputs; i++)
-                        graph->outputs[i]->ost->unavailable = 1;
-                }
-            }
-            break;
-        }
-
-        // got a frame from the filtering thread, send it for encoding
-        ofilter = graph->outputs[output_idx];
-        ost     = ofilter->ost;
-        ofp     = ofp_from_ofilter(ofilter);
-
-        if (ost->finished) {
-            av_frame_unref(fgp->frame);
-            tq_receive_finish(fgp->queue_out, output_idx);
-            continue;
-        }
-
-        if (fgp->frame->pts != AV_NOPTS_VALUE) {
-            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
-                                             fgp->frame->time_base,
-                                             AV_TIME_BASE_Q);
-        }
-
-        ret = enc_frame(ost, fgp->frame);
-        av_frame_unref(fgp->frame);
-        if (ret < 0)
-            goto finish;
-
-        ofp->got_frame = 1;
-        got_frames     = 1;
-    }
-
-finish:
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        for (int i = 0; i < graph->nb_outputs; i++)
-            close_output_stream(graph->outputs[i]->ost);
-    }
-
-    return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
-    return fg_transcode_step(fg, NULL);
-}
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
     AVBufferRef *buf;
     FilterCommand *fc;
-    int output_idx, ret;
-
-    if (!fgp->queue_in)
-        return;
 
     fc = av_mallocz(sizeof(*fc));
     if (!fc)
@@ -3178,13 +2862,5 @@  void fg_send_command(FilterGraph *fg, double time, const char *target,
     fgp->frame->buf[0] = buf;
     fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
 
-    ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        av_frame_unref(fgp->frame);
-        return;
-    }
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+    sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
 }
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index ef5c2f60e0..067dc65d4e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -23,16 +23,13 @@ 
 #include "ffmpeg.h"
 #include "ffmpeg_mux.h"
 #include "ffmpeg_utils.h"
-#include "objpool.h"
 #include "sync_queue.h"
-#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -41,10 +38,9 @@ 
 
 typedef struct MuxThreadContext {
     AVPacket *pkt;
+    AVPacket *fix_sub_duration_pkt;
 } MuxThreadContext;
 
-int want_sdp = 1;
-
 static Muxer *mux_from_of(OutputFile *of)
 {
     return (Muxer*)of;
@@ -207,14 +203,41 @@  static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
     return 0;
 }
 
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
+
 /* apply the output bitstream filters */
-static int mux_packet_filter(Muxer *mux, OutputStream *ost,
-                             AVPacket *pkt, int *stream_eof)
+static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
+                             OutputStream *ost, AVPacket *pkt, int *stream_eof)
 {
     MuxStream *ms = ms_from_ost(ost);
     const char *err_msg;
     int ret = 0;
 
+    if (pkt && !ost->enc) {
+        ret = of_streamcopy(ost, pkt);
+        if (ret == AVERROR(EAGAIN))
+            return 0;
+        else if (ret == AVERROR_EOF) {
+            av_packet_unref(pkt);
+            pkt = NULL;
+            ret = 0;
+        } else if (ret < 0)
+            goto fail;
+    }
+
+    // emit heartbeat for -fix_sub_duration;
+    // we are only interested in heartbeats on on random access points.
+    if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
+        mt->fix_sub_duration_pkt->opaque    = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
+        mt->fix_sub_duration_pkt->pts       = pkt->pts;
+        mt->fix_sub_duration_pkt->time_base = pkt->time_base;
+
+        ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
+                                    mt->fix_sub_duration_pkt);
+        if (ret < 0)
+            goto fail;
+    }
+
     if (ms->bsf_ctx) {
         int bsf_eof = 0;
 
@@ -278,6 +301,7 @@  static void thread_set_name(OutputFile *of)
 static void mux_thread_uninit(MuxThreadContext *mt)
 {
     av_packet_free(&mt->pkt);
+    av_packet_free(&mt->fix_sub_duration_pkt);
 
     memset(mt, 0, sizeof(*mt));
 }
@@ -290,6 +314,10 @@  static int mux_thread_init(MuxThreadContext *mt)
     if (!mt->pkt)
         goto fail;
 
+    mt->fix_sub_duration_pkt = av_packet_alloc();
+    if (!mt->fix_sub_duration_pkt)
+        goto fail;
+
     return 0;
 
 fail:
@@ -316,19 +344,22 @@  void *muxer_thread(void *arg)
         OutputStream *ost;
         int stream_idx, stream_eof = 0;
 
-        ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+        ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+        stream_idx = mt.pkt->stream_index;
         if (stream_idx < 0) {
             av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
             ret = 0;
             break;
         }
 
-        ost = of->streams[stream_idx];
-        ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
+        ost = of->streams[mux->sch_stream_idx[stream_idx]];
+        mt.pkt->stream_index = ost->index;
+
+        ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
         av_packet_unref(mt.pkt);
         if (ret == AVERROR_EOF) {
             if (stream_eof) {
-                tq_receive_finish(mux->tq, stream_idx);
+                sch_mux_receive_finish(mux->sch, of->index, stream_idx);
             } else {
                 av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
                 ret = 0;
@@ -343,243 +374,55 @@  void *muxer_thread(void *arg)
 finish:
     mux_thread_uninit(&mt);
 
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_receive_finish(mux->tq, i);
-
-    av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
-{
-    int ret = 0;
-
-    if (!pkt || ost->finished & MUXER_FINISHED)
-        goto finish;
-
-    ret = tq_send(mux->tq, ost->index, pkt);
-    if (ret < 0)
-        goto finish;
-
-    return 0;
-
-finish:
-    if (pkt)
-        av_packet_unref(pkt);
-
-    ost->finished |= MUXER_FINISHED;
-    tq_send_finish(mux->tq, ost->index);
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-static int queue_packet(OutputStream *ost, AVPacket *pkt)
-{
-    MuxStream *ms = ms_from_ost(ost);
-    AVPacket *tmp_pkt = NULL;
-    int ret;
-
-    if (!av_fifo_can_write(ms->muxing_queue)) {
-        size_t cur_size = av_fifo_can_read(ms->muxing_queue);
-        size_t pkt_size = pkt ? pkt->size : 0;
-        unsigned int are_we_over_size =
-            (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold;
-        size_t limit    = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX;
-        size_t new_size = FFMIN(2 * cur_size, limit);
-
-        if (new_size <= cur_size) {
-            av_log(ost, AV_LOG_ERROR,
-                   "Too many packets buffered for output stream %d:%d.\n",
-                   ost->file_index, ost->st->index);
-            return AVERROR(ENOSPC);
-        }
-        ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
-        if (ret < 0)
-            return ret;
-    }
-
-    if (pkt) {
-        ret = av_packet_make_refcounted(pkt);
-        if (ret < 0)
-            return ret;
-
-        tmp_pkt = av_packet_alloc();
-        if (!tmp_pkt)
-            return AVERROR(ENOMEM);
-
-        av_packet_move_ref(tmp_pkt, pkt);
-        ms->muxing_queue_data_size += tmp_pkt->size;
-    }
-    av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
-
-    return 0;
-}
-
-static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
-{
-    int ret;
-
-    if (mux->tq) {
-        return thread_submit_packet(mux, ost, pkt);
-    } else {
-        /* the muxer is not initialized yet, buffer the packet */
-        ret = queue_packet(ost, pkt);
-        if (ret < 0) {
-            if (pkt)
-                av_packet_unref(pkt);
-            return ret;
-        }
-    }
-
-    return 0;
-}
-
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
-{
-    Muxer *mux = mux_from_of(of);
-    int ret = 0;
-
-    if (pkt && pkt->dts != AV_NOPTS_VALUE)
-        ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
-
-    ret = submit_packet(mux, pkt, ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
-               av_err2str(ret));
-        return ret;
-    }
-
-    return 0;
-}
-
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
 {
     OutputFile *of = output_files[ost->file_index];
     MuxStream  *ms = ms_from_ost(ost);
+    DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL;
+    int64_t      dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
     int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
     int64_t ts_offset;
-    AVPacket *opkt = ms->pkt;
-    int ret;
-
-    av_packet_unref(opkt);
 
     if (of->recording_time != INT64_MAX &&
         dts >= of->recording_time + start_time)
-        pkt = NULL;
-
-    // EOF: flush output bitstream filters.
-    if (!pkt)
-        return of_output_packet(of, ost, NULL);
+        return AVERROR_EOF;
 
     if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
         !ms->copy_initial_nonkeyframes)
-        return 0;
+        return AVERROR(EAGAIN);
 
     if (!ms->streamcopy_started) {
         if (!ms->copy_prior_start &&
             (pkt->pts == AV_NOPTS_VALUE ?
              dts < ms->ts_copy_start :
              pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
-            return 0;
+            return AVERROR(EAGAIN);
 
         if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
-            return 0;
+            return AVERROR(EAGAIN);
     }
 
-    ret = av_packet_ref(opkt, pkt);
-    if (ret < 0)
-        return ret;
-
-    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
 
     if (pkt->pts != AV_NOPTS_VALUE)
-        opkt->pts -= ts_offset;
+        pkt->pts -= ts_offset;
 
     if (pkt->dts == AV_NOPTS_VALUE) {
-        opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+        pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
     } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
-        opkt->pts = opkt->dts - ts_offset;
-    }
-    opkt->dts -= ts_offset;
-
-    {
-        int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Subtitle heartbeat logic failed in %s! (%s)\n",
-                   __func__, av_err2str(ret));
-            return ret;
-        }
+        pkt->pts = pkt->dts - ts_offset;
     }
 
-    ret = of_output_packet(of, ost, opkt);
-    if (ret < 0)
-        return ret;
+    pkt->dts -= ts_offset;
 
     ms->streamcopy_started = 1;
 
     return 0;
 }
 
-static int thread_stop(Muxer *mux)
-{
-    void *ret;
-
-    if (!mux || !mux->tq)
-        return 0;
-
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_send_finish(mux->tq, i);
-
-    pthread_join(mux->thread, &ret);
-
-    tq_free(&mux->tq);
-
-    return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
-    AVFormatContext *fc = mux->fc;
-    ObjPool *op;
-    int ret;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
-    if (!mux->tq) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
-    if (ret) {
-        tq_free(&mux->tq);
-        return AVERROR(ret);
-    }
-
-    /* flush the muxing queues */
-    for (int i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = mux->of.streams[i];
-        MuxStream     *ms = ms_from_ost(ost);
-        AVPacket *pkt;
-
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ret = thread_submit_packet(mux, ost, pkt);
-            if (pkt) {
-                ms->muxing_queue_data_size -= pkt->size;
-                av_packet_free(&pkt);
-            }
-            if (ret < 0)
-                return ret;
-        }
-    }
-
-    return 0;
-}
-
 int print_sdp(const char *filename);
 
 int print_sdp(const char *filename)
@@ -590,11 +433,6 @@  int print_sdp(const char *filename)
     AVIOContext *sdp_pb;
     AVFormatContext **avc;
 
-    for (i = 0; i < nb_output_files; i++) {
-        if (!mux_from_of(output_files[i])->header_written)
-            return 0;
-    }
-
     avc = av_malloc_array(nb_output_files, sizeof(*avc));
     if (!avc)
         return AVERROR(ENOMEM);
@@ -629,25 +467,17 @@  int print_sdp(const char *filename)
         avio_closep(&sdp_pb);
     }
 
-    // SDP successfully written, allow muxer threads to start
-    ret = 1;
-
 fail:
     av_freep(&avc);
     return ret;
 }
 
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
 {
+    Muxer     *mux = arg;
     OutputFile *of = &mux->of;
     AVFormatContext *fc = mux->fc;
-    int ret, i;
-
-    for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = of->streams[i];
-        if (!ost->initialized)
-            return 0;
-    }
+    int ret;
 
     ret = avformat_write_header(fc, &mux->opts);
     if (ret < 0) {
@@ -659,27 +489,7 @@  int mux_check_init(Muxer *mux)
     mux->header_written = 1;
 
     av_dump_format(fc, of->index, fc->url, 1);
-    nb_output_dumped++;
-
-    if (sdp_filename || want_sdp) {
-        ret = print_sdp(sdp_filename);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
-            return ret;
-        } else if (ret == 1) {
-            /* SDP is written only after all the muxers are ready, so now we
-             * start ALL the threads */
-            for (i = 0; i < nb_output_files; i++) {
-                ret = thread_start(mux_from_of(output_files[i]));
-                if (ret < 0)
-                    return ret;
-            }
-        }
-    } else {
-        ret = thread_start(mux_from_of(of));
-        if (ret < 0)
-            return ret;
-    }
+    atomic_fetch_add(&nb_output_dumped, 1);
 
     return 0;
 }
@@ -736,9 +546,10 @@  int of_stream_init(OutputFile *of, OutputStream *ost)
                                          ost->st->time_base);
     }
 
-    ost->initialized = 1;
+    if (ms->sch_idx >= 0)
+        return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
 
-    return mux_check_init(mux);
+    return 0;
 }
 
 static int check_written(OutputFile *of)
@@ -852,15 +663,13 @@  int of_write_trailer(OutputFile *of)
     AVFormatContext *fc = mux->fc;
     int ret, mux_result = 0;
 
-    if (!mux->tq) {
+    if (!mux->header_written) {
         av_log(mux, AV_LOG_ERROR,
                "Nothing was written into output file, because "
                "at least one of its streams received no packets.\n");
         return AVERROR(EINVAL);
     }
 
-    mux_result = thread_stop(mux);
-
     ret = av_write_trailer(fc);
     if (ret < 0) {
         av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -905,13 +714,6 @@  static void ost_free(OutputStream **post)
         ost->logfile = NULL;
     }
 
-    if (ms->muxing_queue) {
-        AVPacket *pkt;
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
-            av_packet_free(&pkt);
-        av_fifo_freep2(&ms->muxing_queue);
-    }
-
     avcodec_parameters_free(&ost->par_in);
 
     av_bsf_free(&ms->bsf_ctx);
@@ -976,8 +778,6 @@  void of_free(OutputFile **pof)
         return;
     mux = mux_from_of(of);
 
-    thread_stop(mux);
-
     sq_free(&of->sq_encode);
     sq_free(&mux->sq_mux);
 
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index eee2b2cb07..5d7cf3fa76 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg_sched.h"
-#include "thread_queue.h"
 
 #include "libavformat/avformat.h"
 
@@ -33,7 +32,6 @@ 
 
 #include "libavutil/dict.h"
 #include "libavutil/fifo.h"
-#include "libavutil/thread.h"
 
 typedef struct MuxStream {
     OutputStream ost;
@@ -41,9 +39,6 @@  typedef struct MuxStream {
     // name used for logging
     char log_name[32];
 
-    /* the packets are buffered here until the muxer is ready to be initialized */
-    AVFifo *muxing_queue;
-
     AVBSFContext *bsf_ctx;
     AVPacket     *bsf_pkt;
 
@@ -57,17 +52,6 @@  typedef struct MuxStream {
 
     int64_t max_frames;
 
-    /*
-     * The size of the AVPackets' buffers in queue.
-     * Updated when a packet is either pushed or pulled from the queue.
-     */
-    size_t muxing_queue_data_size;
-
-    int max_muxing_queue_size;
-
-    /* Threshold after which max_muxing_queue_size will be in effect */
-    size_t muxing_queue_data_threshold;
-
     // timestamp from which the streamcopied streams should start,
     // in AV_TIME_BASE_Q;
     // everything before it should be discarded
@@ -106,9 +90,6 @@  typedef struct Muxer {
     int         *sch_stream_idx;
     int       nb_sch_stream_idx;
 
-    pthread_t    thread;
-    ThreadQueue *tq;
-
     AVDictionary *opts;
 
     int thread_queue_size;
@@ -122,10 +103,7 @@  typedef struct Muxer {
     AVPacket *sq_pkt;
 } Muxer;
 
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
 
 static MuxStream *ms_from_ost(OutputStream *ost)
 {
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 534b4379c7..6459296ab0 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -924,13 +924,6 @@  static int new_stream_audio(Muxer *mux, const OptionsContext *o,
     return 0;
 }
 
-static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
-                                 OutputStream *ost)
-{
-    ost->finished    = 1;
-    return 0;
-}
-
 static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
                                OutputStream *ost)
 {
@@ -1168,9 +1161,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (!ost->par_in)
         return AVERROR(ENOMEM);
 
-    ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
-    if (!ms->muxing_queue)
-        return AVERROR(ENOMEM);
     ms->last_mux_dts = AV_NOPTS_VALUE;
 
     ost->st         = st;
@@ -1190,7 +1180,8 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         if (!ost->enc_ctx)
             return AVERROR(ENOMEM);
 
-        ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+        ret = sch_add_enc(mux->sch, encoder_thread, ost,
+                          ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open);
         if (ret < 0)
             return ret;
         ms->sch_idx_enc = ret;
@@ -1414,9 +1405,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
 
         sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
                                  max_muxing_queue_size, muxing_queue_data_threshold);
-
-        ms->max_muxing_queue_size       = max_muxing_queue_size;
-        ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
     }
 
     MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
@@ -1434,8 +1422,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
         av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
 
-    ost->last_mux_dts = AV_NOPTS_VALUE;
-
     MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
                          ms->copy_initial_nonkeyframes, oc, st);
 
@@ -1443,7 +1429,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     case AVMEDIA_TYPE_VIDEO:      ret = new_stream_video     (mux, o, ost); break;
     case AVMEDIA_TYPE_AUDIO:      ret = new_stream_audio     (mux, o, ost); break;
     case AVMEDIA_TYPE_SUBTITLE:   ret = new_stream_subtitle  (mux, o, ost); break;
-    case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break;
     }
     if (ret < 0)
         return ret;
@@ -1938,7 +1923,6 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
         MuxStream     *ms = ms_from_ost(ost);
         enum AVMediaType type = ost->type;
 
-        ost->sq_idx_encode = -1;
         ost->sq_idx_mux    = -1;
 
         nb_interleaved += IS_INTERLEAVED(type);
@@ -1961,11 +1945,17 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
      * - at least one encoded audio/video stream is frame-limited, since
      *   that has similar semantics to 'shortest'
      * - at least one audio encoder requires constant frame sizes
+     *
+     * Note that encoding sync queues are handled in the scheduler, because
+     * different encoders run in different threads and need external
+     * synchronization, while muxer sync queues can be handled inside the muxer
      */
     if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
-        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
-        if (!of->sq_encode)
-            return AVERROR(ENOMEM);
+        int sq_idx, ret;
+
+        sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+        if (sq_idx < 0)
+            return sq_idx;
 
         for (int i = 0; i < oc->nb_streams; i++) {
             OutputStream *ost = of->streams[i];
@@ -1975,13 +1965,11 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
             if (!IS_AV_ENC(ost, type))
                 continue;
 
-            ost->sq_idx_encode = sq_add_stream(of->sq_encode,
-                                               of->shortest || ms->max_frames < INT64_MAX);
-            if (ost->sq_idx_encode < 0)
-                return ost->sq_idx_encode;
-
-            if (ms->max_frames != INT64_MAX)
-                sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+            ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
+                                 of->shortest || ms->max_frames < INT64_MAX,
+                                 ms->max_frames);
+            if (ret < 0)
+                return ret;
         }
     }
 
@@ -2652,23 +2640,6 @@  static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt)
     return 0;
 }
 
-static int init_output_stream_nofilter(OutputStream *ost)
-{
-    int ret = 0;
-
-    if (ost->enc_ctx) {
-        ret = enc_open(ost, NULL);
-        if (ret < 0)
-            return ret;
-    } else {
-        ret = of_stream_init(output_files[ost->file_index], ost);
-        if (ret < 0)
-            return ret;
-    }
-
-    return ret;
-}
-
 static const char *output_file_item_name(void *obj)
 {
     const Muxer *mux = obj;
@@ -2751,8 +2722,6 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     av_strlcat(mux->log_name, "/",               sizeof(mux->log_name));
     av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
 
-    if (strcmp(oc->oformat->name, "rtp"))
-        want_sdp = 0;
 
     of->format = oc->oformat;
     if (recording_time != INT64_MAX)
@@ -2768,7 +2737,7 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                                            AVFMT_FLAG_BITEXACT);
     }
 
-    err = sch_add_mux(sch, muxer_thread, NULL, mux,
+    err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
                       !strcmp(oc->oformat->name, "rtp"));
     if (err < 0)
         return err;
@@ -2854,26 +2823,15 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
 
     of->url        = filename;
 
-    /* initialize stream copy and subtitle/data streams.
-     * Encoded AVFrame based streams will get initialized when the first AVFrame
-     * is received in do_video_out
-     */
+    /* initialize streamcopy streams. */
     for (int i = 0; i < of->nb_streams; i++) {
         OutputStream *ost = of->streams[i];
 
-        if (ost->filter)
-            continue;
-
-        err = init_output_stream_nofilter(ost);
-        if (err < 0)
-            return err;
-    }
-
-    /* write the header for files with no streams */
-    if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
-        int ret = mux_check_init(mux);
-        if (ret < 0)
-            return ret;
+        if (!ost->enc) {
+            err = of_stream_init(of, ost);
+            if (err < 0)
+                return err;
+        }
     }
 
     return 0;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d463306546..6177a96a4e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@  const char *const opt_name_top_field_first[]                  = {"top", NULL};
 HWDevice *filter_hw_device;
 
 char *vstats_filename;
-char *sdp_filename;
 
 float audio_drift_threshold = 0.1;
 float dts_delta_threshold   = 10;
@@ -580,9 +579,8 @@  fail:
 
 static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
 {
-    av_free(sdp_filename);
-    sdp_filename = av_strdup(arg);
-    return 0;
+    Scheduler *sch = optctx;
+    return sch_sdp_filename(sch, arg);
 }
 
 #if CONFIG_VAAPI
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index 957a410921..bc9b833799 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -1,48 +1,40 @@ 
 1
-00:00:00,968 --> 00:00:01,001
+00:00:00,968 --> 00:00:01,168
 <font face="Monospace">{\an7}(</font>
 
 2
-00:00:01,001 --> 00:00:01,168
-<font face="Monospace">{\an7}(</font>
-
-3
 00:00:01,168 --> 00:00:01,368
 <font face="Monospace">{\an7}(<i> inaudibl</i></font>
 
-4
+3
 00:00:01,368 --> 00:00:01,568
 <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
 
-5
+4
 00:00:01,568 --> 00:00:02,002
 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
 
+5
+00:00:02,002 --> 00:00:03,103
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
+
 6
-00:00:02,002 --> 00:00:03,003
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-7
-00:00:03,003 --> 00:00:03,103
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-8
 00:00:03,103 --> 00:00:03,303
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >></font>
 
-9
+7
 00:00:03,303 --> 00:00:03,503
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety rema</font>
 
-10
+8
 00:00:03,504 --> 00:00:03,704
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our numb</font>
 
-11
+9
 00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our number one</font>