diff mbox series

[FFmpeg-devel,2/2] fftools/ffmpeg_sched: make sure to always run task cleanup

Message ID 20240327102126.14299-2-anton@khirnov.net
State Accepted
Commit 24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b
Headers show
Series [FFmpeg-devel,1/2] fftools/ffmpeg_sched: move sch_stop() to the bottom of the file | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Anton Khirnov March 27, 2024, 10:21 a.m. UTC
Even in cases where the task failed to start due to pthread_create()
failing.
---
 fftools/ffmpeg_sched.c | 68 +++++++++++++++++++++++++++---------------
 1 file changed, 44 insertions(+), 24 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 67c32fb5a0..ee3af45908 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -260,6 +260,12 @@  typedef struct SchFilterGraph {
     int                 task_exited;
 } SchFilterGraph;
 
+enum SchedulerState {
+    SCH_STATE_UNINIT,
+    SCH_STATE_STARTED,
+    SCH_STATE_STOPPED,
+};
+
 struct Scheduler {
     const AVClass      *class;
 
@@ -292,7 +298,7 @@  struct Scheduler {
     char               *sdp_filename;
     int                 sdp_auto;
 
-    int                 transcode_started;
+    enum SchedulerState state;
     atomic_int          terminate;
     atomic_int          task_failed;
 
@@ -1144,7 +1150,8 @@  int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
 
     // this may be called during initialization - do not start
     // threads before sch_start() is called
-    if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+    if (++mux->nb_streams_ready == mux->nb_streams &&
+        sch->state >= SCH_STATE_STARTED)
         ret = mux_init(sch, mux);
 
     pthread_mutex_unlock(&sch->mux_ready_lock);
@@ -1514,7 +1521,8 @@  int sch_start(Scheduler *sch)
     if (ret < 0)
         return ret;
 
-    sch->transcode_started = 1;
+    av_assert0(sch->state == SCH_STATE_UNINIT);
+    sch->state = SCH_STATE_STARTED;
 
     for (unsigned i = 0; i < sch->nb_mux; i++) {
         SchMux *mux = &sch->mux[i];
@@ -1522,7 +1530,7 @@  int sch_start(Scheduler *sch)
         if (mux->nb_streams_ready == mux->nb_streams) {
             ret = mux_init(sch, mux);
             if (ret < 0)
-                return ret;
+                goto fail;
         }
     }
 
@@ -1531,7 +1539,7 @@  int sch_start(Scheduler *sch)
 
         ret = task_start(&enc->task);
         if (ret < 0)
-            return ret;
+            goto fail;
     }
 
     for (unsigned i = 0; i < sch->nb_filters; i++) {
@@ -1539,7 +1547,7 @@  int sch_start(Scheduler *sch)
 
         ret = task_start(&fg->task);
         if (ret < 0)
-            return ret;
+            goto fail;
     }
 
     for (unsigned i = 0; i < sch->nb_dec; i++) {
@@ -1547,7 +1555,7 @@  int sch_start(Scheduler *sch)
 
         ret = task_start(&dec->task);
         if (ret < 0)
-            return ret;
+            goto fail;
     }
 
     for (unsigned i = 0; i < sch->nb_demux; i++) {
@@ -1558,7 +1566,7 @@  int sch_start(Scheduler *sch)
 
         ret = task_start(&d->task);
         if (ret < 0)
-            return ret;
+            goto fail;
     }
 
     pthread_mutex_lock(&sch->schedule_lock);
@@ -1566,6 +1574,9 @@  int sch_start(Scheduler *sch)
     pthread_mutex_unlock(&sch->schedule_lock);
 
     return 0;
+fail:
+    sch_stop(sch, NULL);
+    return ret;
 }
 
 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
@@ -2414,6 +2425,18 @@  int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
     return send_to_filter(sch, fg, fg->nb_inputs, frame);
 }
 
