Message ID | 20240124192916.22857-1-anton@khirnov.net |
---|---|
State | Accepted |
Commit | e0da916b8f5b079a4865eef7f64863f50785463d |
Headers | show |
Series | [FFmpeg-devel] fftools/ffmpeg: optimize inter-thread queue sizes | expand |
Context | Check | Description |
---|---|---|
yinshiyou/configure_loongarch64 | warning | Failed to apply patch |
andriy/configure_x86 | warning | Failed to apply patch |
Anton Khirnov: > Use 8 packets/frames by default rather than 1, which seems to provide > better throughput. > > Allow -thread_queue_size to set the muxer queue size manually again. > --- > fftools/ffmpeg_mux.h | 2 -- > fftools/ffmpeg_mux_init.c | 3 +-- > fftools/ffmpeg_opt.c | 2 +- > fftools/ffmpeg_sched.c | 15 ++++++++++----- > fftools/ffmpeg_sched.h | 4 +++- > tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat | 5 ----- > 6 files changed, 15 insertions(+), 16 deletions(-) > > diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h > index d0be8a51ea..e1b44142cf 100644 > --- a/fftools/ffmpeg_mux.h > +++ b/fftools/ffmpeg_mux.h > @@ -94,8 +94,6 @@ typedef struct Muxer { > > AVDictionary *opts; > > - int thread_queue_size; > - > /* filesize limit expressed in bytes */ > int64_t limit_filesize; > atomic_int_least64_t last_filesize; > diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c > index 6b5e4f8b3c..8ada837555 100644 > --- a/fftools/ffmpeg_mux_init.c > +++ b/fftools/ffmpeg_mux_init.c > @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) > of->start_time = o->start_time; > of->shortest = o->shortest; > > - mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8; > mux->limit_filesize = o->limit_filesize; > av_dict_copy(&mux->opts, o->g->format_opts, 0); > > @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) > } > > err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, > - !strcmp(oc->oformat->name, "rtp")); > + !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size); > if (err < 0) > return err; > mux->sch = sch; > diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c > index 304c493dcf..7505b0cf90 100644 > --- a/fftools/ffmpeg_opt.c > +++ b/fftools/ffmpeg_opt.c > @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o) > o->limit_filesize = INT64_MAX; > o->chapters_input_file = INT_MAX; > o->accurate_seek = 1; > - o->thread_queue_size = -1; > + o->thread_queue_size = 0; > o->input_sync_ref = -1; > o->find_stream_info = 1; > o->shortest_buf_duration = 10.f; > diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c > index 4fc5a33941..62a40c6057 100644 > --- a/fftools/ffmpeg_sched.c > +++ b/fftools/ffmpeg_sched.c > @@ -218,6 +218,7 @@ typedef struct SchMux { > */ > atomic_int mux_started; > ThreadQueue *queue; > + unsigned queue_size; > > AVPacket *sub_heartbeat_pkt; > } SchMux; > @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si > ThreadQueue *tq; > ObjPool *op; > > + queue_size = queue_size > 0 ? queue_size : 8; > + > op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : > objpool_alloc_frames(); > if (!op) > @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = { > }; > > int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), > - void *arg, int sdp_auto) > + void *arg, int sdp_auto, unsigned thread_queue_size) > { > const unsigned idx = sch->nb_mux; > > @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), > mux = &sch->mux[idx]; > mux->class = &sch_mux_class; > mux->init = init; > + mux->queue_size = thread_queue_size; > > task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); > > @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, > if (!dec->send_frame) > return AVERROR(ENOMEM); > > - ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); > + ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS); > if (ret < 0) > return ret; > > @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, > > task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); > > - ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); > + ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES); > if (ret < 0) > return ret; > > @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, > if (ret < 0) > return ret; > > - ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES); > + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES); > if (ret < 0) > return ret; > > @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch) > } > } > > - ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); > + ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size, > + QUEUE_PACKETS); > if (ret < 0) > return ret; > > diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h > index b167d8d158..d12affa69d 100644 > --- a/fftools/ffmpeg_sched.h > +++ b/fftools/ffmpeg_sched.h > @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, > * streams in the muxer. > * @param ctx Muxer state; will be passed to func/init and used for logging. > * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). > + * @param thread_queue_size number of packets that can be buffered before > + * sending to the muxer blocks > * > * @retval ">=0" Index of the newly-created muxer. > * @retval "<0" Error code. > */ > int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), > - void *ctx, int sdp_auto); > + void *ctx, int sdp_auto, unsigned thread_queue_size); > /** > * Add a muxed stream for a previously added muxer. > * > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > index bc9b833799..3a3ec96637 100644 > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > @@ -33,8 +33,3 @@ > <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety remains our numb</font> > > -9 > -00:00:03,704 --> 00:00:04,004 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > ->> Safety remains our number one</font> > - Why does the output of this test change? - Andreas
Quoting Andreas Rheinhardt (2024-01-24 20:35:48) > > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > > index bc9b833799..3a3ec96637 100644 > > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > > @@ -33,8 +33,3 @@ > > <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > > >> Safety remains our numb</font> > > > > -9 > > -00:00:03,704 --> 00:00:04,004 > > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > > ->> Safety remains our number one</font> > > - > > Why does the output of this test change? Because the feature being tested is semi-broken - exact output depends on the amount of buffering between the decoder and the muxer. We also get rare random failures in this test in current master. So far nobody has suggested a way of fixing it properly.
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index d0be8a51ea..e1b44142cf 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -94,8 +94,6 @@ typedef struct Muxer { AVDictionary *opts; - int thread_queue_size; - /* filesize limit expressed in bytes */ int64_t limit_filesize; atomic_int_least64_t last_filesize; diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 6b5e4f8b3c..8ada837555 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) of->start_time = o->start_time; of->shortest = o->shortest; - mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8; mux->limit_filesize = o->limit_filesize; av_dict_copy(&mux->opts, o->g->format_opts, 0); @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) } err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, - !strcmp(oc->oformat->name, "rtp")); + !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size); if (err < 0) return err; mux->sch = sch; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 304c493dcf..7505b0cf90 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o) o->limit_filesize = INT64_MAX; o->chapters_input_file = INT_MAX; o->accurate_seek = 1; - o->thread_queue_size = -1; + o->thread_queue_size = 0; o->input_sync_ref = -1; o->find_stream_info = 1; o->shortest_buf_duration = 10.f; diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 4fc5a33941..62a40c6057 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -218,6 +218,7 @@ typedef struct SchMux { */ atomic_int mux_started; ThreadQueue *queue; + unsigned queue_size; AVPacket *sub_heartbeat_pkt; } SchMux; @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si ThreadQueue *tq; ObjPool *op; + queue_size = queue_size > 0 ? queue_size : 8; + op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : objpool_alloc_frames(); if (!op) @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = { }; int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), - void *arg, int sdp_auto) + void *arg, int sdp_auto, unsigned thread_queue_size) { const unsigned idx = sch->nb_mux; @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), mux = &sch->mux[idx]; mux->class = &sch_mux_class; mux->init = init; + mux->queue_size = thread_queue_size; task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, if (!dec->send_frame) return AVERROR(ENOMEM); - ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); + ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS); if (ret < 0) return ret; @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); - ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); + ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES); if (ret < 0) return ret; @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, if (ret < 0) return ret; - ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES); + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES); if (ret < 0) return ret; @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch) } } - ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); + ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size, + QUEUE_PACKETS); if (ret < 0) return ret; diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h index b167d8d158..d12affa69d 100644 --- a/fftools/ffmpeg_sched.h +++ b/fftools/ffmpeg_sched.h @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, * streams in the muxer. * @param ctx Muxer state; will be passed to func/init and used for logging. * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). + * @param thread_queue_size number of packets that can be buffered before + * sending to the muxer blocks * * @retval ">=0" Index of the newly-created muxer. * @retval "<0" Error code. */ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), - void *ctx, int sdp_auto); + void *ctx, int sdp_auto, unsigned thread_queue_size); /** * Add a muxed stream for a previously added muxer. * diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat index bc9b833799..3a3ec96637 100644 --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat @@ -33,8 +33,3 @@ <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) >> Safety remains our numb</font> -9 -00:00:03,704 --> 00:00:04,004 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) ->> Safety remains our number one</font> -