diff mbox series

[FFmpeg-devel] fftools/ffmpeg: optimize inter-thread queue sizes

Message ID 20240124192916.22857-1-anton@khirnov.net
State Accepted
Commit e0da916b8f5b079a4865eef7f64863f50785463d
Headers show
Series [FFmpeg-devel] fftools/ffmpeg: optimize inter-thread queue sizes | expand

Checks

Context Check Description
yinshiyou/configure_loongarch64 warning Failed to apply patch
andriy/configure_x86 warning Failed to apply patch

Commit Message

Anton Khirnov Jan. 24, 2024, 7:29 p.m. UTC
Use 8 packets/frames by default rather than 1, which seems to provide
better throughput.

Allow -thread_queue_size to set the muxer queue size manually again.
---
 fftools/ffmpeg_mux.h                             |  2 --
 fftools/ffmpeg_mux_init.c                        |  3 +--
 fftools/ffmpeg_opt.c                             |  2 +-
 fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
 fftools/ffmpeg_sched.h                           |  4 +++-
 tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
 6 files changed, 15 insertions(+), 16 deletions(-)

Comments

Andreas Rheinhardt Jan. 24, 2024, 7:35 p.m. UTC | #1
Anton Khirnov:
> Use 8 packets/frames by default rather than 1, which seems to provide
> better throughput.
> 
> Allow -thread_queue_size to set the muxer queue size manually again.
> ---
>  fftools/ffmpeg_mux.h                             |  2 --
>  fftools/ffmpeg_mux_init.c                        |  3 +--
>  fftools/ffmpeg_opt.c                             |  2 +-
>  fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
>  fftools/ffmpeg_sched.h                           |  4 +++-
>  tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
>  6 files changed, 15 insertions(+), 16 deletions(-)
> 
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index d0be8a51ea..e1b44142cf 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -94,8 +94,6 @@ typedef struct Muxer {
>  
>      AVDictionary *opts;
>  
> -    int thread_queue_size;
> -
>      /* filesize limit expressed in bytes */
>      int64_t limit_filesize;
>      atomic_int_least64_t last_filesize;
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 6b5e4f8b3c..8ada837555 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      of->start_time     = o->start_time;
>      of->shortest       = o->shortest;
>  
> -    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
>      mux->limit_filesize    = o->limit_filesize;
>      av_dict_copy(&mux->opts, o->g->format_opts, 0);
>  
> @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      }
>  
>      err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
> -                      !strcmp(oc->oformat->name, "rtp"));
> +                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
>      if (err < 0)
>          return err;
>      mux->sch     = sch;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index 304c493dcf..7505b0cf90 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
>      o->limit_filesize = INT64_MAX;
>      o->chapters_input_file = INT_MAX;
>      o->accurate_seek  = 1;
> -    o->thread_queue_size = -1;
> +    o->thread_queue_size = 0;
>      o->input_sync_ref = -1;
>      o->find_stream_info = 1;
>      o->shortest_buf_duration = 10.f;
> diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
> index 4fc5a33941..62a40c6057 100644
> --- a/fftools/ffmpeg_sched.c
> +++ b/fftools/ffmpeg_sched.c
> @@ -218,6 +218,7 @@ typedef struct SchMux {
>       */
>      atomic_int          mux_started;
>      ThreadQueue        *queue;
> +    unsigned            queue_size;
>  
>      AVPacket           *sub_heartbeat_pkt;
>  } SchMux;
> @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
>      ThreadQueue *tq;
>      ObjPool *op;
>  
> +    queue_size = queue_size > 0 ? queue_size : 8;
> +
>      op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
>                                     objpool_alloc_frames();
>      if (!op)
> @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
>  };
>  
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *arg, int sdp_auto)
> +                void *arg, int sdp_auto, unsigned thread_queue_size)
>  {
>      const unsigned idx = sch->nb_mux;
>  
> @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
>      mux             = &sch->mux[idx];
>      mux->class      = &sch_mux_class;
>      mux->init       = init;
> +    mux->queue_size = thread_queue_size;
>  
>      task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
>  
> @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
>      if (!dec->send_frame)
>          return AVERROR(ENOMEM);
>  
> -    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
> +    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
>      if (ret < 0)
>          return ret;
>  
> @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
>  
>      task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
>  
> -    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>      if (ret < 0)
>          return ret;
>  
> -    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
>              }
>          }
>  
> -        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
> +        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
> +                          QUEUE_PACKETS);
>          if (ret < 0)
>              return ret;
>  
> diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
> index b167d8d158..d12affa69d 100644
> --- a/fftools/ffmpeg_sched.h
> +++ b/fftools/ffmpeg_sched.h
> @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>   *             streams in the muxer.
>   * @param ctx Muxer state; will be passed to func/init and used for logging.
>   * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
> + * @param thread_queue_size number of packets that can be buffered before
> + *                          sending to the muxer blocks
>   *
>   * @retval ">=0" Index of the newly-created muxer.
>   * @retval "<0"  Error code.
>   */
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *ctx, int sdp_auto);
> +                void *ctx, int sdp_auto, unsigned thread_queue_size);
>  /**
>   * Add a muxed stream for a previously added muxer.
>   *
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index bc9b833799..3a3ec96637 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -33,8 +33,3 @@
>  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety remains our numb</font>
>  
> -9
> -00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> ->> Safety remains our number one</font>
> -

Why does the output of this test change?

- Andreas
Anton Khirnov Jan. 24, 2024, 7:45 p.m. UTC | #2
Quoting Andreas Rheinhardt (2024-01-24 20:35:48)
> > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > index bc9b833799..3a3ec96637 100644
> > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > @@ -33,8 +33,3 @@
> >  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >  >> Safety remains our numb</font>
> >  
> > -9
> > -00:00:03,704 --> 00:00:04,004
> > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> > ->> Safety remains our number one</font>
> > -
> 
> Why does the output of this test change?

Because the feature being tested is semi-broken - exact output depends on the
amount of buffering between the decoder and the muxer. We also get
rare random failures in this test in current master. So far nobody has
suggested a way of fixing it properly.
diff mbox series

Patch

diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d0be8a51ea..e1b44142cf 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -94,8 +94,6 @@  typedef struct Muxer {
 
     AVDictionary *opts;
 
-    int thread_queue_size;
-
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
     atomic_int_least64_t last_filesize;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 6b5e4f8b3c..8ada837555 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -3047,7 +3047,6 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     of->start_time     = o->start_time;
     of->shortest       = o->shortest;
 
-    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
     mux->limit_filesize    = o->limit_filesize;
     av_dict_copy(&mux->opts, o->g->format_opts, 0);
 
@@ -3081,7 +3080,7 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     }
 
     err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
-                      !strcmp(oc->oformat->name, "rtp"));
+                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
     if (err < 0)
         return err;
     mux->sch     = sch;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304c493dcf..7505b0cf90 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -144,7 +144,7 @@  static void init_options(OptionsContext *o)
     o->limit_filesize = INT64_MAX;
     o->chapters_input_file = INT_MAX;
     o->accurate_seek  = 1;
-    o->thread_queue_size = -1;
+    o->thread_queue_size = 0;
     o->input_sync_ref = -1;
     o->find_stream_info = 1;
     o->shortest_buf_duration = 10.f;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4fc5a33941..62a40c6057 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -218,6 +218,7 @@  typedef struct SchMux {
      */
     atomic_int          mux_started;
     ThreadQueue        *queue;
