diff mbox series

[FFmpeg-devel,21/24] fftools/ffmpeg_filter: convert to the scheduler

Message ID 20231104092125.10213-22-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/24] lavf/mux: do not apply max_interleave_delta to subtitles | expand

Commit Message

Anton Khirnov Nov. 4, 2023, 7:56 a.m. UTC
---
 fftools/ffmpeg.c        |  44 +--
 fftools/ffmpeg.h        |  32 +-
 fftools/ffmpeg_filter.c | 720 +++++++++++-----------------------------
 3 files changed, 204 insertions(+), 592 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index bd783fe674..1f21008588 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -138,30 +138,6 @@  static struct termios oldtty;
 static int restore_tty;
 #endif
 
-/* sub2video hack:
-   Convert subtitles to video with alpha to insert them in filter graphs.
-   This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
-    /* When a frame is read from a file, examine all sub2video streams in
-       the same file and send the sub2video frame again. Otherwise, decoded
-       video frames could be accumulating in the filter graph while a filter
-       (possibly overlay) is desperately waiting for a subtitle frame. */
-    for (int i = 0; i < infile->nb_streams; i++) {
-        InputStream *ist = infile->streams[i];
-
-        if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        for (int j = 0; j < ist->nb_filters; j++)
-            ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
-    }
-}
-
-/* end of sub2video hack */
-
 static void term_exit_sigsafe(void)
 {
 #if HAVE_TERMIOS_H
@@ -552,8 +528,8 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             if (is_last_report)
                 av_bprintf(&buf, "L");
 
-            nb_frames_dup  = ost->filter->nb_frames_dup;
-            nb_frames_drop = ost->filter->nb_frames_drop;
+            nb_frames_dup  = atomic_load(&ost->filter->nb_frames_dup);
+            nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
 
             vid = 1;
         }
@@ -890,9 +866,7 @@  static int choose_output(OutputStream **post)
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
         int64_t opts;
 
