diff mbox series

[FFmpeg-devel,24/27] WIP fftools/ffmpeg_filter: convert to the scheduler

Message ID 20230919191044.18873-25-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate | expand

Commit Message

Anton Khirnov Sept. 19, 2023, 7:10 p.m. UTC
---
 fftools/ffmpeg.h        |  22 --
 fftools/ffmpeg_filter.c | 533 ++++++++++++----------------------------
 2 files changed, 152 insertions(+), 403 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 841f8d0d68..eb4e8e27a9 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -312,9 +312,6 @@  typedef struct OutputFilter {
 
     enum AVMediaType     type;
 
-    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
-    int64_t last_pts;
-
     uint64_t nb_frames_dup;
     uint64_t nb_frames_drop;
 } OutputFilter;
@@ -745,8 +742,6 @@  int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
  */
 FrameData *frame_data(AVFrame *frame);
 
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
 void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
 
 /**
@@ -769,26 +764,9 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
 
 void fg_free(FilterGraph **pfg);
 
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in]  graph     filter graph to consider
- * @param[out] best_ist  input stream where a frame would allow to continue
- * @return  0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters);
 
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return  0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
 int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
 
 void enc_stats_write(OutputStream *ost, EncStats *es,
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index e8e78f5454..c588dde452 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,7 +21,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg.h"
-#include "thread_queue.h"
 
 #include "libavfilter/avfilter.h"
 #include "libavfilter/buffersink.h"
@@ -46,7 +45,6 @@  enum FrameOpaque {
     FRAME_OPAQUE_REAP_FILTERS = 1,
     FRAME_OPAQUE_CHOOSE_INPUT,
     FRAME_OPAQUE_SUB_HEARTBEAT,
-    FRAME_OPAQUE_EOF,
     FRAME_OPAQUE_SEND_COMMAND,
 };
 
@@ -62,8 +60,7 @@  typedef struct FilterGraphPriv {
     int is_meta;
     int disable_conversions;
 
-    int nb_inputs_bound;
-    int nb_outputs_bound;
+    unsigned nb_outputs_done;
 
     const char *graph_desc;
 
@@ -75,14 +72,10 @@  typedef struct FilterGraphPriv {
     Scheduler       *sch;
     unsigned         sch_idx;
 
-    pthread_t        thread;
     /**
      * Queue for sending frames from the main thread to the filtergraph. Has
      * nb_inputs+1 streams - the first nb_inputs stream correspond to
      * filtergraph inputs. Frames on those streams may have their opaque set to
-     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
-     *   EOF event for the correspondint stream. Will be immediately followed by
-     *   this stream being send-closed.
      * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
      *   a subtitle heartbeat event. Will only be sent for sub2video streams.
      *
@@ -96,7 +89,7 @@  typedef struct FilterGraphPriv {
      *   available the terminating empty frame's opaque will contain the index+1
      *   of the filtergraph input to which more input frames should be supplied.
      */
-    ThreadQueue     *queue_in;
+
     /**
      * Queue for sending frames from the filtergraph back to the main thread.
      * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
@@ -105,7 +98,7 @@  typedef struct FilterGraphPriv {
      * The last stream is "control" - see documentation for queue_in for more
      * details.
      */
-    ThreadQueue     *queue_out;
+    //ThreadQueue     *queue_out;
     // submitting frames to filter thread returned EOF
     // this only happens on thread exit, so is not per-input
     int              eof_in;
@@ -130,6 +123,7 @@  typedef struct FilterGraphThread {
     // The output index is stored in frame opaque.
     AVFifo  *frame_queue_out;
 
+    // set to 1 after at least one frame passed through this output
     int      got_frame;
 
     // EOF status of each input/output, as received by the thread
@@ -260,9 +254,6 @@  typedef struct OutputFilterPriv {
     int64_t ts_offset;
     int64_t next_pts;
     FPSConvContext fps;
-
-    // set to 1 after at least one frame passed through this output
-    int got_frame;
 } OutputFilterPriv;
 
 static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -660,57 +651,6 @@  static int ifilter_has_all_input_formats(FilterGraph *fg)
 
 static void *filter_thread(void *arg);
 
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
-    FilterGraph *fg = &fgp->fg;
-    ObjPool *op;
-    int ret = 0;
-
-    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
-        fgp->nb_outputs_bound < fg->nb_outputs)
-        return 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
-    if (!fgp->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    // at least one output is mandatory
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
-    if (!fgp->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
-               fg->index, av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return ret;
-}
-
 static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
 {
     AVFilterContext *ctx = inout->filter_ctx;
@@ -736,7 +676,6 @@  static OutputFilter *ofilter_alloc(FilterGraph *fg)
     ofilter->graph    = fg;
     ofp->format       = -1;
     ofp->index        = fg->nb_outputs - 1;
-    ofilter->last_pts = AV_NOPTS_VALUE;
 
     return ofilter;
 }
@@ -768,10 +707,7 @@  static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
             return AVERROR(ENOMEM);
     }
 
