Message ID | 20220821192636.734-1-lukas.fellechner@gmx.net |
---|---|
State | New |
Headers | show |
Series | [FFmpeg-devel,v2] lavf/dashdec: Multithreaded DASH initialization | expand |
Context | Check | Description |
---|---|---|
yinshiyou/commit_msg_loongarch64 | warning | Please wrap lines in the body of the commit message between 60 and 72 characters. |
andriy/commit_msg_x86 | warning | Please wrap lines in the body of the commit message between 60 and 72 characters. |
yinshiyou/make_loongarch64 | success | Make finished |
yinshiyou/make_fate_loongarch64 | success | Make fate finished |
andriy/make_x86 | success | Make finished |
andriy/make_fate_x86 | success | Make fate finished |
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道: > > Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times. > --- > libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++----- > 1 file changed, 386 insertions(+), 46 deletions(-) > > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c > index 63bf7e96a5..7eca3e3415 100644 > --- a/libavformat/dashdec.c > +++ b/libavformat/dashdec.c > @@ -24,6 +24,7 @@ > #include "libavutil/opt.h" > #include "libavutil/time.h" > #include "libavutil/parseutils.h" > +#include "libavutil/thread.h" > #include "internal.h" > #include "avio_internal.h" > #include "dash.h" > @@ -152,6 +153,8 @@ typedef struct DASHContext { > int max_url_size; > char *cenc_decryption_key; > > + int init_threads; > + > /* Flags for init section*/ > int is_init_section_common_video; > int is_init_section_common_audio; > @@ -1918,22 +1921,40 @@ fail: > return ret; > } > > -static int open_demux_for_component(AVFormatContext *s, struct representation *pls) > +static int open_demux_for_component(AVFormatContext* s, struct representation* pls) > +{ > + int ret = 0; > + > + ret = begin_open_demux_for_component(s, pls); > + if (ret < 0) > + return ret; > + > + ret = end_open_demux_for_component(s, pls); > + > + return ret; > +} > + > +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls) > { > int ret = 0; > - int i; > > pls->parent = s; > - pls->cur_seq_no = calc_cur_seg_no(s, pls); > + pls->cur_seq_no = calc_cur_seg_no(s, pls); > > if (!pls->last_seq_no) { > pls->last_seq_no = calc_max_seg_no(pls, s->priv_data); > } > > ret = reopen_demux_for_component(s, pls); > - if (ret < 0) { 
> - goto fail; > - } > + > + return ret; > +} > + > +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls) > +{ > + int ret = 0; > + int i; > + > for (i = 0; i < pls->ctx->nb_streams; i++) { > AVStream *st = avformat_new_stream(s, NULL); > AVStream *ist = pls->ctx->streams[i]; > @@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value) > } > } I looked at the new functions such as begin_open_demux_for_component and end_open_demux_for_component; maybe they can go into a separate patch. Maybe you can submit two patches: one that clarifies the code, and another that adds multithreading support. BTW, I saw there are two warning messages about the patch commit message in patchwork: https://patchwork.ffmpeg.org/project/ffmpeg/patch/20220821192636.734-1-lukas.fellechner@gmx.net/ > > +#if HAVE_THREADS > + > +struct work_pool_data > +{ > + AVFormatContext* ctx; > + struct representation* pls; > + struct representation* common_pls; > + pthread_mutex_t* common_mutex; > + pthread_cond_t* common_condition; > + int is_common; > + int is_started; > + int result; > +}; > + > +struct thread_data > +{ > + pthread_t thread; > + pthread_mutex_t* mutex; > + struct work_pool_data* work_pool; > + int work_pool_size; > + int is_started; > +}; > + > +static void *worker_thread(void *ptr) > +{ > + int ret = 0; > + int i; > + struct thread_data* thread_data = (struct thread_data*)ptr; > + struct work_pool_data* work_pool = NULL; > + struct work_pool_data* data = NULL; > + for (;;) { > + > + // get next work item > + pthread_mutex_lock(thread_data->mutex); > + data = NULL; > + work_pool = thread_data->work_pool; > + for (i = 0; i < thread_data->work_pool_size; i++) { > + if (!work_pool->is_started) { > + data = work_pool; > + data->is_started = 1; > + break; > + } > + work_pool++; > + } > + pthread_mutex_unlock(thread_data->mutex); > + > + if (!data) { > + // no more work to do > + return NULL; > + } > + > + // if we are common section provider, init and 
signal > + if (data->is_common) { > + data->pls->parent = data->ctx; > + ret = update_init_section(data->pls); > + if (ret < 0) { > + pthread_cond_signal(data->common_condition); > + goto end; > + } > + else > + ret = AVERROR(pthread_cond_signal(data->common_condition)); > + } > + > + // if we depend on common section provider, wait for signal and copy > + if (data->common_pls) { > + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); > + if (ret < 0) > + goto end; > + > + if (!data->common_pls->init_sec_buf) { > + goto end; > + ret = AVERROR(EFAULT); > + } > + > + ret = copy_init_section(data->pls, data->common_pls); > + if (ret < 0) > + goto end; > + } > + > + ret = begin_open_demux_for_component(data->ctx, data->pls); > + if (ret < 0) > + goto end; > + > + end: > + data->result = ret; > + } > + > + > + return NULL; > +} > + > +static void create_work_pool_data(AVFormatContext* ctx, int stream_index, > + struct representation* pls, struct representation* common_pls, > + struct work_pool_data* init_data, pthread_mutex_t* common_mutex, > + pthread_cond_t* common_condition) > +{ > + init_data->ctx = ctx; > + init_data->pls = pls; > + init_data->pls->stream_index = stream_index; > + init_data->common_condition = common_condition; > + init_data->common_mutex = common_mutex; > + init_data->result = -1; > + > + if (pls == common_pls) { > + init_data->is_common = 1; > + } > + else if (common_pls) { > + init_data->common_pls = common_pls; > + } > +} > + > +static int start_thread(struct thread_data *thread_data, > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) > +{ > + int ret; > + > + thread_data->mutex = mutex; > + thread_data->work_pool = work_pool; > + thread_data->work_pool_size = work_pool_size; > + > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); > + if (ret == 0) > + thread_data->is_started = 1; > + > + return ret; > +} > + > +#endif > + > static int 
dash_read_header(AVFormatContext *s) > { > DASHContext *c = s->priv_data; > @@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s) > av_dict_set(&c->avio_opts, "seekable", "0", 0); > } > > - if(c->n_videos) > + if (c->n_videos) > c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos); > > - /* Open the demuxer for video and audio components if available */ > - for (i = 0; i < c->n_videos; i++) { > - rep = c->videos[i]; > - if (i > 0 && c->is_init_section_common_video) { > - ret = copy_init_section(rep, c->videos[0]); > + if (c->n_audios) > + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); > + > + if (c->n_subtitles) > + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); > + > + int threads = 0; > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; > + > +#if HAVE_THREADS > + threads = FFMIN(nstreams, c->init_threads); > +#endif > + > + if (threads > 1) > + { > +#if HAVE_THREADS > + // alloc data > + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > + if (!init_data) > + return AVERROR(ENOMEM); > + > + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > + if (!thread_data) > + return AVERROR(ENOMEM); > + > + // alloc mutex and conditions > + pthread_mutex_t work_pool_mutex; > + > + pthread_mutex_t common_video_mutex; > + pthread_cond_t common_video_cond; > + > + pthread_mutex_t common_audio_mutex; > + pthread_cond_t common_audio_cond; > + > + pthread_mutex_t common_subtitle_mutex; > + pthread_cond_t common_subtitle_cond; > + > + // init mutex and conditions > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); > if (ret < 0) > - return ret; > + goto 
cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); > + if (ret < 0) > + goto cleanup; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > - } > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > > - if(c->n_audios) > - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); > + if (ret < 0) > + goto cleanup; > + } > > - for (i = 0; i < c->n_audios; i++) { > - rep = c->audios[i]; > - if (i > 0 && c->is_init_section_common_audio) { > - ret = copy_init_section(rep, c->audios[0]); > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); > if (ret < 0) > - return ret; > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); > + if (ret < 0) > + goto cleanup; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > - } > + // init work pool data > + struct work_pool_data* current_data = init_data; > > - if (c->n_subtitles) > - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); > + for (i = 0; i < c->n_videos; i++) { > + create_work_pool_data(s, stream_index, c->videos[i], > + c->is_init_section_common_video ? c->videos[0] : NULL, > + current_data, &common_video_mutex, &common_video_cond); > > - for (i = 0; i < c->n_subtitles; i++) { > - rep = c->subtitles[i]; > - if (i > 0 && c->is_init_section_common_subtitle) { > - ret = copy_init_section(rep, c->subtitles[0]); > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_audios; i++) { > + create_work_pool_data(s, stream_index, c->audios[i], > + c->is_init_section_common_audio ? 
c->audios[0] : NULL, > + current_data, &common_audio_mutex, &common_audio_cond); > + > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_subtitles; i++) { > + create_work_pool_data(s, stream_index, c->subtitles[i], > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > + > + stream_index++; > + current_data++; > + } > + > + // start threads > + struct thread_data* current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); > if (ret < 0) > - return ret; > + goto cleanup; > + > + current_thread++; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > + cleanup: > + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup > + int initResult = ret; > + int runResult = 0; > + int cleanupResult = 0; > + > + // join threads > + current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + if (current_thread->is_started) { > + ret = AVERROR(pthread_join(current_thread->thread, NULL)); > + if (ret < 0) > + cleanupResult = ret; > + } > + current_thread++; > + } > + > + // finalize streams and collect results > + current_data = init_data; > + for (i = 0; i < nstreams; i++) { > + if (current_data->result < 0) { > + // thread ran into error: collect result > + runResult = current_data->result; > + } > + else { > + // thread success: create streams on AVFormatContext > + ret = end_open_demux_for_component(s, current_data->pls); > + if (ret < 0) > + runResult = ret; > + } > + current_data++; > + } > + > + // cleanup mutex and conditions > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); > + if (ret < 0) > 
+ cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + // return results if errors have occured in one of the phases > + if (initResult < 0) > + return initResult; > + > + if (runResult < 0) > + return runResult; > + > + if (cleanupResult < 0) > + return cleanupResult; > + > +#endif > } > + else > + { > + /* Open the demuxer for video and audio components if available */ > + for (i = 0; i < c->n_videos; i++) { > + rep = c->videos[i]; > + if (i > 0 && c->is_init_section_common_video) { > + ret = copy_init_section(rep, c->videos[0]); > + if (ret < 0) > + return ret; > + } > + ret = open_demux_for_component(s, rep); > > - if (!stream_index) > - return AVERROR_INVALIDDATA; > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + for (i = 0; i < c->n_audios; i++) { > + rep = c->audios[i]; > + if (i > 0 && c->is_init_section_common_audio) { > + ret = copy_init_section(rep, c->audios[0]); > + if (ret < 0) > + return ret; > + } > + ret = open_demux_for_component(s, rep); > + > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + for (i = 0; i < c->n_subtitles; i++) { > + rep = c->subtitles[i]; > + if (i > 0 && c->is_init_section_common_subtitle) { > + ret = copy_init_section(rep, c->subtitles[0]); > + if (ret < 0) > + return ret; > + } > + ret = 
open_demux_for_component(s, rep); > + > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + if (!stream_index) > + return AVERROR_INVALIDDATA; > + } > > /* Create a program */ > program = av_new_program(s, 0); > @@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = { > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, > INT_MIN, INT_MAX, FLAGS}, > { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, > + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, > {NULL} > }; > > -- > 2.31.1.windows.1 > > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel > > To unsubscribe, visit link above, or email > ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Gesendet: Dienstag, 23. August 2022 um 05:19 Uhr Von: "Steven Liu" <lingjiujianke@gmail.com> An: "FFmpeg development discussions and patches" <ffmpeg-devel@ffmpeg.org> Betreff: Re: [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道: > > I look at the new functions likes begin_open_demux_for_component and > end_open_demux_for_component maybe can separate patch. > maybe you can submit two patch, one is make code clarify, the other > one support multithreads, Good idea. I actually split the patch into three parts. Git seems to be pretty bad at handling indentation changes, which made the first patch look awful, even though the changes themselves were very small. So I pulled the indentation change out into a separate patch. I am not sure whether that is a good idea, but it makes the other two patches much more readable.
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index 63bf7e96a5..7eca3e3415 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,7 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +153,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -1918,22 +1921,40 @@ fail: return ret; } -static int open_demux_for_component(AVFormatContext *s, struct representation *pls) +static int open_demux_for_component(AVFormatContext* s, struct representation* pls) +{ + int ret = 0; + + ret = begin_open_demux_for_component(s, pls); + if (ret < 0) + return ret; + + ret = end_open_demux_for_component(s, pls); + + return ret; +} + +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls) { int ret = 0; - int i; pls->parent = s; - pls->cur_seq_no = calc_cur_seg_no(s, pls); + pls->cur_seq_no = calc_cur_seg_no(s, pls); if (!pls->last_seq_no) { pls->last_seq_no = calc_max_seg_no(pls, s->priv_data); } ret = reopen_demux_for_component(s, pls); - if (ret < 0) { - goto fail; - } + + return ret; +} + +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls) +{ + int ret = 0; + int i; + for (i = 0; i < pls->ctx->nb_streams; i++) { AVStream *st = avformat_new_stream(s, NULL); AVStream *ist = pls->ctx->streams[i]; @@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +#if HAVE_THREADS + +struct work_pool_data +{ + AVFormatContext* ctx; + struct representation* pls; + struct representation* common_pls; + pthread_mutex_t* common_mutex; + pthread_cond_t* common_condition; + int is_common; + int is_started; + int result; +}; + +struct thread_data +{ + pthread_t thread; + 
pthread_mutex_t* mutex; + struct work_pool_data* work_pool; + int work_pool_size; + int is_started; +}; + +static void *worker_thread(void *ptr) +{ + int ret = 0; + int i; + struct thread_data* thread_data = (struct thread_data*)ptr; + struct work_pool_data* work_pool = NULL; + struct work_pool_data* data = NULL; + for (;;) { + + // get next work item + pthread_mutex_lock(thread_data->mutex); + data = NULL; + work_pool = thread_data->work_pool; + for (i = 0; i < thread_data->work_pool_size; i++) { + if (!work_pool->is_started) { + data = work_pool; + data->is_started = 1; + break; + } + work_pool++; + } + pthread_mutex_unlock(thread_data->mutex); + + if (!data) { + // no more work to do + return NULL; + } + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + + end: + data->result = ret; + } + + + return NULL; +} + +static void create_work_pool_data(AVFormatContext* ctx, int stream_index, + struct representation* pls, struct representation* common_pls, + struct work_pool_data* init_data, pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + init_data->ctx = ctx; + init_data->pls = pls; + init_data->pls->stream_index = stream_index; + init_data->common_condition = common_condition; + init_data->common_mutex = common_mutex; + 
init_data->result = -1; + + if (pls == common_pls) { + init_data->is_common = 1; + } + else if (common_pls) { + init_data->common_pls = common_pls; + } +} + +static int start_thread(struct thread_data *thread_data, + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) +{ + int ret; + + thread_data->mutex = mutex; + thread_data->work_pool = work_pool; + thread_data->work_pool_size = work_pool_size; + + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); + if (ret == 0) + thread_data->is_started = 1; + + return ret; +} + +#endif + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s) av_dict_set(&c->avio_opts, "seekable", "0", 0); } - if(c->n_videos) + if (c->n_videos) c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos); - /* Open the demuxer for video and audio components if available */ - for (i = 0; i < c->n_videos; i++) { - rep = c->videos[i]; - if (i > 0 && c->is_init_section_common_video) { - ret = copy_init_section(rep, c->videos[0]); + if (c->n_audios) + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); + + if (c->n_subtitles) + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + + int threads = 0; + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + +#if HAVE_THREADS + threads = FFMIN(nstreams, c->init_threads); +#endif + + if (threads > 1) + { +#if HAVE_THREADS + // alloc data + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); + if (!init_data) + return AVERROR(ENOMEM); + + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); + if (!thread_data) + return AVERROR(ENOMEM); + + // alloc mutex and conditions + pthread_mutex_t work_pool_mutex; + + 
pthread_mutex_t common_video_mutex; + pthread_cond_t common_video_cond; + + pthread_mutex_t common_audio_mutex; + pthread_cond_t common_audio_cond; + + pthread_mutex_t common_subtitle_mutex; + pthread_cond_t common_subtitle_cond; + + // init mutex and conditions + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); + if (ret < 0) + goto cleanup; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); if (ret < 0) - return ret; + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); + if (ret < 0) + goto cleanup; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; - } + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); + if (ret < 0) + goto cleanup; - if(c->n_audios) - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); + if (ret < 0) + goto cleanup; + } - for (i = 0; i < c->n_audios; i++) { - rep = c->audios[i]; - if (i > 0 && c->is_init_section_common_audio) { - ret = copy_init_section(rep, c->audios[0]); + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); if (ret < 0) - return ret; + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); + if (ret < 0) + goto cleanup; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; - } + // init work pool data + struct work_pool_data* current_data = init_data; - if (c->n_subtitles) - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + for (i = 0; i < c->n_videos; i++) { + create_work_pool_data(s, stream_index, c->videos[i], + c->is_init_section_common_video ? 
c->videos[0] : NULL, + current_data, &common_video_mutex, &common_video_cond); - for (i = 0; i < c->n_subtitles; i++) { - rep = c->subtitles[i]; - if (i > 0 && c->is_init_section_common_subtitle) { - ret = copy_init_section(rep, c->subtitles[0]); + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_audios; i++) { + create_work_pool_data(s, stream_index, c->audios[i], + c->is_init_section_common_audio ? c->audios[0] : NULL, + current_data, &common_audio_mutex, &common_audio_cond); + + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_subtitles; i++) { + create_work_pool_data(s, stream_index, c->subtitles[i], + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, + current_data, &common_subtitle_mutex, &common_subtitle_cond); + + stream_index++; + current_data++; + } + + // start threads + struct thread_data* current_thread = thread_data; + for (i = 0; i < threads; i++) { + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); if (ret < 0) - return ret; + goto cleanup; + + current_thread++; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; + cleanup: + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup + int initResult = ret; + int runResult = 0; + int cleanupResult = 0; + + // join threads + current_thread = thread_data; + for (i = 0; i < threads; i++) { + if (current_thread->is_started) { + ret = AVERROR(pthread_join(current_thread->thread, NULL)); + if (ret < 0) + cleanupResult = ret; + } + current_thread++; + } + + // finalize streams and collect results + current_data = init_data; + for (i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result + runResult = current_data->result; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) + runResult = ret; + } + 
current_data++; + } + + // cleanup mutex and conditions + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); + if (ret < 0) + cleanupResult = ret; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); + if (ret < 0) + cleanupResult = ret; + } + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; + +#endif } + else + { + /* Open the demuxer for video and audio components if available */ + for (i = 0; i < c->n_videos; i++) { + rep = c->videos[i]; + if (i > 0 && c->is_init_section_common_video) { + ret = copy_init_section(rep, c->videos[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); - if (!stream_index) - return AVERROR_INVALIDDATA; + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + for (i = 0; i < c->n_audios; i++) { + rep = c->audios[i]; + if (i > 0 && c->is_init_section_common_audio) { + ret = copy_init_section(rep, c->audios[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + for (i = 0; i < c->n_subtitles; i++) { + rep = c->subtitles[i]; + if (i > 0 && c->is_init_section_common_subtitle) { + 
ret = copy_init_section(rep, c->subtitles[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + if (!stream_index) + return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = { {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, {NULL} };