-        if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
-            opts = ost->filter->last_pts;
-        } else {
+        {
             opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
                    INT64_MIN : ost->last_mux_dts;
         }
@@ -1041,8 +1015,6 @@  static int process_input(int file_index, AVPacket *pkt)
 
     ist = ifile->streams[pkt->stream_index];
 
-    sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
     ret = process_input_packet(ist, pkt, 0);
 
     av_packet_unref(pkt);
@@ -1061,8 +1033,6 @@  static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
     int ret;
 
     if (ost->filter) {
-        if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
-            return ret;
         if (!ist)
             return 0;
     } else {
@@ -1078,14 +1048,6 @@  static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
-    // process_input() above might have caused output to become available
-    // in multiple filtergraphs, so we process all of them
-    for (int i = 0; i < nb_filtergraphs; i++) {
-        ret = reap_filters(filtergraphs[i], 0);
-        if (ret < 0)
-            return ret;
-    }
-
     return 0;
 }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 975d8b737e..c1b61c83e7 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -84,9 +84,7 @@  enum HWAccelID {
 };
 
 enum FrameOpaque {
-    FRAME_OPAQUE_REAP_FILTERS = 1,
-    FRAME_OPAQUE_CHOOSE_INPUT,
-    FRAME_OPAQUE_SUB_HEARTBEAT,
+    FRAME_OPAQUE_SUB_HEARTBEAT = 1,
     FRAME_OPAQUE_EOF,
     FRAME_OPAQUE_SEND_COMMAND,
 };
@@ -313,11 +311,8 @@  typedef struct OutputFilter {
 
     enum AVMediaType     type;
 
-    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
-    int64_t last_pts;
-
-    uint64_t nb_frames_dup;
-    uint64_t nb_frames_drop;
+    atomic_uint_least64_t nb_frames_dup;
+    atomic_uint_least64_t nb_frames_drop;
 } OutputFilter;
 
 typedef struct FilterGraph {
@@ -728,10 +723,6 @@  int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
  */
 FrameData *frame_data(AVFrame *frame);
 
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
 /**
  * Set up fallback filtering parameters from a decoder context. They will only
  * be used if no frames are ever sent on this input, otherwise the actual
@@ -752,26 +743,9 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
 
 void fg_free(FilterGraph **pfg);
 
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in]  graph     filter graph to consider
- * @param[out] best_ist  input stream where a frame would allow to continue
- * @return  0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters);
 
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return  0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
 int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
 
 void enc_stats_write(OutputStream *ost, EncStats *es,
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index c01fc0e8ea..7e902670b4 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,8 +21,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
 
 #include "libavfilter/avfilter.h"
 #include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@  typedef struct FilterGraphPriv {
     // true when the filtergraph contains only meta filters
     // that do not modify the frame data
     int is_meta;
+    // source filters are present in the graph
+    int have_sources;
     int disable_conversions;
 
-    int nb_inputs_bound;
-    int nb_outputs_bound;
+    unsigned nb_outputs_done;
 
     const char *graph_desc;
 
@@ -67,41 +66,6 @@  typedef struct FilterGraphPriv {
 
     Scheduler       *sch;
     unsigned         sch_idx;
-
-    pthread_t        thread;
-    /**
-     * Queue for sending frames from the main thread to the filtergraph. Has
-     * nb_inputs+1 streams - the first nb_inputs stream correspond to
-     * filtergraph inputs. Frames on those streams may have their opaque set to
-     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
-     *   EOF event for the correspondint stream. Will be immediately followed by
-     *   this stream being send-closed.
-     * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
-     *   a subtitle heartbeat event. Will only be sent for sub2video streams.
-     *
-     * The last stream is "control" - the main thread sends empty AVFrames with
-     * opaque set to
-     * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
-     *   from filtergraph outputs. These frames are sent to corresponding
-     *   streams in queue_out. Finally an empty frame is sent to the control
-     *   stream in queue_out.
-     * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
-     *   available the terminating empty frame's opaque will contain the index+1
-     *   of the filtergraph input to which more input frames should be supplied.
-     */
-    ThreadQueue     *queue_in;
-    /**
-     * Queue for sending frames from the filtergraph back to the main thread.
-     * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
-     * filtergraph outputs.
-     *
-     * The last stream is "control" - see documentation for queue_in for more
-     * details.
-     */
-    ThreadQueue     *queue_out;
-    // submitting frames to filter thread returned EOF
-    // this only happens on thread exit, so is not per-input
-    int              eof_in;
 } FilterGraphPriv;
 
 static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@  typedef struct FilterGraphThread {
     // The output index is stored in frame opaque.
     AVFifo  *frame_queue_out;
 
+    // index of the next input to request from the scheduler
+    unsigned next_in;
+    // set to 1 after at least one frame passed through this output
     int      got_frame;
 
     // EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@  typedef struct OutputFilterPriv {
     int64_t ts_offset;
     int64_t next_pts;
     FPSConvContext fps;
-
-    // set to 1 after at least one frame passed through this output
-    int got_frame;
 } OutputFilterPriv;
 
 static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@  static int ifilter_has_all_input_formats(FilterGraph *fg)
 
 static void *filter_thread(void *arg);
 
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
-    FilterGraph *fg = &fgp->fg;
-    ObjPool *op;
-    int ret = 0;
-
-    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
-        fgp->nb_outputs_bound < fg->nb_outputs)
-        return 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
-    if (!fgp->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    // at least one output is mandatory
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
-    if (!fgp->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
-               fg->index, av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return ret;
-}
-
 static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
 {
     AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@  static OutputFilter *ofilter_alloc(FilterGraph *fg)
     ofilter->graph    = fg;
     ofp->format       = -1;
     ofp->index        = fg->nb_outputs - 1;
-    ofilter->last_pts = AV_NOPTS_VALUE;
 
     return ofilter;
 }
@@ -760,10 +672,7 @@  static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
             return AVERROR(ENOMEM);
     }
 
-    fgp->nb_inputs_bound++;
-    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@  int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
     if (ret < 0)
         return ret;
 
