diff mbox series

[FFmpeg-devel,1/3] fftools/ffmpeg_sched: add filter API to signal EOF on input

Message ID 20240124104622.24343-1-anton@khirnov.net
State Accepted
Commit 00013341dfd674b72acb8d93fb6b3f3dc7ebc42e
Headers show
Series [FFmpeg-devel,1/3] fftools/ffmpeg_sched: add filter API to signal EOF on input | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Anton Khirnov Jan. 24, 2024, 10:46 a.m. UTC
---
 fftools/ffmpeg_sched.c | 27 +++++++++++++++++++++++++--
 fftools/ffmpeg_sched.h |  7 +++++++
 2 files changed, 32 insertions(+), 2 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4fc5a33941..c6ed2e21ff 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -226,6 +226,7 @@  typedef struct SchFilterIn {
     SchedulerNode       src;
     SchedulerNode       src_sched;
     int                 send_finished;
+    int                 receive_finished;
 } SchFilterIn;
 
 typedef struct SchFilterOut {
@@ -237,7 +238,8 @@  typedef struct SchFilterGraph {
 
     SchFilterIn        *inputs;
     unsigned         nb_inputs;
-    atomic_uint      nb_inputs_finished;
+    atomic_uint      nb_inputs_finished_send;
+    unsigned         nb_inputs_finished_receive;
 
     SchFilterOut       *outputs;
     unsigned         nb_outputs;
@@ -1959,7 +1961,7 @@  static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
         tq_send_finish(fg->queue, in_idx);
 
         // close the control stream when all actual inputs are done
-        if (atomic_fetch_add(&fg->nb_inputs_finished, 1) == fg->nb_inputs - 1)
+        if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
             tq_send_finish(fg->queue, fg->nb_inputs);
     }
     return 0;
@@ -2143,6 +2145,27 @@  int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
     }
 }
 
+void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
+{
+    SchFilterGraph *fg;
+    SchFilterIn    *fi;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    av_assert0(in_idx < fg->nb_inputs);
+    fi = &fg->inputs[in_idx];
+
+    if (!fi->receive_finished) {
+        fi->receive_finished = 1;
+        tq_receive_finish(fg->queue, in_idx);
+
+        // close the control stream when all actual inputs are done
+        if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
+            tq_receive_finish(fg->queue, fg->nb_inputs);
+    }
+}
+
 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
 {
     SchFilterGraph *fg;
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index b167d8d158..811146f6ed 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -388,6 +388,13 @@  int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame);
  */
 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
                        unsigned *in_idx, struct AVFrame *frame);
+/**
+ * Called by filter tasks to signal that a filter input will no longer accept input.
+ *
+ * @param fg_idx Filtergraph index previously returned from sch_add_filtergraph().
+ * @param in_idx Index of the input to finish.
+ */
+void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx);
 
 /**
  * Called by filtergraph tasks to send a filtered frame or EOF to consumers.