diff mbox series

[FFmpeg-devel,15/18] fftools/ffmpeg_sched: allow connecting encoder output to decoders

Message ID 20240306110319.17339-15-anton@khirnov.net
State Accepted
Commit efab83c1561c5c095a976971b947489a2d9abfa9
Headers show
Series [FFmpeg-devel,01/18] fftools/cmdutils: fix printing group name in split_commandline() | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Anton Khirnov March 6, 2024, 11:03 a.m. UTC
---
 fftools/ffmpeg_sched.c | 212 ++++++++++++++++++++++++++++++++++-------
 fftools/ffmpeg_sched.h |   8 +-
 2 files changed, 181 insertions(+), 39 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 5f8ef04680..d1fb942c34 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1051,24 +1051,43 @@  int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
         }
     case SCH_NODE_TYPE_ENC: {
         SchEnc       *enc;
-        SchMuxStream *ms;
 
         av_assert0(src.idx < sch->nb_enc);
-        // encoding packets go to muxing
-        av_assert0(dst.type == SCH_NODE_TYPE_MUX &&
-                   dst.idx < sch->nb_mux &&
-                   dst.idx_stream < sch->mux[dst.idx].nb_streams);
         enc = &sch->enc[src.idx];
-        ms  = &sch->mux[dst.idx].streams[dst.idx_stream];
-
-        av_assert0(!ms->src.type);
 
         ret = GROW_ARRAY(enc->dst, enc->nb_dst);
         if (ret < 0)
             return ret;
 
         enc->dst[enc->nb_dst - 1] = dst;
-        ms->src                   = src;
+
+        // encoding packets go to muxing or decoding
+        switch (dst.type) {
+        case SCH_NODE_TYPE_MUX: {
+            SchMuxStream *ms;
+
+            av_assert0(dst.idx        < sch->nb_mux &&
+                       dst.idx_stream < sch->mux[dst.idx].nb_streams);
+            ms = &sch->mux[dst.idx].streams[dst.idx_stream];
+
+            av_assert0(!ms->src.type);
+            ms->src  = src;
+
+            break;
+            }
+        case SCH_NODE_TYPE_DEC: {
+            SchDec *dec;
+
+            av_assert0(dst.idx < sch->nb_dec);
+            dec = &sch->dec[dst.idx];
+
+            av_assert0(!dec->src.type);
+            dec->src = src;
+
+            break;
+            }
+        default: av_assert0(0);
+        }
 
         break;
         }
@@ -1217,6 +1236,31 @@  int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_
     return 0;
 }
 
+static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
+{
+    while (1) {
+        SchFilterGraph *fg;
+
+        // fed directly by a demuxer (i.e. not through a filtergraph)
+        if (src.type == SCH_NODE_TYPE_DEMUX) {
+            sch->demux[src.idx].waiter.choked_next = 0;
+            return;
+        }
+
+        av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
+        fg = &sch->filters[src.idx];
+
+        // the filtergraph contains internal sources and
+        // requested to be scheduled directly
+        if (fg->best_input == fg->nb_inputs) {
+            fg->waiter.choked_next = 0;
+            return;
+        }
+
+        src = fg->inputs[fg->best_input].src_sched;
+    }
+}
+
 static void schedule_update_locked(Scheduler *sch)
 {
     int64_t dts;
@@ -1245,7 +1289,6 @@  static void schedule_update_locked(Scheduler *sch)
 
         for (unsigned j = 0; j < mux->nb_streams; j++) {
             SchMuxStream *ms = &mux->streams[j];
-            SchDemux *d;
 
             // unblock sources for output streams that are not finished
             // and not too far ahead of the trailing stream
@@ -1256,28 +1299,9 @@  static void schedule_update_locked(Scheduler *sch)
             if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
                 continue;
 
-            // for outputs fed from filtergraphs, consider that filtergraph's
-            // best_input information, in other cases there is a well-defined
-            // source demuxer
-            if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) {
-                SchFilterGraph *fg = &sch->filters[ms->src_sched.idx];
-                SchFilterIn *fi;
-
-                // the filtergraph contains internal sources and
-                // requested to be scheduled directly
-                if (fg->best_input == fg->nb_inputs) {
-                    fg->waiter.choked_next = 0;
-                    have_unchoked          = 1;
-                    continue;
-                }
-
-                fi = &fg->inputs[fg->best_input];
-                d  = &sch->demux[fi->src_sched.idx];
-            } else
-                d = &sch->demux[ms->src_sched.idx];
-
-            d->waiter.choked_next = 0;
-            have_unchoked         = 1;
+            // resolve the source to unchoke
+            unchoke_for_stream(sch, ms->src_sched);
+            have_unchoked = 1;
         }
     }
 
@@ -1303,6 +1327,105 @@  static void schedule_update_locked(Scheduler *sch)
 
 }
 