-    fgp->nb_outputs_bound++;
-    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@  static InputFilter *ifilter_alloc(FilterGraph *fg)
     return ifilter;
 }
 
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
-    void *ret;
-
-    if (!fgp->queue_in)
-        return 0;
-
-    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
-        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
-                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
-        if (ifp)
-            ifp->eof = 1;
-
-        tq_send_finish(fgp->queue_in, i);
-    }
-
-    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
-        tq_receive_finish(fgp->queue_out, i);
-
-    pthread_join(fgp->thread, &ret);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void fg_free(FilterGraph **pfg)
 {
     FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@  void fg_free(FilterGraph **pfg)
         return;
     fgp = fgp_from_fg(fg);
 
-    fg_thread_stop(fgp);
-
     avfilter_graph_free(&fg->graph);
     for (int j = 0; j < fg->nb_inputs; j++) {
         InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
     if (ret < 0)
         goto fail;
 
+    for (unsigned i = 0; i < graph->nb_filters; i++) {
+        const AVFilter *f = graph->filters[i]->filter;
+        if (!avfilter_filter_pad_count(f, 0) &&
+            !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+            fgp->have_sources = 1;
+            break;
+        }
+    }
+
     for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
         InputFilter *const ifilter = ifilter_alloc(fg);
         InputFilterPriv       *ifp = ifp_from_ifilter(ifilter);
@@ -1792,6 +1677,7 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
     AVBufferRef *hw_device;
     AVFilterInOut *inputs, *outputs, *cur;
     int ret, i, simple = filtergraph_is_simple(fg);
+    int have_input_eof = 0;
     const char *graph_desc = fgp->graph_desc;
 
     cleanup_filtergraph(fg);
@@ -1914,11 +1800,18 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
             ret = av_buffersrc_add_frame(ifp->filter, NULL);
             if (ret < 0)
                 goto fail;
+            have_input_eof = 1;
         }
     }
 
-    return 0;
+    if (have_input_eof) {
+        // make sure the EOF propagates to the end of the graph
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+            goto fail;
+    }
 
+    return 0;
 fail:
     cleanup_filtergraph(fg);
     return ret;
@@ -2174,7 +2067,7 @@  static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
                                                 fps->frames_prev_hist[2]);
 
         if (!*nb_frames && fps->last_dropped) {
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             fps->last_dropped++;
         }
 
@@ -2252,21 +2145,23 @@  finish:
     fps->frames_prev_hist[0] = *nb_frames_prev;
 
     if (*nb_frames_prev == 0 && fps->last_dropped) {
-        ofilter->nb_frames_drop++;
+        atomic_fetch_add(&ofilter->nb_frames_drop, 1);
         av_log(ost, AV_LOG_VERBOSE,
                "*** dropping frame %"PRId64" at ts %"PRId64"\n",
                fps->frame_number, fps->last_frame->pts);
     }
     if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+        uint64_t nb_frames_dup;
         if (*nb_frames > dts_error_threshold * 30) {
             av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             *nb_frames = 0;
             return;
         }
-        ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+        nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+                                         *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
         av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
-        if (ofilter->nb_frames_dup > fps->dup_warning) {
+        if (nb_frames_dup > fps->dup_warning) {
             av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
             fps->dup_warning *= 10;
         }
@@ -2276,8 +2171,57 @@  finish:
     fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
 }
 
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+    int ret;
+
+    // we are finished and no frames were ever seen at this output,
+    // at least initialize the encoder with a dummy frame
+    if (!fgt->got_frame) {
+        AVFrame *frame = fgt->frame;
+        FrameData *fd;
+
+        frame->time_base   = ofp->tb_out;
+        frame->format      = ofp->format;
+
+        frame->width               = ofp->width;
+        frame->height              = ofp->height;
+        frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+        frame->sample_rate = ofp->sample_rate;
+        if (ofp->ch_layout.nb_channels) {
+            ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+            if (ret < 0)
+                return ret;
+        }
+
+        fd = frame_data(frame);
+        if (!fd)
+            return AVERROR(ENOMEM);
+
+        fd->frame_rate_filter = ofp->fps.framerate;
+
+        av_assert0(!frame->buf[0]);
+
+        av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+               "No filtered frames for output stream, trying to "
+               "initialize anyway.\n");
+
+        ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret;
+        }
+    }
+
+    fgt->eof_out[ofp->index] = 1;
+
+    return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
 static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                           AVFrame *frame, int buffer)
