diff mbox series

[FFmpeg-devel,37/42] fftools/ffmpeg_sched: allow decoders to have multiple outputs

Message ID 20240827154041.13846-39-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/42] lavu/opt: add API for setting array-type option values | expand

Commit Message

Anton Khirnov Aug. 27, 2024, 3:05 p.m. UTC
Will be useful for multilayer video.
---
 fftools/ffmpeg_dec.c      |  10 ++--
 fftools/ffmpeg_demux.c    |   2 +-
 fftools/ffmpeg_filter.c   |   4 +-
 fftools/ffmpeg_mux_init.c |   2 +-
 fftools/ffmpeg_sched.c    | 102 ++++++++++++++++++++++++++++----------
 fftools/ffmpeg_sched.h    |  21 ++++++--
 6 files changed, 100 insertions(+), 41 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index c2bcf784b0..54f7223f0f 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -578,7 +578,7 @@  static int process_subtitle(DecoderPriv *dp, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    ret = sch_dec_send(dp->sch, dp->sch_idx, frame);
+    ret = sch_dec_send(dp->sch, dp->sch_idx, 0, frame);
     if (ret < 0)
         av_frame_unref(frame);
 
@@ -620,7 +620,7 @@  static int transcode_subtitles(DecoderPriv *dp, const AVPacket *pkt,
         frame->time_base = pkt->time_base;
         frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
 
-        ret = sch_dec_send(dp->sch, dp->sch_idx, frame);
+        ret = sch_dec_send(dp->sch, dp->sch_idx, 0, frame);
         return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
     } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
         return fix_sub_duration_heartbeat(dp, av_rescale_q(pkt->pts, pkt->time_base,
@@ -773,7 +773,7 @@  static int packet_decode(DecoderPriv *dp, AVPacket *pkt, AVFrame *frame)
 
         dp->dec.frames_decoded++;
 
-        ret = sch_dec_send(dp->sch, dp->sch_idx, frame);
+        ret = sch_dec_send(dp->sch, dp->sch_idx, 0, frame);
         if (ret < 0) {
             av_frame_unref(frame);
             return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
@@ -951,7 +951,7 @@  static int decoder_thread(void *arg)
                               dp->last_frame_pts + dp->last_frame_duration_est;
         dt.frame->time_base = dp->last_frame_tb;
 
-        ret = sch_dec_send(dp->sch, dp->sch_idx, dt.frame);
+        ret = sch_dec_send(dp->sch, dp->sch_idx, 0, dt.frame);
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(dp, AV_LOG_FATAL,
                    "Error signalling EOF timestamp: %s\n", av_err2str(ret));
@@ -1355,7 +1355,7 @@  int dec_create(const OptionsContext *o, const char *arg, Scheduler *sch)
         return ret;
     enc_idx = ret;
 
-    ret = sch_connect(sch, SCH_ENC(enc_idx), SCH_DEC(dp->sch_idx));
+    ret = sch_connect(sch, SCH_ENC(enc_idx), SCH_DEC_IN(dp->sch_idx));
     if (ret < 0)
         return ret;
 
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 039ee0c785..476efff127 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -954,7 +954,7 @@  static int ist_use(InputStream *ist, int decoding_needed)
         ds->sch_idx_dec = ret;
 
         ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream),
-                                  SCH_DEC(ds->sch_idx_dec));
+                                  SCH_DEC_IN(ds->sch_idx_dec));
         if (ret < 0)
             return ret;
 
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index b562e8417c..fb2b1a5b32 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -703,7 +703,7 @@  static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
     if (dec_idx < 0)
         return dec_idx;
 
-    ret = sch_connect(fgp->sch, SCH_DEC(dec_idx),
+    ret = sch_connect(fgp->sch, SCH_DEC_OUT(dec_idx, 0),
                                 SCH_FILTER_IN(fgp->sch_idx, ifp->index));
     if (ret < 0)
         return ret;
@@ -749,7 +749,7 @@  static int ifilter_bind_dec(InputFilterPriv *ifp, Decoder *dec)
     if (dec_idx < 0)
         return dec_idx;
 
-    ret = sch_connect(fgp->sch, SCH_DEC(dec_idx),
+    ret = sch_connect(fgp->sch, SCH_DEC_OUT(dec_idx, 0),
                                 SCH_FILTER_IN(fgp->sch_idx, ifp->index));
     if (ret < 0)
         return ret;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index e84fa9719f..5ee2a9685b 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -1458,7 +1458,7 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         ms->sch_idx_src = sched_idx;
 
         if (ost->enc) {
-            ret = sch_connect(mux->sch, SCH_DEC(sched_idx),
+            ret = sch_connect(mux->sch, SCH_DEC_OUT(sched_idx, 0),
                                         SCH_ENC(ms->sch_idx_enc));
             if (ret < 0)
                 goto fail;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index cff824340b..ef0b6e2897 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -71,13 +71,19 @@  typedef struct SchTask {
     int                 thread_running;
 } SchTask;
 
+typedef struct SchDecOutput {
+    SchedulerNode      *dst;
+    uint8_t            *dst_finished;
+    unsigned         nb_dst;
+} SchDecOutput;
+
 typedef struct SchDec {
     const AVClass      *class;
 
     SchedulerNode       src;
-    SchedulerNode      *dst;
-    uint8_t            *dst_finished;
-    unsigned         nb_dst;
+
+    SchDecOutput       *outputs;
+    unsigned         nb_outputs;
 
     SchTask             task;
     // Queue for receiving input packets, one stream.
@@ -513,8 +519,14 @@  void sch_free(Scheduler **psch)
 
         av_thread_message_queue_free(&dec->queue_end_ts);
 
-        av_freep(&dec->dst);
-        av_freep(&dec->dst_finished);
+        for (unsigned j = 0; j < dec->nb_outputs; j++) {
+            SchDecOutput *o = &dec->outputs[j];
+
+            av_freep(&o->dst);
+            av_freep(&o->dst_finished);
+        }
+
+        av_freep(&dec->outputs);
 
         av_frame_free(&dec->send_frame);
     }
@@ -712,14 +724,28 @@  int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
     return ret < 0 ? ret : d->nb_streams - 1;
 }
 
+int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
+{
+    SchDec *dec;
+    int ret;
+
+    av_assert0(dec_idx < sch->nb_dec);
+    dec = &sch->dec[dec_idx];
+
+    ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
+    if (ret < 0)
+        return ret;
+
+    return dec->nb_outputs - 1;
+}
+
 static const AVClass sch_dec_class = {
     .class_name                = "SchDec",
     .version                   = LIBAVUTIL_VERSION_INT,
     .parent_log_context_offset = offsetof(SchDec, task.func_arg),
 };
 
-int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
-                int send_end_ts)
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
 {
     const unsigned idx = sch->nb_dec;
 
@@ -739,6 +765,10 @@  int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
     if (!dec->send_frame)
         return AVERROR(ENOMEM);
 
+    ret = sch_add_dec_output(sch, idx);
+    if (ret < 0)
+        return ret;
+
     ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
     if (ret < 0)
         return ret;
@@ -943,15 +973,19 @@  int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
         }
     case SCH_NODE_TYPE_DEC: {
         SchDec *dec;
+        SchDecOutput *o;
 
         av_assert0(src.idx < sch->nb_dec);
         dec = &sch->dec[src.idx];
 
-        ret = GROW_ARRAY(dec->dst, dec->nb_dst);
+        av_assert0(src.idx_stream < dec->nb_outputs);
+        o = &dec->outputs[src.idx_stream];
+
+        ret = GROW_ARRAY(o->dst, o->nb_dst);
         if (ret < 0)
             return ret;
 
-        dec->dst[dec->nb_dst - 1] = dst;
+        o->dst[o->nb_dst - 1] = dst;
 
         // decoded frames go to filters or encoding
         switch (dst.type) {
@@ -1417,15 +1451,20 @@  static int start_prepare(Scheduler *sch)
                    "Decoder not connected to a source\n");
             return AVERROR(EINVAL);
         }
-        if (!dec->nb_dst) {
-            av_log(dec, AV_LOG_ERROR,
-                   "Decoder not connected to any sink\n");
-            return AVERROR(EINVAL);
-        }
 
-        dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
-        if (!dec->dst_finished)
-            return AVERROR(ENOMEM);
+        for (unsigned j = 0; j < dec->nb_outputs; j++) {
+            SchDecOutput *o = &dec->outputs[j];
+
+            if (!o->nb_dst) {
+                av_log(dec, AV_LOG_ERROR,
+                       "Decoder output %u not connected to any sink\n", j);
+                return AVERROR(EINVAL);
+            }
+
+            o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
+            if (!o->dst_finished)
+                return AVERROR(ENOMEM);
+        }
     }
 
     for (unsigned i = 0; i < sch->nb_enc; i++) {
@@ -2171,21 +2210,26 @@  finish:
     return AVERROR_EOF;
 }
 
-int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
+int sch_dec_send(Scheduler *sch, unsigned dec_idx,
+                 unsigned out_idx, AVFrame *frame)
 {
     SchDec *dec;
+    SchDecOutput *o;
     int ret;
     unsigned nb_done = 0;
 
     av_assert0(dec_idx < sch->nb_dec);
     dec = &sch->dec[dec_idx];
 
-    for (unsigned i = 0; i < dec->nb_dst; i++) {
-        uint8_t *finished = &dec->dst_finished[i];
+    av_assert0(out_idx < dec->nb_outputs);
+    o = &dec->outputs[out_idx];
+
+    for (unsigned i = 0; i < o->nb_dst; i++) {
+        uint8_t *finished = &o->dst_finished[i];
         AVFrame *to_send  = frame;
 
         // sending a frame consumes it, so make a temporary reference if needed
-        if (i < dec->nb_dst - 1) {
+        if (i < o->nb_dst - 1) {
             to_send = dec->send_frame;
 
             // frame may sometimes contain props only,
@@ -2196,7 +2240,7 @@  int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
                 return ret;
         }
 
-        ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
+        ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
         if (ret < 0) {
             av_frame_unref(to_send);
             if (ret == AVERROR_EOF) {
@@ -2207,7 +2251,7 @@  int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
         }
     }
 
-    return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
+    return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
 }
 
 static int dec_done(Scheduler *sch, unsigned dec_idx)
@@ -2222,10 +2266,14 @@  static int dec_done(Scheduler *sch, unsigned dec_idx)
     if (dec->queue_end_ts)
         av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
 
-    for (unsigned i = 0; i < dec->nb_dst; i++) {
-        int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
-        if (err < 0 && err != AVERROR_EOF)
-            ret = err_merge(ret, err);
+    for (unsigned i = 0; i < dec->nb_outputs; i++) {
+        SchDecOutput *o = &dec->outputs[i];
+
+        for (unsigned j = 0; j < o->nb_dst; j++) {
+            int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
+            if (err < 0 && err != AVERROR_EOF)
+                ret = err_merge(ret, err);
+        }
     }
 
     return ret;
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index 7cd839016c..3062c4a6ec 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -114,9 +114,12 @@  typedef int (*SchThreadFunc)(void *arg);
 #define SCH_MSTREAM(file, stream)                           \
     (SchedulerNode){ .type = SCH_NODE_TYPE_MUX,             \
                      .idx = file, .idx_stream = stream }
-#define SCH_DEC(decoder)                                    \
+#define SCH_DEC_IN(decoder)                                 \
     (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,             \
-                    .idx = decoder }
+                     .idx = decoder }
+#define SCH_DEC_OUT(decoder, out_idx)                       \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,             \
+                     .idx = decoder, .idx_stream = out_idx }
 #define SCH_ENC(encoder)                                    \
     (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,             \
                     .idx = encoder }
@@ -178,8 +181,15 @@  int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);
  * @retval ">=0" Index of the newly-created decoder.
  * @retval "<0"  Error code.
  */
-int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
-                int send_end_ts);
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts);
+
+/**
+ * Add another output to decoder (e.g. for multiview video).
+ *
+ * @retval ">=0" Index of the newly-added decoder output.
+ * @retval "<0"  Error code.
+ */
+int sch_add_dec_output(Scheduler *sch, unsigned dec_idx);
 
 /**
  * Add a filtergraph to the scheduler.
@@ -379,7 +389,8 @@  int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt);
  * @retval AVERROR_EOF all consumers are done, should terminate decoding
  * @retval "another negative error code" other failure
  */
-int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame);
+int sch_dec_send(Scheduler *sch, unsigned dec_idx,
+                 unsigned out_idx, struct AVFrame *frame);
 
 /**
  * Called by filtergraph tasks to obtain frames for filtering. Will wait for a