+    unsigned            queue_size;
 
     AVPacket           *sub_heartbeat_pkt;
 } SchMux;
@@ -358,6 +359,8 @@  static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
     ThreadQueue *tq;
     ObjPool *op;
 
+    queue_size = queue_size > 0 ? queue_size : 8;
+
     op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
                                    objpool_alloc_frames();
     if (!op)
@@ -653,7 +656,7 @@  static const AVClass sch_mux_class = {
 };
 
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *arg, int sdp_auto)
+                void *arg, int sdp_auto, unsigned thread_queue_size)
 {
     const unsigned idx = sch->nb_mux;
 
@@ -667,6 +670,7 @@  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
     mux             = &sch->mux[idx];
     mux->class      = &sch_mux_class;
     mux->init       = init;
+    mux->queue_size = thread_queue_size;
 
     task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
 
@@ -773,7 +777,7 @@  int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
     if (!dec->send_frame)
         return AVERROR(ENOMEM);
 
-    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
     if (ret < 0)
         return ret;
 
@@ -813,7 +817,7 @@  int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
 
     task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
 
-    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -861,7 +865,7 @@  int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
     if (ret < 0)
         return ret;
 
-    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -1313,7 +1317,8 @@  int sch_start(Scheduler *sch)
             }
         }
 
-        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
+                          QUEUE_PACKETS);
         if (ret < 0)
             return ret;
 
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index b167d8d158..d12affa69d 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -225,12 +225,14 @@  int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
  *             streams in the muxer.
  * @param ctx Muxer state; will be passed to func/init and used for logging.
  * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ * @param thread_queue_size number of packets that can be buffered before
+ *                          sending to the muxer blocks
  *
  * @retval ">=0" Index of the newly-created muxer.
  * @retval "<0"  Error code.
  */
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *ctx, int sdp_auto);
+                void *ctx, int sdp_auto, unsigned thread_queue_size);
 /**
  * Add a muxed stream for a previously added muxer.
  *
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index bc9b833799..3a3ec96637 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -33,8 +33,3 @@ 
 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our numb</font>
 
-9
-00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
->> Safety remains our number one</font>
-