+                           AVFrame *frame)
 {
     FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
     AVFrame   *frame_prev = ofp->fps.last_frame;
@@ -2324,28 +2268,17 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
             frame_out = frame;
         }
 
-        if (buffer) {
-            AVFrame *f = av_frame_alloc();
-
-            if (!f) {
-                av_frame_unref(frame_out);
-                return AVERROR(ENOMEM);
-            }
-
-            av_frame_move_ref(f, frame_out);
-            f->opaque = (void*)(intptr_t)ofp->index;
-
-            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
-            if (ret < 0) {
-                av_frame_free(&f);
-                return AVERROR(ENOMEM);
-            }
-        } else {
-            // return the frame to the main thread
-            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+        {
+            // send the frame to consumers
+            ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
             if (ret < 0) {
                 av_frame_unref(frame_out);
-                fgt->eof_out[ofp->index] = 1;
+
+                if (!fgt->eof_out[ofp->index]) {
+                    fgt->eof_out[ofp->index] = 1;
+                    fgp->nb_outputs_done++;
+                }
+
                 return ret == AVERROR_EOF ? 0 : ret;
             }
         }
@@ -2366,16 +2299,14 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         av_frame_move_ref(frame_prev, frame);
     }
 
-    if (!frame) {
-        tq_send_finish(fgp->queue_out, ofp->index);
-        fgt->eof_out[ofp->index] = 1;
-    }
+    if (!frame)
+        return close_output(ofp, fgt);
 
     return 0;
 }
 
 static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                          AVFrame *frame,  int buffer)
+                          AVFrame *frame)
 {
     FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
     OutputStream       *ost = ofp->ofilter.ost;
@@ -2385,8 +2316,8 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
 
     ret = av_buffersink_get_frame_flags(filter, frame,
                                         AV_BUFFERSINK_FLAG_NO_REQUEST);
-    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
-        ret = fg_output_frame(ofp, fgt, NULL, buffer);
+    if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+        ret = fg_output_frame(ofp, fgt, NULL);
         return (ret < 0) ? ret : 1;
     } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
         return 1;
@@ -2440,7 +2371,7 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         fd->frame_rate_filter = ofp->fps.framerate;
     }
 
-    ret = fg_output_frame(ofp, fgt, frame, buffer);
+    ret = fg_output_frame(ofp, fgt, frame);
     av_frame_unref(frame);
     if (ret < 0)
         return ret;
@@ -2448,44 +2379,68 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
     return 0;
 }
 
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
 static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
-                       AVFrame *frame, int buffer)
+                       AVFrame *frame)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
-    int ret = 0;
+    int did_step = 0;
 