+static int task_cleanup(Scheduler *sch, SchedulerNode node)
+{
+    switch (node.type) {
+    case SCH_NODE_TYPE_DEMUX:       return demux_done (sch, node.idx);
+    case SCH_NODE_TYPE_MUX:         return mux_done   (sch, node.idx);
+    case SCH_NODE_TYPE_DEC:         return dec_done   (sch, node.idx);
+    case SCH_NODE_TYPE_ENC:         return enc_done   (sch, node.idx);
+    case SCH_NODE_TYPE_FILTER_IN:   return filter_done(sch, node.idx);
+    default: av_assert0(0);
+    }
+}
+
 static void *task_wrapper(void *arg)
 {
     SchTask  *task = arg;
@@ -2426,15 +2449,7 @@  static void *task_wrapper(void *arg)
         av_log(task->func_arg, AV_LOG_ERROR,
                "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
 
-    switch (task->node.type) {
-    case SCH_NODE_TYPE_DEMUX:       err = demux_done (sch, task->node.idx); break;
-    case SCH_NODE_TYPE_MUX:         err = mux_done   (sch, task->node.idx); break;
-    case SCH_NODE_TYPE_DEC:         err = dec_done   (sch, task->node.idx); break;
-    case SCH_NODE_TYPE_ENC:         err = enc_done   (sch, task->node.idx); break;
-    case SCH_NODE_TYPE_FILTER_IN:   err = filter_done(sch, task->node.idx); break;
-    default: av_assert0(0);
-    }
-
+    err = task_cleanup(sch, task->node);
     ret = err_merge(ret, err);
 
     // EOF is considered normal termination
@@ -2450,13 +2465,13 @@  static void *task_wrapper(void *arg)
     return (void*)(intptr_t)ret;
 }
 
-static int task_stop(SchTask *task)
+static int task_stop(Scheduler *sch, SchTask *task)
 {
     int ret;
     void *thread_ret;
 
     if (!task->thread_running)
-        return 0;
+        return task_cleanup(sch, task->node);
 
     ret = pthread_join(task->thread, &thread_ret);
     av_assert0(ret == 0);
@@ -2470,6 +2485,9 @@  int sch_stop(Scheduler *sch, int64_t *finish_ts)
 {
     int ret = 0, err;
 
+    if (sch->state != SCH_STATE_STARTED)
+        return 0;
+
     atomic_store(&sch->terminate, 1);
 
     for (unsigned type = 0; type < 2; type++)
@@ -2481,40 +2499,42 @@  int sch_stop(Scheduler *sch, int64_t *finish_ts)
     for (unsigned i = 0; i < sch->nb_demux; i++) {
         SchDemux *d = &sch->demux[i];
 
-        err = task_stop(&d->task);
+        err = task_stop(sch, &d->task);
         ret = err_merge(ret, err);
     }
 
     for (unsigned i = 0; i < sch->nb_dec; i++) {
         SchDec *dec = &sch->dec[i];
 
-        err = task_stop(&dec->task);
+        err = task_stop(sch, &dec->task);
         ret = err_merge(ret, err);
     }
 
     for (unsigned i = 0; i < sch->nb_filters; i++) {
         SchFilterGraph *fg = &sch->filters[i];
 
-        err = task_stop(&fg->task);
+        err = task_stop(sch, &fg->task);
         ret = err_merge(ret, err);
     }
 
     for (unsigned i = 0; i < sch->nb_enc; i++) {
         SchEnc *enc = &sch->enc[i];
 
-        err = task_stop(&enc->task);
+        err = task_stop(sch, &enc->task);
         ret = err_merge(ret, err);
     }
 
     for (unsigned i = 0; i < sch->nb_mux; i++) {
         SchMux *mux = &sch->mux[i];
 
-        err = task_stop(&mux->task);
+        err = task_stop(sch, &mux->task);
         ret = err_merge(ret, err);
     }
 
     if (finish_ts)
         *finish_ts = trailing_dts(sch, 1);
 
+    sch->state = SCH_STATE_STOPPED;
+
     return ret;
 }