-    fgp->nb_inputs_bound++;
-    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -910,10 +846,7 @@  int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
     if (ret < 0)
         return ret;
 
-    fgp->nb_outputs_bound++;
-    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -943,34 +876,6 @@  static InputFilter *ifilter_alloc(FilterGraph *fg)
     return ifilter;
 }
 
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
-    void *ret;
-
-    if (!fgp->queue_in)
-        return 0;
-
-    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
-        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
-                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
-        if (ifp)
-            ifp->eof = 1;
-
-        tq_send_finish(fgp->queue_in, i);
-    }
-
-    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
-        tq_receive_finish(fgp->queue_out, i);
-
-    pthread_join(fgp->thread, &ret);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void fg_free(FilterGraph **pfg)
 {
     FilterGraph *fg = *pfg;
@@ -980,8 +885,6 @@  void fg_free(FilterGraph **pfg)
         return;
     fgp = fgp_from_fg(fg);
 
-    fg_thread_stop(fgp);
-
     avfilter_graph_free(&fg->graph);
     for (int j = 0; j < fg->nb_inputs; j++) {
         InputFilter *ifilter = fg->inputs[j];
@@ -2253,8 +2156,56 @@  finish:
     fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
 }
 
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+    int ret;
+
+    // we are finished and no frames were ever seen at this output,
+    // at least initialize the encoder with a dummy frame
+    if (!fgt->got_frame) {
+        AVFrame *frame = fgt->frame;
+        FrameData *fd;
+
+        frame->time_base   = ofp->tb_out;
+        frame->format      = ofp->format;
+
+        frame->width               = ofp->width;
+        frame->height              = ofp->height;
+        frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+        frame->sample_rate = ofp->sample_rate;
+        if (ofp->ch_layout.nb_channels) {
+            ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+            if (ret < 0)
+                return ret;
+        }
+
+        fd = frame_data(frame);
+        if (!fd)
+            return AVERROR(ENOMEM);
+
+        fd->frame_rate_filter = ofp->fps.framerate;
+
+        av_assert0(!frame->buf[0]);
+
+        av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+               "No filtered frames for output stream, trying to "
+               "initialize anyway.\n");
+
+        ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+        av_frame_unref(frame);
+        if (ret < 0)
+            return ret;
+    }
+
+    fgt->eof_out[ofp->index] = 1;
+
+    return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
 static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                           AVFrame *frame, int buffer)
+                           AVFrame *frame)
 {
     FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
     AVFrame   *frame_prev = ofp->fps.last_frame;
@@ -2301,28 +2252,17 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
             frame_out = frame;
         }
 
-        if (buffer) {
-            AVFrame *f = av_frame_alloc();
-
-            if (!f) {
-                av_frame_unref(frame_out);
-                return AVERROR(ENOMEM);
-            }
-
-            av_frame_move_ref(f, frame_out);
-            f->opaque = (void*)(intptr_t)ofp->index;
-
-            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
-            if (ret < 0) {
-                av_frame_free(&f);
-                return AVERROR(ENOMEM);
-            }
-        } else {
-            // return the frame to the main thread
-            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+        {
+            // send the frame to consumers
+            ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
             if (ret < 0) {
                 av_frame_unref(frame_out);
-                fgt->eof_out[ofp->index] = 1;
+
+                if (!fgt->eof_out[ofp->index]) {
+                    fgt->eof_out[ofp->index] = 1;
+                    fgp->nb_outputs_done++;
+                }
+
                 return ret == AVERROR_EOF ? 0 : ret;
             }
         }
@@ -2343,16 +2283,14 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         av_frame_move_ref(frame_prev, frame);
     }
 