-    if (!fg->graph)
-        return 0;
-
-    // process buffered frames
-    if (!buffer) {
-        AVFrame *f;
-
-        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
-            int out_idx = (intptr_t)f->opaque;
-            f->opaque = NULL;
-            ret = tq_send(fgp->queue_out, out_idx, f);
-            av_frame_free(&f);
-            if (ret < 0 && ret != AVERROR_EOF)
-                return ret;
+    // graph not configured, just select the input to request
+    if (!fg->graph) {
+        for (int i = 0; i < fg->nb_inputs; i++) {
+            InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+            if (ifp->format < 0 && !fgt->eof_in[i]) {
+                fgt->next_in = i;
+                return 0;
+            }
         }
+
+        // This state - graph is not configured, but all inputs are either
+        // initialized or EOF - should be unreachable because sending EOF to a
+        // filter without even a fallback format should fail
+        av_assert0(0);
+        return AVERROR_BUG;
     }
 
-    /* Reap all buffers present in the buffer sinks */
-    for (int i = 0; i < fg->nb_outputs; i++) {
-        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
-        int ret = 0;
+    while (1) {
+        int ret;
 
-        while (!ret) {
-            ret = fg_output_step(ofp, fgt, frame, buffer);
-            if (ret < 0)
-                return ret;
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret == AVERROR(EAGAIN)) {
+            fgt->next_in = choose_input(fg, fgt);
+            break;
+        } else if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+            else
+                av_log(fg, AV_LOG_ERROR,
+                       "Error requesting a frame from the filtergraph: %s\n",
+                       av_err2str(ret));
+            return ret;
         }
-    }
+        fgt->next_in = fg->nb_inputs;
 
-    return 0;
+        // return after one iteration, so that scheduler can rate-control us
+        if (did_step && fgp->have_sources)
+            return 0;
+
+        /* Reap all buffers present in the buffer sinks */
+        for (int i = 0; i < fg->nb_outputs; i++) {
+            OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+            ret = 0;
+            while (!ret) {
+                ret = fg_output_step(ofp, fgt, frame);
+                if (ret < 0)
+                    return ret;
+            }
+        }
+        did_step = 1;
+    };
+
+    return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
 }
 
 static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2561,6 +2516,9 @@  static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
+    if (fgt->eof_in[ifp->index])
+       return 0;
+
     fgt->eof_in[ifp->index] = 1;
 
     if (ifp->filter) {
@@ -2662,7 +2620,7 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
             return ret;
         }
 
-        ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+        ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
         av_frame_free(&tmp);
         if (ret < 0)
             return ret;
@@ -2695,80 +2653,6 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
     return 0;
 }
 
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
-                       AVFrame *frame)
-{
-    const enum FrameOpaque msg = (intptr_t)frame->opaque;
-    FilterGraph            *fg = &fgp->fg;
-    int              graph_eof = 0;
-    int ret;
-
-    frame->opaque = NULL;
-    av_assert0(msg > 0);
-    av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
-    if (!fg->graph) {
-        // graph not configured yet, ignore all messages other than choosing
-        // the input to read from
-        if (msg != FRAME_OPAQUE_CHOOSE_INPUT)
-            goto done;
-
-        for (int i = 0; i < fg->nb_inputs; i++) {
-            InputFilter *ifilter = fg->inputs[i];
-            InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-            if (ifp->format < 0 && !fgt->eof_in[i]) {
-                frame->opaque = (void*)(intptr_t)(i + 1);
-                goto done;
-            }
-        }
-
-        // This state - graph is not configured, but all inputs are either
-        // initialized or EOF - should be unreachable because sending EOF to a
-        // filter without even a fallback format should fail
-        av_assert0(0);
-        return AVERROR_BUG;
-    }
-
-    if (msg == FRAME_OPAQUE_SEND_COMMAND) {
-        FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
-        send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
-        av_frame_unref(frame);
-        goto done;
-    }
-
-    if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
-        ret = avfilter_graph_request_oldest(fg->graph);
-
-        graph_eof = ret == AVERROR_EOF;
-
-        if (ret == AVERROR(EAGAIN)) {
-            frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
-            goto done;
-        } else if (ret < 0 && !graph_eof)
-            return ret;
-    }
-
-    ret = read_frames(fg, fgt, frame, 0);
-    if (ret < 0) {
-        av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
-        return ret;
-    }
-
-    if (graph_eof)
-        return AVERROR_EOF;
-
-    // signal to the main thread that we are done processing the message
-done:
-    ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
-    if (ret < 0) {
-        if (ret != AVERROR_EOF)
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
-        return ret;
-    }
-
-    return 0;
-}
-
 static void fg_thread_set_name(const FilterGraph *fg)
 {
     char name[16];
@@ -2855,294 +2739,94 @@  static void *filter_thread(void *arg)
         InputFilter *ifilter;
         InputFilterPriv *ifp;
         enum FrameOpaque o;
-        int input_idx, eof_frame;
+        unsigned input_idx = fgt.next_in;
 
-        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-        if (input_idx < 0 ||
-            (input_idx == fg->nb_inputs && input_status < 0)) {
+        input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+                                          &input_idx, fgt.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
             break;
+        } else if (input_status == AVERROR(EAGAIN)) {
+            // should only happen when we didn't request any input
+            av_assert0(input_idx == fg->nb_inputs);
+            goto read_frames;
         }
+        av_assert0(input_status >= 0);
+
+        o = (intptr_t)fgt.frame->opaque;
 
         o = (intptr_t)fgt.frame->opaque;
 
         // message on the control stream
         if (input_idx == fg->nb_inputs) {
-            ret = msg_process(fgp, &fgt, fgt.frame);
-            if (ret < 0)
-                goto finish;
+            FilterCommand *fc;
 
+            av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+
+            fc = (FilterCommand*)fgt.frame->buf[0]->data;
+            send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+                         fc->all_filters);
+            av_frame_unref(fgt.frame);
             continue;
         }
 
         // we received an input frame or EOF
         ifilter   = fg->inputs[input_idx];
         ifp       = ifp_from_ifilter(ifilter);
