Message ID | 20220823190326.249-3-lukas.fellechner@gmx.net |
---|---|
State | New |
Headers | show |
Series | lavf/dashdec: Multithreaded DASH initialization | expand |
Context | Check | Description |
---|---|---|
yinshiyou/make_loongarch64 | success | Make finished |
yinshiyou/make_fate_loongarch64 | success | Make fate finished |
andriy/make_x86 | success | Make finished |
andriy/make_fate_x86 | success | Make fate finished |
Lukas Fellechner: > This patch adds an "init-threads" option, specifying the max > number of threads to use. Multiple worker threads are spun up > to massively bring down init times. > --- > libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 350 insertions(+), 1 deletion(-) > > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c > index e82da45e43..20f2557ea3 100644 > --- a/libavformat/dashdec.c > +++ b/libavformat/dashdec.c > @@ -24,6 +24,7 @@ > #include "libavutil/opt.h" > #include "libavutil/time.h" > #include "libavutil/parseutils.h" > +#include "libavutil/thread.h" > #include "internal.h" > #include "avio_internal.h" > #include "dash.h" > @@ -152,6 +153,8 @@ typedef struct DASHContext { > int max_url_size; > char *cenc_decryption_key; > > + int init_threads; > + > /* Flags for init section*/ > int is_init_section_common_video; > int is_init_section_common_audio; > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value) > } > } > > +#if HAVE_THREADS > + > +struct work_pool_data > +{ > + AVFormatContext *ctx; > + struct representation *pls; > + struct representation *common_pls; > + pthread_mutex_t *common_mutex; > + pthread_cond_t *common_condition; > + int is_common; > + int is_started; > + int result; > +}; > + > +struct thread_data This is against our naming conventions: CamelCase for struct tags and typedefs, lowercase names with underscore for variable names. 
> +{ > + pthread_t thread; > + pthread_mutex_t *mutex; > + struct work_pool_data *work_pool; > + int work_pool_size; > + int is_started; > + int has_error; > +}; > + > +static void *worker_thread(void *ptr) > +{ > + int ret = 0; > + int i; > + struct thread_data *thread_data = (struct thread_data*)ptr; > + struct work_pool_data *work_pool = NULL; > + struct work_pool_data *data = NULL; > + for (;;) { > + > + // get next work item unless there was an error > + pthread_mutex_lock(thread_data->mutex); > + data = NULL; > + if (!thread_data->has_error) { > + work_pool = thread_data->work_pool; > + for (i = 0; i < thread_data->work_pool_size; i++) { > + if (!work_pool->is_started) { > + data = work_pool; > + data->is_started = 1; > + break; > + } > + work_pool++; > + } > + } > + pthread_mutex_unlock(thread_data->mutex); > + > + if (!data) { > + // no more work to do > + return NULL; > + } > + > + // if we are common section provider, init and signal > + if (data->is_common) { > + data->pls->parent = data->ctx; > + ret = update_init_section(data->pls); > + if (ret < 0) { > + pthread_cond_signal(data->common_condition); > + goto end; > + } > + else > + ret = AVERROR(pthread_cond_signal(data->common_condition)); > + } > + > + // if we depend on common section provider, wait for signal and copy > + if (data->common_pls) { > + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); > + if (ret < 0) > + goto end; > + > + if (!data->common_pls->init_sec_buf) { > + goto end; > + ret = AVERROR(EFAULT); > + } > + > + ret = copy_init_section(data->pls, data->common_pls); > + if (ret < 0) > + goto end; > + } > + > + ret = begin_open_demux_for_component(data->ctx, data->pls); > + if (ret < 0) > + goto end; > + > + end: > + data->result = ret; > + > + // notify error to other threads and exit > + if (ret < 0) { > + pthread_mutex_lock(thread_data->mutex); > + thread_data->has_error = 1; > + pthread_mutex_unlock(thread_data->mutex); > + return NULL; > + } > + } > + 
> + > + return NULL; > +} > + > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index, > + struct representation *pls, struct representation *common_pls, > + struct work_pool_data *init_data, pthread_mutex_t *common_mutex, > + pthread_cond_t *common_condition) > +{ > + init_data->ctx = ctx; > + init_data->pls = pls; > + init_data->pls->stream_index = stream_index; > + init_data->common_condition = common_condition; > + init_data->common_mutex = common_mutex; > + init_data->result = -1; > + > + if (pls == common_pls) { > + init_data->is_common = 1; > + } > + else if (common_pls) { > + init_data->common_pls = common_pls; > + } > +} > + > +static int start_thread(struct thread_data *thread_data, > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) > +{ > + int ret; > + > + thread_data->mutex = mutex; > + thread_data->work_pool = work_pool; > + thread_data->work_pool_size = work_pool_size; > + > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); > + if (ret == 0) > + thread_data->is_started = 1; > + > + return ret; > +} > + > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) > +{ > + DASHContext *c = s->priv_data; > + int ret = 0; > + int stream_index = 0; > + int i; We allow "for (int i = 0;" > + > + // alloc data > + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > + if (!init_data) > + return AVERROR(ENOMEM); > + > + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > + if (!thread_data) > + return AVERROR(ENOMEM); 1. init_data leaks here on error. 2. In fact, it seems to me that both init_data and thread_data are nowhere freed. 
> + > + // alloc mutex and conditions > + pthread_mutex_t work_pool_mutex; > + > + pthread_mutex_t common_video_mutex; > + pthread_cond_t common_video_cond; > + > + pthread_mutex_t common_audio_mutex; > + pthread_cond_t common_audio_cond; > + > + pthread_mutex_t common_subtitle_mutex; > + pthread_cond_t common_subtitle_cond; > + > + // init mutex and conditions > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); > + if (ret < 0) > + goto cleanup; > + } > + > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); > + if (ret < 0) > + goto cleanup; > + } > + > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); > + if (ret < 0) > + goto cleanup; > + } > + > + // init work pool data > + struct work_pool_data* current_data = init_data; > + > + for (i = 0; i < c->n_videos; i++) { > + create_work_pool_data(s, stream_index, c->videos[i], > + c->is_init_section_common_video ? c->videos[0] : NULL, > + current_data, &common_video_mutex, &common_video_cond); > + > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_audios; i++) { > + create_work_pool_data(s, stream_index, c->audios[i], > + c->is_init_section_common_audio ? 
c->audios[0] : NULL, > + current_data, &common_audio_mutex, &common_audio_cond); > + > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_subtitles; i++) { > + create_work_pool_data(s, stream_index, c->subtitles[i], > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > + > + stream_index++; > + current_data++; > + } This is very repetitive. > + > + // start threads > + struct thread_data *current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); > + if (ret < 0) > + goto cleanup; > + > + current_thread++; > + } > + > +cleanup: > + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup > + int initResult = ret; > + int runResult = 0; > + int cleanupResult = 0; > + > + // join threads > + current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + if (current_thread->is_started) { > + ret = AVERROR(pthread_join(current_thread->thread, NULL)); > + if (ret < 0) > + cleanupResult = ret; > + } > + current_thread++; > + } > + > + // finalize streams and collect results > + current_data = init_data; > + for (i = 0; i < nstreams; i++) { > + if (current_data->result < 0) { > + // thread ran into error: collect result and break > + runResult = current_data->result; > + break; > + } > + else { > + // thread success: create streams on AVFormatContext > + ret = end_open_demux_for_component(s, current_data->pls); > + if (ret < 0) > + runResult = ret; > + } > + current_data++; > + } > + > + // cleanup mutex and conditions > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); > + if 
(ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + // return results if errors have occured in one of the phases > + if (initResult < 0) > + return initResult; > + > + if (runResult < 0) > + return runResult; > + > + if (cleanupResult < 0) > + return cleanupResult; > + > + return 0; > +} > + > +#endif > + > static int dash_read_header(AVFormatContext *s) > { > DASHContext *c = s->priv_data; > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s) > if (c->n_subtitles) > c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); > > + int threads = 0; > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; > + > +#if HAVE_THREADS > + threads = FFMIN(nstreams, c->init_threads); > +#endif > + > + if (threads > 1) > + { > +#if HAVE_THREADS > + ret = init_streams_multithreaded(s, nstreams, threads); > + if (ret < 0) > + return ret; > +#endif > + } > + else > + { > /* Open the demuxer for video and audio components if available */ > for (i = 0; i < c->n_videos; i++) { > rep = c->videos[i]; > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s) > > if (!stream_index) > return AVERROR_INVALIDDATA; > + } > > /* Create a program */ > program = av_new_program(s, 0); > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = { > OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, > INT_MIN, INT_MAX, FLAGS}, > - { 
"cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, > + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), > + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, > + { "init_threads", "Number of threads to use for initializing the DASH stream", > + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, > {NULL} > }; > > -- > 2.31.1.windows.1 > 1. We actually have an API to process multiple tasks by different threads: Look at libavutil/slicethread.h. Why can't you reuse that? 2. In case initialization of one of the conditions/mutexes fails, you are nevertheless destroying them; you are even destroying completely uninitialized mutexes. This is undefined behaviour. Checking the result of it does not fix this. - Andreas
Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道: > > Lukas Fellechner: > > This patch adds an "init-threads" option, specifying the max > > number of threads to use. Multiple worker threads are spun up > > to massively bring down init times. > > --- > > libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++- > > 1 file changed, 350 insertions(+), 1 deletion(-) > > > > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c > > index e82da45e43..20f2557ea3 100644 > > --- a/libavformat/dashdec.c > > +++ b/libavformat/dashdec.c > > @@ -24,6 +24,7 @@ > > #include "libavutil/opt.h" > > #include "libavutil/time.h" > > #include "libavutil/parseutils.h" > > +#include "libavutil/thread.h" > > #include "internal.h" > > #include "avio_internal.h" > > #include "dash.h" > > @@ -152,6 +153,8 @@ typedef struct DASHContext { > > int max_url_size; > > char *cenc_decryption_key; > > > > + int init_threads; > > + > > /* Flags for init section*/ > > int is_init_section_common_video; > > int is_init_section_common_audio; > > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value) > > } > > } > > > > +#if HAVE_THREADS > > + > > +struct work_pool_data > > +{ > > + AVFormatContext *ctx; > > + struct representation *pls; > > + struct representation *common_pls; > > + pthread_mutex_t *common_mutex; > > + pthread_cond_t *common_condition; > > + int is_common; > > + int is_started; > > + int result; > > +}; > > + > > +struct thread_data > > This is against our naming conventions: CamelCase for struct tags and > typedefs, lowercase names with underscore for variable names. 
> > > +{ > > + pthread_t thread; > > + pthread_mutex_t *mutex; > > + struct work_pool_data *work_pool; > > + int work_pool_size; > > + int is_started; > > + int has_error; > > +}; > > + > > +static void *worker_thread(void *ptr) > > +{ > > + int ret = 0; > > + int i; > > + struct thread_data *thread_data = (struct thread_data*)ptr; > > + struct work_pool_data *work_pool = NULL; > > + struct work_pool_data *data = NULL; > > + for (;;) { > > + > > + // get next work item unless there was an error > > + pthread_mutex_lock(thread_data->mutex); > > + data = NULL; > > + if (!thread_data->has_error) { > > + work_pool = thread_data->work_pool; > > + for (i = 0; i < thread_data->work_pool_size; i++) { > > + if (!work_pool->is_started) { > > + data = work_pool; > > + data->is_started = 1; > > + break; > > + } > > + work_pool++; > > + } > > + } > > + pthread_mutex_unlock(thread_data->mutex); > > + > > + if (!data) { > > + // no more work to do > > + return NULL; > > + } > > + > > + // if we are common section provider, init and signal > > + if (data->is_common) { > > + data->pls->parent = data->ctx; > > + ret = update_init_section(data->pls); > > + if (ret < 0) { > > + pthread_cond_signal(data->common_condition); > > + goto end; > > + } > > + else > > + ret = AVERROR(pthread_cond_signal(data->common_condition)); > > + } > > + > > + // if we depend on common section provider, wait for signal and copy > > + if (data->common_pls) { > > + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); > > + if (ret < 0) > > + goto end; > > + > > + if (!data->common_pls->init_sec_buf) { > > + goto end; > > + ret = AVERROR(EFAULT); > > + } > > + > > + ret = copy_init_section(data->pls, data->common_pls); > > + if (ret < 0) > > + goto end; > > + } > > + > > + ret = begin_open_demux_for_component(data->ctx, data->pls); > > + if (ret < 0) > > + goto end; > > + > > + end: > > + data->result = ret; > > + > > + // notify error to other threads and exit > > + if (ret < 0) { 
> > + pthread_mutex_lock(thread_data->mutex); > > + thread_data->has_error = 1; > > + pthread_mutex_unlock(thread_data->mutex); > > + return NULL; > > + } > > + } > > + > > + > > + return NULL; > > +} > > + > > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index, > > + struct representation *pls, struct representation *common_pls, > > + struct work_pool_data *init_data, pthread_mutex_t *common_mutex, > > + pthread_cond_t *common_condition) > > +{ > > + init_data->ctx = ctx; > > + init_data->pls = pls; > > + init_data->pls->stream_index = stream_index; > > + init_data->common_condition = common_condition; > > + init_data->common_mutex = common_mutex; > > + init_data->result = -1; > > + > > + if (pls == common_pls) { > > + init_data->is_common = 1; > > + } > > + else if (common_pls) { > > + init_data->common_pls = common_pls; > > + } > > +} > > + > > +static int start_thread(struct thread_data *thread_data, > > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) > > +{ > > + int ret; > > + > > + thread_data->mutex = mutex; > > + thread_data->work_pool = work_pool; > > + thread_data->work_pool_size = work_pool_size; > > + > > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); > > + if (ret == 0) > > + thread_data->is_started = 1; > > + > > + return ret; > > +} > > + > > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) > > +{ > > + DASHContext *c = s->priv_data; > > + int ret = 0; > > + int stream_index = 0; > > + int i; > > We allow "for (int i = 0;" > > > + > > + // alloc data > > + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > > + if (!init_data) > > + return AVERROR(ENOMEM); > > + > > + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > > + if (!thread_data) > > + return AVERROR(ENOMEM); > > 1. 
init_data leaks here on error. > 2. In fact, it seems to me that both init_data and thread_data are > nowhere freed. > > > + > > + // alloc mutex and conditions > > + pthread_mutex_t work_pool_mutex; > > + > > + pthread_mutex_t common_video_mutex; > > + pthread_cond_t common_video_cond; > > + > > + pthread_mutex_t common_audio_mutex; > > + pthread_cond_t common_audio_cond; > > + > > + pthread_mutex_t common_subtitle_mutex; > > + pthread_cond_t common_subtitle_cond; > > + > > + // init mutex and conditions > > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + if (c->is_init_section_common_video) { > > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + if (c->is_init_section_common_audio) { > > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + if (c->is_init_section_common_subtitle) { > > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + // init work pool data > > + struct work_pool_data* current_data = init_data; > > + > > + for (i = 0; i < c->n_videos; i++) { > > + create_work_pool_data(s, stream_index, c->videos[i], > > + c->is_init_section_common_video ? c->videos[0] : NULL, > > + current_data, &common_video_mutex, &common_video_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_audios; i++) { > > + create_work_pool_data(s, stream_index, c->audios[i], > > + c->is_init_section_common_audio ? 
c->audios[0] : NULL, > > + current_data, &common_audio_mutex, &common_audio_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_subtitles; i++) { > > + create_work_pool_data(s, stream_index, c->subtitles[i], > > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > This is very repetitive. > > > + > > + // start threads > > + struct thread_data *current_thread = thread_data; > > + for (i = 0; i < threads; i++) { > > + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); > > + if (ret < 0) > > + goto cleanup; > > + > > + current_thread++; > > + } > > + > > +cleanup: > > + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup > > + int initResult = ret; > > + int runResult = 0; > > + int cleanupResult = 0; > > + > > + // join threads > > + current_thread = thread_data; > > + for (i = 0; i < threads; i++) { > > + if (current_thread->is_started) { > > + ret = AVERROR(pthread_join(current_thread->thread, NULL)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + current_thread++; > > + } > > + > > + // finalize streams and collect results > > + current_data = init_data; > > + for (i = 0; i < nstreams; i++) { > > + if (current_data->result < 0) { > > + // thread ran into error: collect result and break > > + runResult = current_data->result; > > + break; > > + } > > + else { > > + // thread success: create streams on AVFormatContext > > + ret = end_open_demux_for_component(s, current_data->pls); > > + if (ret < 0) > > + runResult = ret; > > + } > > + current_data++; > > + } > > + > > + // cleanup mutex and conditions > > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + if (c->is_init_section_common_video) { > > + ret = 
AVERROR(pthread_mutex_destroy(&common_video_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + if (c->is_init_section_common_audio) { > > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + if (c->is_init_section_common_subtitle) { > > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + // return results if errors have occured in one of the phases > > + if (initResult < 0) > > + return initResult; > > + > > + if (runResult < 0) > > + return runResult; > > + > > + if (cleanupResult < 0) > > + return cleanupResult; > > + > > + return 0; > > +} > > + > > +#endif > > + > > static int dash_read_header(AVFormatContext *s) > > { > > DASHContext *c = s->priv_data; > > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s) > > if (c->n_subtitles) > > c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); > > > > + int threads = 0; > > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; > > + > > +#if HAVE_THREADS > > + threads = FFMIN(nstreams, c->init_threads); > > +#endif > > + > > + if (threads > 1) > > + { > > +#if HAVE_THREADS > > + ret = init_streams_multithreaded(s, nstreams, threads); > > + if (ret < 0) > > + return ret; > > +#endif > > + } > > + else > > + { > > /* Open the demuxer for video and audio components if available */ > > for (i = 0; i < c->n_videos; i++) { > > rep = c->videos[i]; > > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s) > > > > if 
(!stream_index) > > return AVERROR_INVALIDDATA; > > + } > > > > /* Create a program */ > > program = av_new_program(s, 0); > > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = { > > OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, > > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, > > INT_MIN, INT_MAX, FLAGS}, > > - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, > > + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), > > + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, > > + { "init_threads", "Number of threads to use for initializing the DASH stream", > > + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, > > {NULL} > > }; > > > > -- > > 2.31.1.windows.1 > > > > 1. We actually have an API to process multiple tasks by different > threads: Look at libavutil/slicethread.h. Why can't you reuse that? I saw that usually be used in avfilters for slice multi-thread, or i misunderstand something? > 2. In case initialization of one of the conditions/mutexes fails, you > are nevertheless destroying them; you are even destroying completely > uninitialized mutexes. This is undefined behaviour. Checking the result > of it does not fix this. > > - Andreas Thanks Steven
Steven Liu: > Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道: >> 1. We actually have an API to process multiple tasks by different >> threads: Look at libavutil/slicethread.h. Why can't you reuse that? > I saw that usually be used in avfilters for slice multi-thread, or i > misunderstand something? > It is also used by our slice-threaded decoders. In fact, it is used everywhere we do slice threading. - Andreas
Andreas Rheinhardt andreas.rheinhardt at outlook.com Wed Aug 31 05:54:12 EEST 2022 > > > +#if HAVE_THREADS > > + > > +struct work_pool_data > > +{ > > + AVFormatContext *ctx; > > + struct representation *pls; > > + struct representation *common_pls; > > + pthread_mutex_t *common_mutex; > > + pthread_cond_t *common_condition; > > + int is_common; > > + int is_started; > > + int result; > > +}; > > + > > +struct thread_data > > This is against our naming conventions: CamelCase for struct tags and > typedefs, lowercase names with underscore for variable names. In the code files I looked at, CamelCase is only used for typedef structs. All structs without typedef are lower case with underscores, so I aligned with that, originally. I will make this a typedef struct and use CamelCase for next patch. > > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) > > +{ > > + DASHContext *c = s->priv_data; > > + int ret = 0; > > + int stream_index = 0; > > + int i; > > We allow "for (int i = 0;" Oh, I did not know that, and I did not see it being used here anywhere. Will use in next patch, it's my preferred style, actually. > > + > > + // alloc data > > + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > > + if (!init_data) > > + return AVERROR(ENOMEM); > > + > > + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > > + if (!thread_data) > > + return AVERROR(ENOMEM); > > 1. init_data leaks here on error. > 2. In fact, it seems to me that both init_data and thread_data are > nowhere freed. True, I must have lost the av_free call at some point. > > + // init work pool data > > + struct work_pool_data* current_data = init_data; > > + > > + for (i = 0; i < c->n_videos; i++) { > > + create_work_pool_data(s, stream_index, c->videos[i], > > + c->is_init_section_common_video ? 
c->videos[0] : NULL, > > + current_data, &common_video_mutex, &common_video_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_audios; i++) { > > + create_work_pool_data(s, stream_index, c->audios[i], > > + c->is_init_section_common_audio ? c->audios[0] : NULL, > > + current_data, &common_audio_mutex, &common_audio_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_subtitles; i++) { > > + create_work_pool_data(s, stream_index, c->subtitles[i], > > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > This is very repetitive. Will improve for next patch. > 1. We actually have an API to process multiple tasks by different > threads: Look at libavutil/slicethread.h. Why can't you reuse that? > 2. In case initialization of one of the conditions/mutexes fails, you > are nevertheless destroying them; you are even destroying completely > uninitialized mutexes. This is undefined behaviour. Checking the result > of it does not fix this. > > - Andreas 1. The slicethread implementation is pretty hard to understand. I was not sure if it can be used for normal parallelization, because it looked more like some kind of thread pool for continuous data processing. But after taking a second look, I think I can use it here. I will try and see if it works well. 2. I was not aware that this is undefined behavior. Will switch to PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, which return a safely initialized mutex/cond. I also noticed one cross-thread issue that I will solve in the next patch.
Lukas Fellechner: > Andreas Rheinhardt andreas.rheinhardt at outlook.com > Wed Aug 31 05:54:12 EEST 2022 >> >>> +#if HAVE_THREADS >>> + >>> +struct work_pool_data >>> +{ >>> + AVFormatContext *ctx; >>> + struct representation *pls; >>> + struct representation *common_pls; >>> + pthread_mutex_t *common_mutex; >>> + pthread_cond_t *common_condition; >>> + int is_common; >>> + int is_started; >>> + int result; >>> +}; >>> + >>> +struct thread_data >> >> This is against our naming conventions: CamelCase for struct tags and >> typedefs, lowercase names with underscore for variable names. > > In the code files I looked at, CamelCase is only used for typedef structs. > All structs without typedef are lower case with underscores, so I aligned > with that, originally. > > I will make this a typedef struct and use CamelCase for next patch. > >>> +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) >>> +{ >>> + DASHContext *c = s->priv_data; >>> + int ret = 0; >>> + int stream_index = 0; >>> + int i; >> >> We allow "for (int i = 0;" > > Oh, I did not know that, and I did not see it being used here anywhere. > Will use in next patch, it's my preferred style, actually. > >>> + >>> + // alloc data >>> + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); >>> + if (!init_data) >>> + return AVERROR(ENOMEM); >>> + >>> + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); >>> + if (!thread_data) >>> + return AVERROR(ENOMEM); >> >> 1. init_data leaks here on error. >> 2. In fact, it seems to me that both init_data and thread_data are >> nowhere freed. > > True, I must have lost the av_free call at some point. > >>> + // init work pool data >>> + struct work_pool_data* current_data = init_data; >>> + >>> + for (i = 0; i < c->n_videos; i++) { >>> + create_work_pool_data(s, stream_index, c->videos[i], >>> + c->is_init_section_common_video ? 
c->videos[0] : NULL, >>> + current_data, &common_video_mutex, &common_video_cond); >>> + >>> + stream_index++; >>> + current_data++; >>> + } >>> + >>> + for (i = 0; i < c->n_audios; i++) { >>> + create_work_pool_data(s, stream_index, c->audios[i], >>> + c->is_init_section_common_audio ? c->audios[0] : NULL, >>> + current_data, &common_audio_mutex, &common_audio_cond); >>> + >>> + stream_index++; >>> + current_data++; >>> + } >>> + >>> + for (i = 0; i < c->n_subtitles; i++) { >>> + create_work_pool_data(s, stream_index, c->subtitles[i], >>> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, >>> + current_data, &common_subtitle_mutex, &common_subtitle_cond); >>> + >>> + stream_index++; >>> + current_data++; >>> + } >> >> This is very repetitive. > > Will improve for next patch. > >> 1. We actually have an API to process multiple tasks by different >> threads: Look at libavutil/slicethread.h. Why can't you reuse that? >> 2. In case initialization of one of the conditions/mutexes fails, you >> are nevertheless destroying them; you are even destroying completely >> uninitialized mutexes. This is undefined behaviour. Checking the result >> of it does not fix this. >> >> - Andreas > > 1. The slicethread implementation is pretty hard to understand. > I was not sure if it can be used for normal parallelization, because > it looked more like some kind of thread pool for continuous data > processing. But after taking a second look, I think I can use it here. > I will try and see if it works well. > > 2. I was not aware that this is undefined behavior. Will switch to > PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, > which return a safely initialized mutex/cond. > "The behavior is undefined if the value specified by the mutex argument to pthread_mutex_destroy() does not refer to an initialized mutex." 
(From https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html) Furthermore: "In cases where default mutex attributes are appropriate, the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes. The effect shall be equivalent to dynamic initialization by a call to pthread_mutex_init() with parameter attr specified as NULL, except that no error checks are performed." The last sentence sounds as if one would then have to check mutex locking. Moreover, older pthread standards did not allow to use PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know whether we can use that. Also our pthreads-wrapper on top of OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used nowhere in the codebase). > I also noticed one cross-thread issue that I will solve in the next patch.
> Gesendet: Montag, 05. September 2022 um 00:50 Uhr > Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com> > An: ffmpeg-devel@ffmpeg.org > Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization > Lukas Fellechner: > > Andreas Rheinhardt andreas.rheinhardt at outlook.com > > Wed Aug 31 05:54:12 EEST 2022 > >> > > > >> 1. We actually have an API to process multiple tasks by different > >> threads: Look at libavutil/slicethread.h. Why can't you reuse that? > >> 2. In case initialization of one of the conditions/mutexes fails, you > >> are nevertheless destroying them; you are even destroying completely > >> uninitialized mutexes. This is undefined behaviour. Checking the result > >> of it does not fix this. > >> > >> - Andreas > > > > 1. The slicethread implementation is pretty hard to understand. > > I was not sure if it can be used for normal parallelization, because > > it looked more like some kind of thread pool for continuous data > > processing. But after taking a second look, I think I can use it here. > > I will try and see if it works well. > > > > 2. I was not aware that this is undefined behavior. Will switch to > > PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, > > which return a safely initialized mutex/cond. > > > > "The behavior is undefined if the value specified by the mutex argument > to pthread_mutex_destroy() does not refer to an initialized mutex." > (From > https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html) > > Furthermore: "In cases where default mutex attributes are appropriate, > the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes. > The effect shall be equivalent to dynamic initialization by a call to > pthread_mutex_init() with parameter attr specified as NULL, except that > no error checks are performed." The last sentence sounds as if one would > then have to check mutex locking. 
> > Moreover, older pthread standards did not allow to use > PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know > whether we can use that. Also our pthreads-wrapper on top of > OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used > nowhere in the codebase). I missed that detail about the initializer macro. Thank you for clearing that up. After looking more into threads implementation in ffmpeg, I wonder if I really need to check any results of init/destroy or other functions. In slicethreads.c, there is zero checking on any of the lock functions. The pthreads-based implementation does internally check the results of all function calls and calls abort() in case of errors ("strict_" wrappers). The Win32 implementation uses SRW locks which cannot even return errors. And the OS2 implementation returns 0 on all calls as well. So right now, I think that I should continue with normal _init() calls (no macros) and drop all error checking, just like slicethread does. Are you fine with that approach?
Lukas Fellechner: >> Gesendet: Montag, 05. September 2022 um 00:50 Uhr >> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com> >> An: ffmpeg-devel@ffmpeg.org >> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization >> Lukas Fellechner: >>> Andreas Rheinhardt andreas.rheinhardt at outlook.com >>> Wed Aug 31 05:54:12 EEST 2022 >>>> >>> >>>> 1. We actually have an API to process multiple tasks by different >>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that? >>>> 2. In case initialization of one of the conditions/mutexes fails, you >>>> are nevertheless destroying them; you are even destroying completely >>>> uninitialized mutexes. This is undefined behaviour. Checking the result >>>> of it does not fix this. >>>> >>>> - Andreas >>> >>> 1. The slicethread implementation is pretty hard to understand. >>> I was not sure if it can be used for normal parallelization, because >>> it looked more like some kind of thread pool for continuous data >>> processing. But after taking a second look, I think I can use it here. >>> I will try and see if it works well. >>> >>> 2. I was not aware that this is undefined behavior. Will switch to >>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, >>> which return a safely initialized mutex/cond. >>> >> >> "The behavior is undefined if the value specified by the mutex argument >> to pthread_mutex_destroy() does not refer to an initialized mutex." >> (From >> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html) >> >> Furthermore: "In cases where default mutex attributes are appropriate, >> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes. >> The effect shall be equivalent to dynamic initialization by a call to >> pthread_mutex_init() with parameter attr specified as NULL, except that >> no error checks are performed." The last sentence sounds as if one would >> then have to check mutex locking. 
>> >> Moreover, older pthread standards did not allow to use >> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know >> whether we can use that. Also our pthreads-wrapper on top of >> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used >> nowhere in the codebase). > > I missed that detail about the initializer macro. Thank you for clearing > that up. > > After looking more into threads implementation in ffmpeg, I wonder if I > really need to check any results of init/destroy or other functions. > In slicethreads.c, there is zero checking on any of the lock functions. > The pthreads-based implementation does internally check the results of all > function calls and calls abort() in case of errors ("strict_" wrappers). > The Win32 implementation uses SRW locks which cannot even return errors. > And the OS2 implementation returns 0 on all calls as well. > > So right now, I think that I should continue with normal _init() calls > (no macros) and drop all error checking, just like slicethread does. > Are you fine with that approach? Zero checking is our old approach; the new approach checks for errors and ensures that only mutexes/condition variables that have been properly initialized are destroyed. See ff_pthread_init/free in libavcodec/pthread.c (you can't use this in libavformat, because these functions are local to libavcodec). - Andreas
>Gesendet: Montag, 05. September 2022 um 12:45 Uhr >Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com> >An: ffmpeg-devel@ffmpeg.org >Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization >Lukas Fellechner: >>> Gesendet: Montag, 05. September 2022 um 00:50 Uhr >>> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com> >>> An: ffmpeg-devel@ffmpeg.org >>> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization >>> Lukas Fellechner: >>>> Andreas Rheinhardt andreas.rheinhardt at outlook.com >>>> Wed Aug 31 05:54:12 EEST 2022 >>>>> >>>> >>>>> 1. We actually have an API to process multiple tasks by different >>>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that? >>>>> 2. In case initialization of one of the conditions/mutexes fails, you >>>>> are nevertheless destroying them; you are even destroying completely >>>>> uninitialized mutexes. This is undefined behaviour. Checking the result >>>>> of it does not fix this. >>>>> >>>>> - Andreas >>>> >>>> 1. The slicethread implementation is pretty hard to understand. >>>> I was not sure if it can be used for normal parallelization, because >>>> it looked more like some kind of thread pool for continuous data >>>> processing. But after taking a second look, I think I can use it here. >>>> I will try and see if it works well. >>>> >>>> 2. I was not aware that this is undefined behavior. Will switch to >>>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros, >>>> which return a safely initialized mutex/cond. >>>> >>> >>> "The behavior is undefined if the value specified by the mutex argument >>> to pthread_mutex_destroy() does not refer to an initialized mutex." >>> (From >>> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html) >>> >>> Furthermore: "In cases where default mutex attributes are appropriate, >>> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes. 
>>> The effect shall be equivalent to dynamic initialization by a call to >>> pthread_mutex_init() with parameter attr specified as NULL, except that >>> no error checks are performed." The last sentence sounds as if one would >>> then have to check mutex locking. >>> >>> Moreover, older pthread standards did not allow to use >>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know >>> whether we can use that. Also our pthreads-wrapper on top of >>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used >>> nowhere in the codebase). >> >> I missed that detail about the initializer macro. Thank you for clearing >> that up. >> >> After looking more into threads implementation in ffmpeg, I wonder if I >> really need to check any results of init/destroy or other functions. >> In slicethreads.c, there is zero checking on any of the lock functions. >> The pthreads-based implementation does internally check the results of all >> function calls and calls abort() in case of errors ("strict_" wrappers). >> The Win32 implementation uses SRW locks which cannot even return errors. >> And the OS2 implementation returns 0 on all calls as well. >> >> So right now, I think that I should continue with normal _init() calls >> (no macros) and drop all error checking, just like slicethread does. >> Are you fine with that approach? > > Zero checking is our old approach; the new approach checks for errors > and ensures that only mutexes/condition variables that have been > properly initialized are destroyed. See ff_pthread_init/free in > libavcodec/pthread.c (you can't use this in libavformat, because these > functions are local to libavcodec). > > - Andreas I see. I will try to do clean error checking then.
Gesendet: Montag, 05. September 2022 um 12:45 Uhr Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com> An: ffmpeg-devel@ffmpeg.org Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner: >>> Moreover, older pthread standards did not allow to use >>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know >>> whether we can use that. Also our pthreads-wrapper on top of >>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used >>> nowhere in the codebase). >> >> I missed that detail about the initializer macro. Thank you for clearing >> that up. >> >> After looking more into threads implementation in ffmpeg, I wonder if I >> really need to check any results of init/destroy or other functions. >> In slicethreads.c, there is zero checking on any of the lock functions. >> The pthreads-based implementation does internally check the results of all >> function calls and calls abort() in case of errors ("strict_" wrappers). >> The Win32 implementation uses SRW locks which cannot even return errors. >> And the OS2 implementation returns 0 on all calls as well. >> >> So right now, I think that I should continue with normal _init() calls >> (no macros) and drop all error checking, just like slicethread does. >> Are you fine with that approach? > > Zero checking is our old approach; the new approach checks for errors > and ensures that only mutexes/condition variables that have been > properly initialized are destroyed. See ff_pthread_init/free in > libavcodec/pthread.c (you can't use this in libavformat, because these > functions are local to libavcodec). > > - Andreas I was able to switch to using the slicethread implementation. It has a very minor delay on init, because it waits for all threads to fully start up before continuing. But it is only a few ms and not worth adding a new implementation just for that.
I also changed the initialization and release of mutexes and condition variables, with full return code checking and safe release. There was one cross-thread issue I needed to address. A multi-hour duration test (connecting in an endless loop) did not show any issues after fixing the avio_opts cross-thread access. Please see the v4 patch for all the changes. - Lukas
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index e82da45e43..20f2557ea3 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,7 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +153,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +#if HAVE_THREADS + +struct work_pool_data +{ + AVFormatContext *ctx; + struct representation *pls; + struct representation *common_pls; + pthread_mutex_t *common_mutex; + pthread_cond_t *common_condition; + int is_common; + int is_started; + int result; +}; + +struct thread_data +{ + pthread_t thread; + pthread_mutex_t *mutex; + struct work_pool_data *work_pool; + int work_pool_size; + int is_started; + int has_error; +}; + +static void *worker_thread(void *ptr) +{ + int ret = 0; + int i; + struct thread_data *thread_data = (struct thread_data*)ptr; + struct work_pool_data *work_pool = NULL; + struct work_pool_data *data = NULL; + for (;;) { + + // get next work item unless there was an error + pthread_mutex_lock(thread_data->mutex); + data = NULL; + if (!thread_data->has_error) { + work_pool = thread_data->work_pool; + for (i = 0; i < thread_data->work_pool_size; i++) { + if (!work_pool->is_started) { + data = work_pool; + data->is_started = 1; + break; + } + work_pool++; + } + } + pthread_mutex_unlock(thread_data->mutex); + + if (!data) { + // no more work to do + return NULL; + } + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; 
+ } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + + end: + data->result = ret; + + // notify error to other threads and exit + if (ret < 0) { + pthread_mutex_lock(thread_data->mutex); + thread_data->has_error = 1; + pthread_mutex_unlock(thread_data->mutex); + return NULL; + } + } + + + return NULL; +} + +static void create_work_pool_data(AVFormatContext *ctx, int stream_index, + struct representation *pls, struct representation *common_pls, + struct work_pool_data *init_data, pthread_mutex_t *common_mutex, + pthread_cond_t *common_condition) +{ + init_data->ctx = ctx; + init_data->pls = pls; + init_data->pls->stream_index = stream_index; + init_data->common_condition = common_condition; + init_data->common_mutex = common_mutex; + init_data->result = -1; + + if (pls == common_pls) { + init_data->is_common = 1; + } + else if (common_pls) { + init_data->common_pls = common_pls; + } +} + +static int start_thread(struct thread_data *thread_data, + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) +{ + int ret; + + thread_data->mutex = mutex; + thread_data->work_pool = work_pool; + thread_data->work_pool_size = work_pool_size; + + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); + if (ret == 0) + thread_data->is_started = 1; + + return ret; +} + +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) +{ + DASHContext *c = s->priv_data; + int ret = 0; + int stream_index = 0; + int 
i; + + // alloc data + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); + if (!init_data) + return AVERROR(ENOMEM); + + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); + if (!thread_data) + return AVERROR(ENOMEM); + + // alloc mutex and conditions + pthread_mutex_t work_pool_mutex; + + pthread_mutex_t common_video_mutex; + pthread_cond_t common_video_cond; + + pthread_mutex_t common_audio_mutex; + pthread_cond_t common_audio_cond; + + pthread_mutex_t common_subtitle_mutex; + pthread_cond_t common_subtitle_cond; + + // init mutex and conditions + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); + if (ret < 0) + goto cleanup; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); + if (ret < 0) + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); + if (ret < 0) + goto cleanup; + } + + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); + if (ret < 0) + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); + if (ret < 0) + goto cleanup; + } + + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); + if (ret < 0) + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); + if (ret < 0) + goto cleanup; + } + + // init work pool data + struct work_pool_data* current_data = init_data; + + for (i = 0; i < c->n_videos; i++) { + create_work_pool_data(s, stream_index, c->videos[i], + c->is_init_section_common_video ? c->videos[0] : NULL, + current_data, &common_video_mutex, &common_video_cond); + + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_audios; i++) { + create_work_pool_data(s, stream_index, c->audios[i], + c->is_init_section_common_audio ? 
c->audios[0] : NULL, + current_data, &common_audio_mutex, &common_audio_cond); + + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_subtitles; i++) { + create_work_pool_data(s, stream_index, c->subtitles[i], + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, + current_data, &common_subtitle_mutex, &common_subtitle_cond); + + stream_index++; + current_data++; + } + + // start threads + struct thread_data *current_thread = thread_data; + for (i = 0; i < threads; i++) { + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); + if (ret < 0) + goto cleanup; + + current_thread++; + } + +cleanup: + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup + int initResult = ret; + int runResult = 0; + int cleanupResult = 0; + + // join threads + current_thread = thread_data; + for (i = 0; i < threads; i++) { + if (current_thread->is_started) { + ret = AVERROR(pthread_join(current_thread->thread, NULL)); + if (ret < 0) + cleanupResult = ret; + } + current_thread++; + } + + // finalize streams and collect results + current_data = init_data; + for (i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result and break + runResult = current_data->result; + break; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) + runResult = ret; + } + current_data++; + } + + // cleanup mutex and conditions + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); + if (ret < 0) + cleanupResult = ret; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); + if (ret < 0) + 
cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); + if (ret < 0) + cleanupResult = ret; + } + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; + + return 0; +} + +#endif + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s) if (c->n_subtitles) c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + int threads = 0; + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + +#if HAVE_THREADS + threads = FFMIN(nstreams, c->init_threads); +#endif + + if (threads > 1) + { +#if HAVE_THREADS + ret = init_streams_multithreaded(s, nstreams, threads); + if (ret < 0) + return ret; +#endif + } + else + { /* Open the demuxer for video and audio components if available */ for (i = 0; i < c->n_videos; i++) { rep = c->videos[i]; @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s) if (!stream_index) return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = { OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { 
"init_threads", "Number of threads to use for initializing the DASH stream", + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, {NULL} };