-    if (!frame) {
-        tq_send_finish(fgp->queue_out, ofp->index);
-        fgt->eof_out[ofp->index] = 1;
-    }
+    if (!frame)
+        return close_output(ofp, fgt);
 
     return 0;
 }
 
 static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                          AVFrame *frame,  int buffer)
+                          AVFrame *frame)
 {
     FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
     OutputStream       *ost = ofp->ofilter.ost;
@@ -2362,8 +2300,8 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
 
     ret = av_buffersink_get_frame_flags(filter, frame,
                                         AV_BUFFERSINK_FLAG_NO_REQUEST);
-    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
-        ret = fg_output_frame(ofp, fgt, NULL, buffer);
+    if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+        ret = fg_output_frame(ofp, fgt, NULL);
         return (ret < 0) ? ret : 1;
     } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
         return 1;
@@ -2417,7 +2355,7 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         fd->frame_rate_filter = ofp->fps.framerate;
     }
 
-    ret = fg_output_frame(ofp, fgt, frame, buffer);
+    ret = fg_output_frame(ofp, fgt, frame);
     av_frame_unref(frame);
     if (ret < 0)
         return ret;
@@ -2425,44 +2363,29 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
     return 0;
 }
 
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
 static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
-                       AVFrame *frame, int buffer)
+                       AVFrame *frame)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
-    int ret = 0;
 
     if (!fg->graph)
         return 0;
 
-    // process buffered frames
-    if (!buffer) {
-        AVFrame *f;
-
-        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
-            int out_idx = (intptr_t)f->opaque;
-            f->opaque = NULL;
-            ret = tq_send(fgp->queue_out, out_idx, f);
-            av_frame_free(&f);
-            if (ret < 0 && ret != AVERROR_EOF)
-                return ret;
-        }
-    }
-
     /* Reap all buffers present in the buffer sinks */
     for (int i = 0; i < fg->nb_outputs; i++) {
         OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
         int ret = 0;
 
         while (!ret) {
-            ret = fg_output_step(ofp, fgt, frame, buffer);
+            ret = fg_output_step(ofp, fgt, frame);
             if (ret < 0)
                 return ret;
         }
     }
 
-    return 0;
+    return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
 }
 
 static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2536,6 +2459,9 @@  static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
+    if (fgt->eof_in[ifp->index])
+       return 0;
+
     fgt->eof_in[ifp->index] = 1;
 
     if (ifp->filter) {
@@ -2637,7 +2563,7 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
             return ret;
         }
 
-        ret = read_frames(fg, fgt, tmp, 1);
+        ret = read_frames(fg, fgt, tmp);
         av_frame_free(&tmp);
         if (ret < 0)
             return ret;
@@ -2674,6 +2600,29 @@  static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
 {
     int nb_requests, nb_requests_max = 0;
     int best_input = -1;
+    int ret;
+
+    if (!fg->graph) {
+        for (int i = 0; i < fg->nb_inputs; i++) {
+            InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+            if (ifp->format < 0 && !fgt->eof_in[i])
+                return i;
+        }
+
+        // This state - graph is not configured, but all inputs are either
+        // initialized or EOF - should be unreachable because sending EOF to a
+        // filter without even a fallback format should fail
+        av_assert0(0);
+        return AVERROR_BUG;
+    }
+
+    ret = avfilter_graph_request_oldest(fg->graph);
+    if (ret < 0 && ret != AVERROR(EAGAIN))
+        return ret;
+
+    // we drain all frames from the filtergraph after each input frame, so
+    // we should never get success here
+    av_assert0(ret < 0);
 
     for (int i = 0; i < fg->nb_inputs; i++) {
         InputFilter *ifilter = fg->inputs[i];
@@ -2690,9 +2639,12 @@  static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
         }
     }
 
+    av_assert0(best_input >= 0);
+
     return best_input;
 }
 
+#if 0
 static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
                        AVFrame *frame)
 {
@@ -2766,6 +2718,7 @@  done:
 
     return 0;
 }