-        eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
         if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
             int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
             ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
-        } else if (input_status >= 0 && fgt.frame->buf[0]) {
+        } else if (fgt.frame->buf[0]) {
             ret = send_frame(fg, &fgt, ifilter, fgt.frame);
         } else {
-            int64_t   pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
-            AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
-            ret = send_eof(&fgt, ifilter, pts, tb);
+            av_assert1(o == FRAME_OPAQUE_EOF);
+            ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
         }
         av_frame_unref(fgt.frame);
         if (ret < 0)
+            goto finish;
+
+read_frames:
+        // retrieve all newly avalable frames
+        ret = read_frames(fg, &fgt, fgt.frame);
+        if (ret == AVERROR_EOF) {
+            av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
             break;
-
-        if (eof_frame) {
-            // an EOF frame is immediately followed by sender closing
-            // the corresponding stream, so retrieve that event
-            input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-            av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
-        }
-
-        // signal to the main thread that we are done
-        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
-        if (ret < 0) {
-            if (ret == AVERROR_EOF)
-                break;
-
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+        } else if (ret < 0) {
+            av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+                   av_err2str(ret));
             goto finish;
         }
     }
 
+    for (unsigned i = 0; i < fg->nb_outputs; i++) {
+        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+        if (fgt.eof_out[i])
+            continue;
+
+        ret = fg_output_frame(ofp, &fgt, NULL);
+        if (ret < 0)
+            goto finish;
+    }
+
 finish:
     // EOF is normal termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
-    for (int i = 0; i <= fg->nb_inputs; i++)
-        tq_receive_finish(fgp->queue_in, i);
-    for (int i = 0; i <= fg->nb_outputs; i++)
-        tq_send_finish(fgp->queue_out, i);
-
     fg_thread_uninit(&fgt);
 