+enum {
+    CYCLE_NODE_NEW = 0,
+    CYCLE_NODE_STARTED,
+    CYCLE_NODE_DONE,
+};
+
+static int
+check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
+                         uint8_t *filters_visited, SchedulerNode *filters_stack)
+{
+    unsigned nb_filters_stack = 0;
+
+    memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
+
+    while (1) {
+        const SchFilterGraph *fg = &sch->filters[src.idx];
+
+        filters_visited[src.idx] = CYCLE_NODE_STARTED;
+
+        // descend into every input, depth first
+        if (src.idx_stream < fg->nb_inputs) {
+            const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
+
+            // connected to demuxer, no cycles possible
+            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
+                continue;
+
+            // otherwise connected to another filtergraph
+            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+
+            // found a cycle
+            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
+                return AVERROR(EINVAL);
+
+            // place current position on stack and descend
+            av_assert0(nb_filters_stack < sch->nb_filters);
+            filters_stack[nb_filters_stack++] = src;
+            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
+            continue;
+        }
+
+        filters_visited[src.idx] = CYCLE_NODE_DONE;
+
+        // previous search finished,
+        if (nb_filters_stack) {
+            src = filters_stack[--nb_filters_stack];
+            continue;
+        }
+        return 0;
+    }
+}
+
+static int check_acyclic(Scheduler *sch)
+{
+    uint8_t       *filters_visited = NULL;
+    SchedulerNode *filters_stack   = NULL;
+
+    int ret = 0;
+
+    if (!sch->nb_filters)
+        return 0;
+
+    filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
+    if (!filters_visited)
+        return AVERROR(ENOMEM);
+
+    filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
+    if (!filters_stack) {
+        ret = AVERROR(ENOMEM);
+        goto fail;
+    }
+
+    // trace the transcoding graph upstream from every output stream
+    // fed by a filtergraph
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        SchMux *mux = &sch->mux[i];
+
+        for (unsigned j = 0; j < mux->nb_streams; j++) {
+            SchMuxStream  *ms = &mux->streams[j];
+            SchedulerNode src = ms->src_sched;
+
+            if (src.type != SCH_NODE_TYPE_FILTER_OUT)
+                continue;
+            src.idx_stream = 0;
+
+            ret = check_acyclic_for_output(sch, src, filters_visited, filters_stack);
+            if (ret < 0) {
+                av_log(mux, AV_LOG_ERROR, "Transcoding graph has a cycle\n");
+                goto fail;
+            }
+        }
+    }
+
+fail:
+    av_freep(&filters_visited);
+    av_freep(&filters_stack);
+    return ret;
+}
+
 static int start_prepare(Scheduler *sch)
 {
     int ret;
@@ -1402,14 +1525,21 @@  static int start_prepare(Scheduler *sch)
 
         for (unsigned j = 0; j < fg->nb_inputs; j++) {
             SchFilterIn *fi = &fg->inputs[j];
+            SchDec     *dec;
 
             if (!fi->src.type) {
                 av_log(fg, AV_LOG_ERROR,
                        "Filtergraph input %u not connected to a source\n", j);
                 return AVERROR(EINVAL);
             }
+            av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
+            dec = &sch->dec[fi->src.idx];
 
-            fi->src_sched = sch->dec[fi->src.idx].src;
+            switch (dec->src.type) {
+            case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;                   break;
+            case SCH_NODE_TYPE_ENC:   fi->src_sched = sch->enc[dec->src.idx].src; break;
+            default: av_assert0(0);
+            }
         }
 
         for (unsigned j = 0; j < fg->nb_outputs; j++) {
@@ -1423,6 +1553,11 @@  static int start_prepare(Scheduler *sch)
         }
     }
 
+    // Check that the transcoding graph has no cycles.
+    ret = check_acyclic(sch);
+    if (ret < 0)
+        return ret;
+
     return 0;
 }
 
@@ -1575,6 +1710,8 @@  static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
             SchMux      *mux;
             SchMuxStream *ms;
 
+            if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
+                continue;
 
             mux = &sch->mux[enc->dst[i].idx];
             ms = &mux->streams[enc->dst[i].idx_stream];
@@ -2150,14 +2287,19 @@  static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
     if (!pkt)
         goto finish;
 
-    ret = send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt);
+    ret = (dst.type == SCH_NODE_TYPE_MUX) ?
+          send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
+          tq_send(sch->dec[dst.idx].queue, 0, pkt);
     if (ret == AVERROR_EOF)
         goto finish;
 
     return ret;
 
 finish:
-    send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
+    if (dst.type == SCH_NODE_TYPE_MUX)
+        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
+    else
+        tq_send_finish(sch->dec[dst.idx].queue, 0);
 
     *dst_finished = 1;
 
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index fc6711f9c3..a9190bd3d1 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -35,9 +35,9 @@ 
  * - demuxers, each containing any number of demuxed streams; demuxed packets
  *   belonging to some stream are sent to any number of decoders (transcoding)
  *   and/or muxers (streamcopy);
- * - decoders, which receive encoded packets from some demuxed stream, decode
- *   them, and send decoded frames to any number of filtergraph inputs
- *   (audio/video) or encoders (subtitles);
+ * - decoders, which receive encoded packets from some demuxed stream or
+ *   encoder, decode them, and send decoded frames to any number of filtergraph
+ *   inputs (audio/video) or encoders (subtitles);
  * - filtergraphs, each containing zero or more inputs (0 in case the
  *   filtergraph contains a lavfi source filter), and one or more outputs; the
  *   inputs and outputs need not have matching media types;
@@ -45,7 +45,7 @@ 
  *   filtered frames from each output are sent to some encoder;
  * - encoders, which receive decoded frames from some decoder (subtitles) or
  *   some filtergraph output (audio/video), encode them, and send encoded
- *   packets to some muxed stream;
+ *   packets to any number of muxed streams or decoders;
  * - muxers, each containing any number of muxed streams; each muxed stream
  *   receives encoded packets from some demuxed stream (streamcopy) or some
  *   encoder (transcoding); those packets are interleaved and written out by the