+#endif
 
 static void fg_thread_set_name(const FilterGraph *fg)
 {
@@ -2852,28 +2805,53 @@  static void *filter_thread(void *arg)
     while (1) {
         InputFilter *ifilter;
         InputFilterPriv *ifp;
-        int input_idx, eof_frame;
+        int input_idx;
 
-        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-        if (input_idx < 0 ||
-            (input_idx == fg->nb_inputs && input_status < 0)) {
+        // XXX
+        if (!fg->nb_inputs) {
+            abort();
+            goto no_inputs;
+        }
+
+        input_idx = choose_input(fg, &fgt);
+        // XXX
+        if (input_idx < 0)
+            goto finish;
+
+        input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+                                          &input_idx, fgt.frame);
+        if (input_idx < 0) {
             av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
             break;
         }
 
+        // XXX
+#if 0
         // message on the control stream
         if (input_idx == fg->nb_inputs) {
             ret = msg_process(fgp, &fgt, fgt.frame);
             if (ret < 0)
                 goto finish;
 
+#elif 0
+            enum FrameOpaque msg = (intptr_t)fgt.frame->opaque;
+
+            fgt.frame->opaque = NULL;
+            av_assert0(msg > 0);
+            av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND);
+
+            {
+                FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
+                send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
+                av_frame_unref(frame);
+            }
+
             continue;
         }
+#endif
 
-        // we received an input frame or EOF
         ifilter   = fg->inputs[input_idx];
         ifp       = ifp_from_ifilter(ifilter);
-        eof_frame = input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_EOF;
         if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
             if (input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_SUB_HEARTBEAT)
                 sub2video_heartbeat(ifilter, fgt.frame->pts, fgt.frame->time_base);
@@ -2890,20 +2868,17 @@  static void *filter_thread(void *arg)
         if (ret < 0)
             break;
 
-        if (eof_frame) {
-            // an EOF frame is immediately followed by sender closing
-            // the corresponding stream, so retrieve that event
-            input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-            av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
-        }
-
-        // signal to the main thread that we are done
-        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
+no_inputs:
+        // retrieve all newly avalable frames
+        // XXX: handle filtergraphs connected to an unlimited source
+        ret = read_frames(fg, &fgt, fgt.frame);
         if (ret < 0) {
             if (ret == AVERROR_EOF)
-                break;
+                av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
+            else
+                av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+                       av_err2str(ret));
 
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
             goto finish;
         }
     }
@@ -2913,10 +2888,9 @@  finish:
     if (ret == AVERROR_EOF)
         ret = 0;
 
-    for (int i = 0; i <= fg->nb_inputs; i++)
-        tq_receive_finish(fgp->queue_in, i);
-    for (int i = 0; i <= fg->nb_outputs; i++)
-        tq_send_finish(fgp->queue_out, i);
+    // XXX close_output() here?
+
+    sch_filter_send(fgp->sch, fgp->sch_idx, -1, NULL);
 
     fg_thread_uninit(&fgt);
 