-    av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
-                             AVFrame *frame, enum FrameOpaque type)
-{
-    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-    int output_idx, ret;
-
-    if (ifp->eof) {
-        av_frame_unref(frame);
-        return AVERROR_EOF;
-    }
-
-    frame->opaque = (void*)(intptr_t)type;
-
-    ret = tq_send(fgp->queue_in, ifp->index, frame);
-    if (ret < 0) {
-        ifp->eof = 1;
-        av_frame_unref(frame);
-        return ret;
-    }
-
-    if (type == FRAME_OPAQUE_EOF)
-        tq_send_finish(fgp->queue_in, ifp->index);
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
-    return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    if (keep_reference) {
-        ret = av_frame_ref(fgp->frame, frame);
-        if (ret < 0)
-            return ret;
-    } else
-        av_frame_move_ref(fgp->frame, frame);
-
-    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(graph);
-    int ret, got_frames = 0;
-
-    if (fgp->eof_in)
-        return AVERROR_EOF;
-
-    // signal to the filtering thread to return all frames it can
-    av_assert0(!fgp->frame->buf[0]);
-    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
-                                           FRAME_OPAQUE_CHOOSE_INPUT :
-                                           FRAME_OPAQUE_REAP_FILTERS);
-
-    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        goto finish;
-    }
-
-    while (1) {
-        OutputFilter *ofilter;
-        OutputFilterPriv *ofp;
-        OutputStream *ost;
-        int output_idx;
-
-        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
-        // EOF on the whole queue or the control stream
-        if (output_idx < 0 ||
-            (ret < 0 && output_idx == graph->nb_outputs))
-            goto finish;
-
-        // EOF for a specific stream
-        if (ret < 0) {
-            ofilter = graph->outputs[output_idx];
-            ofp     = ofp_from_ofilter(ofilter);
-
-            // we are finished and no frames were ever seen at this output,
-            // at least initialize the encoder with a dummy frame
-            if (!ofp->got_frame) {
-                AVFrame *frame = fgp->frame;
-                FrameData *fd;
-
-                frame->time_base   = ofp->tb_out;
-                frame->format      = ofp->format;
-
-                frame->width               = ofp->width;
-                frame->height              = ofp->height;
-                frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
-                frame->sample_rate = ofp->sample_rate;
-                if (ofp->ch_layout.nb_channels) {
-                    ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
-                    if (ret < 0)
-                        return ret;
-                }
-
-                fd = frame_data(frame);
-                if (!fd)
-                    return AVERROR(ENOMEM);
-
-                fd->frame_rate_filter = ofp->fps.framerate;
-
-                av_assert0(!frame->buf[0]);
-
-                av_log(ofilter->ost, AV_LOG_WARNING,
-                       "No filtered frames for output stream, trying to "
-                       "initialize anyway.\n");
-
-                enc_open(ofilter->ost, frame);
-                av_frame_unref(frame);
-            }
-
-            close_output_stream(graph->outputs[output_idx]->ost);
-            continue;
-        }
-
-        // request was fully processed by the filtering thread,
-        // return the input stream to read from, if needed
-        if (output_idx == graph->nb_outputs) {
-            int input_idx = (intptr_t)fgp->frame->opaque - 1;
-            av_assert0(input_idx <= graph->nb_inputs);
-
-            if (best_ist) {
-                *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
-                            ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
-                if (input_idx < 0 && !got_frames) {
-                    for (int i = 0; i < graph->nb_outputs; i++)
-                        graph->outputs[i]->ost->unavailable = 1;
-                }
-            }
-            break;
-        }
-
-        // got a frame from the filtering thread, send it for encoding
-        ofilter = graph->outputs[output_idx];
-        ost     = ofilter->ost;
-        ofp     = ofp_from_ofilter(ofilter);
-
-        if (ost->finished) {
-            av_frame_unref(fgp->frame);
-            tq_receive_finish(fgp->queue_out, output_idx);
-            continue;
-        }
-
-        if (fgp->frame->pts != AV_NOPTS_VALUE) {
-            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
-                                             fgp->frame->time_base,
-                                             AV_TIME_BASE_Q);
-        }
-
-        ret = enc_frame(ost, fgp->frame);
-        av_frame_unref(fgp->frame);
-        if (ret < 0)
-            goto finish;
-
-        ofp->got_frame = 1;
-        got_frames     = 1;
-    }
-
-finish:
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        for (int i = 0; i < graph->nb_outputs; i++)
-            close_output_stream(graph->outputs[i]->ost);
-    }
-
-    return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
-    return fg_transcode_step(fg, NULL);
-}
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
     AVBufferRef *buf;
     FilterCommand *fc;
-    int output_idx, ret;
-
-    if (!fgp->queue_in)
-        return;
 
     fc = av_mallocz(sizeof(*fc));
     if (!fc)
@@ -3168,13 +2852,5 @@  void fg_send_command(FilterGraph *fg, double time, const char *target,
     fgp->frame->buf[0] = buf;
     fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
 
-    ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame);
-    if (ret < 0) {
-        av_frame_unref(fgp->frame);
-        return;
-    }
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+    sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
 }