@@ -2925,64 +2899,6 @@  finish:
     return (void*)(intptr_t)ret;
 }
 
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
-                             AVFrame *frame, enum FrameOpaque type)
-{
-    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-    int output_idx, ret;
-
-    if (ifp->eof) {
-        av_frame_unref(frame);
-        return AVERROR_EOF;
-    }
-
-    frame->opaque = (void*)(intptr_t)type;
-
-    ret = tq_send(fgp->queue_in, ifp->index, frame);
-    if (ret < 0) {
-        ifp->eof = 1;
-        av_frame_unref(frame);
-        return ret;
-    }
-
-    if (type == FRAME_OPAQUE_EOF)
-        tq_send_finish(fgp->queue_in, ifp->index);
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
-    return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    if (keep_reference) {
-        ret = av_frame_ref(fgp->frame, frame);
-        if (ret < 0)
-            return ret;
-    } else
-        av_frame_move_ref(fgp->frame, frame);
-
-    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
 void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
 {
     FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
@@ -2990,144 +2906,10 @@  void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
     fgp->frame->pts       = pts;
     fgp->frame->time_base = tb;
 
+    // XXX
+#if 0
     thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(graph);
-    int ret, got_frames = 0;
-
-    if (fgp->eof_in)
-        return AVERROR_EOF;
-
-    // signal to the filtering thread to return all frames it can
-    av_assert0(!fgp->frame->buf[0]);
-    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
-                                           FRAME_OPAQUE_CHOOSE_INPUT :
-                                           FRAME_OPAQUE_REAP_FILTERS);
-
-    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        goto finish;
-    }
-
-    while (1) {
-        OutputFilter *ofilter;
-        OutputFilterPriv *ofp;
-        OutputStream *ost;
-        int output_idx;
-
-        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
-        // EOF on the whole queue or the control stream
-        if (output_idx < 0 ||
-            (ret < 0 && output_idx == graph->nb_outputs))
-            goto finish;
-
-        // EOF for a specific stream
-        if (ret < 0) {
-            ofilter = graph->outputs[output_idx];
-            ofp     = ofp_from_ofilter(ofilter);
-
-            // we are finished and no frames were ever seen at this output,
-            // at least initialize the encoder with a dummy frame
-            if (!ofp->got_frame) {
-                AVFrame *frame = fgp->frame;
-                FrameData *fd;
-
-                frame->time_base   = ofp->tb_out;
-                frame->format      = ofp->format;
-
-                frame->width               = ofp->width;
-                frame->height              = ofp->height;
-                frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
-                frame->sample_rate = ofp->sample_rate;
-                if (ofp->ch_layout.nb_channels) {
-                    ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
-                    if (ret < 0)
-                        return ret;
-                }
-
-                fd = frame_data(frame);
-                if (!fd)
-                    return AVERROR(ENOMEM);
-
-                fd->frame_rate_filter = ofp->fps.framerate;
-
-                av_assert0(!frame->buf[0]);
-
-                av_log(ofilter->ost, AV_LOG_WARNING,
-                       "No filtered frames for output stream, trying to "
-                       "initialize anyway.\n");
-
-                enc_open(ofilter->ost, frame);
-                av_frame_unref(frame);
-            }
-
-            close_output_stream(graph->outputs[output_idx]->ost);
-            continue;
-        }
-
-        // request was fully processed by the filtering thread,
-        // return the input stream to read from, if needed
-        if (output_idx == graph->nb_outputs) {
-            int input_idx = (intptr_t)fgp->frame->opaque - 1;
-            av_assert0(input_idx <= graph->nb_inputs);
-
-            if (best_ist) {
-                *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
-                            ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
-                if (input_idx < 0 && !got_frames) {
-                    for (int i = 0; i < graph->nb_outputs; i++)
-                        graph->outputs[i]->ost->unavailable = 1;
-                }
-            }
-            break;
-        }
-
-        // got a frame from the filtering thread, send it for encoding
-        ofilter = graph->outputs[output_idx];
-        ost     = ofilter->ost;
-        ofp     = ofp_from_ofilter(ofilter);
-
-        if (ost->finished) {
-            av_frame_unref(fgp->frame);
-            tq_receive_finish(fgp->queue_out, output_idx);
-            continue;
-        }
-
-        if (fgp->frame->pts != AV_NOPTS_VALUE) {
-            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
-                                             fgp->frame->time_base,
-                                             AV_TIME_BASE_Q);
-        }
-
-        ret = enc_frame(ost, fgp->frame);
-        av_frame_unref(fgp->frame);
-        if (ret < 0)
-            goto finish;
-
-        ofp->got_frame = 1;
-        got_frames     = 1;
-    }
-
-finish:
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        for (int i = 0; i < graph->nb_outputs; i++)
-            close_output_stream(graph->outputs[i]->ost);
-    }
-
-    return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
-    return fg_transcode_step(fg, NULL);
+#endif
 }
 
 void fg_send_command(FilterGraph *fg, double time, const char *target,
@@ -3138,9 +2920,6 @@  void fg_send_command(FilterGraph *fg, double time, const char *target,
     FilterCommand *fc;
     int output_idx, ret;
 
-    if (!fgp->queue_in)
-        return;
-
     fc = av_mallocz(sizeof(*fc));
     if (!fc)
         return;
@@ -3165,13 +2944,5 @@  void fg_send_command(FilterGraph *fg, double time, const char *target,
     fgp->frame->buf[0] = buf;
     fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
 
-    ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame);
-    if (ret < 0) {
-        av_frame_unref(fgp->frame);
-        return;
-    }
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+    // XXX actually implement the command
 }