diff mbox series

[FFmpeg-devel,v2] lavf/dashdec: Multithreaded DASH initialization

Message ID 20220821192636.734-1-lukas.fellechner@gmx.net
State New
Headers show
Series [FFmpeg-devel,v2] lavf/dashdec: Multithreaded DASH initialization | expand

Checks

Context Check Description
yinshiyou/commit_msg_loongarch64 warning Please wrap lines in the body of the commit message between 60 and 72 characters.
andriy/commit_msg_x86 warning Please wrap lines in the body of the commit message between 60 and 72 characters.
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Lukas Fellechner Aug. 21, 2022, 7:26 p.m. UTC
Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
---
 libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 386 insertions(+), 46 deletions(-)

--
2.31.1.windows.1

Comments

Steven Liu Aug. 23, 2022, 3:19 a.m. UTC | #1
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道:
>
> Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
> ---
>  libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 386 insertions(+), 46 deletions(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index 63bf7e96a5..7eca3e3415 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
>  #include "libavutil/opt.h"
>  #include "libavutil/time.h"
>  #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
>  #include "internal.h"
>  #include "avio_internal.h"
>  #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
>      int max_url_size;
>      char *cenc_decryption_key;
>
> +    int init_threads;
> +
>      /* Flags for init section*/
>      int is_init_section_common_video;
>      int is_init_section_common_audio;
> @@ -1918,22 +1921,40 @@ fail:
>      return ret;
>  }
>
> -static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> +    int ret = 0;
> +
> +    ret = begin_open_demux_for_component(s, pls);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = end_open_demux_for_component(s, pls);
> +
> +    return ret;
> +}
> +
> +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
>  {
>      int ret = 0;
> -    int i;
>
>      pls->parent = s;
> -    pls->cur_seq_no  = calc_cur_seg_no(s, pls);
> +    pls->cur_seq_no = calc_cur_seg_no(s, pls);
>
>      if (!pls->last_seq_no) {
>          pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
>      }
>
>      ret = reopen_demux_for_component(s, pls);
> -    if (ret < 0) {
> -        goto fail;
> -    }
> +
> +    return ret;
> +}
> +
> +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> +    int ret = 0;
> +    int i;
> +
>      for (i = 0; i < pls->ctx->nb_streams; i++) {
>          AVStream *st = avformat_new_stream(s, NULL);
>          AVStream *ist = pls->ctx->streams[i];
> @@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value)
>      }
>  }
I look at the new functions likes begin_open_demux_for_component and
end_open_demux_for_component maybe can separate patch.
maybe you can submit two patch, one is make code clarify, the other
one support multithreads,

BTW, i saw there have two warning message about patch commit cooments
in patchwork: https://patchwork.ffmpeg.org/project/ffmpeg/patch/20220821192636.734-1-lukas.fellechner@gmx.net/

>
> +#if HAVE_THREADS
> +
> +struct work_pool_data
> +{
> +    AVFormatContext* ctx;
> +    struct representation* pls;
> +    struct representation* common_pls;
> +    pthread_mutex_t* common_mutex;
> +    pthread_cond_t* common_condition;
> +    int is_common;
> +    int is_started;
> +    int result;
> +};
> +
> +struct thread_data
> +{
> +    pthread_t thread;
> +    pthread_mutex_t* mutex;
> +    struct work_pool_data* work_pool;
> +    int work_pool_size;
> +    int is_started;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> +    int ret = 0;
> +    int i;
> +    struct thread_data* thread_data = (struct thread_data*)ptr;
> +    struct work_pool_data* work_pool = NULL;
> +    struct work_pool_data* data = NULL;
> +    for (;;) {
> +
> +        // get next work item
> +        pthread_mutex_lock(thread_data->mutex);
> +        data = NULL;
> +        work_pool = thread_data->work_pool;
> +        for (i = 0; i < thread_data->work_pool_size; i++) {
> +            if (!work_pool->is_started) {
> +                data = work_pool;
> +                data->is_started = 1;
> +                break;
> +            }
> +            work_pool++;
> +        }
> +        pthread_mutex_unlock(thread_data->mutex);
> +
> +        if (!data) {
> +            // no more work to do
> +            return NULL;
> +        }
> +
> +        // if we are common section provider, init and signal
> +        if (data->is_common) {
> +            data->pls->parent = data->ctx;
> +            ret = update_init_section(data->pls);
> +            if (ret < 0) {
> +                pthread_cond_signal(data->common_condition);
> +                goto end;
> +            }
> +            else
> +                ret = AVERROR(pthread_cond_signal(data->common_condition));
> +        }
> +
> +        // if we depend on common section provider, wait for signal and copy
> +        if (data->common_pls) {
> +            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> +            if (ret < 0)
> +                goto end;
> +
> +            if (!data->common_pls->init_sec_buf) {
> +                goto end;
> +                ret = AVERROR(EFAULT);
> +            }
> +
> +            ret = copy_init_section(data->pls, data->common_pls);
> +            if (ret < 0)
> +                goto end;
> +        }
> +
> +        ret = begin_open_demux_for_component(data->ctx, data->pls);
> +        if (ret < 0)
> +            goto end;
> +
> +    end:
> +        data->result = ret;
> +    }
> +
> +
> +    return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
> +    struct representation* pls, struct representation* common_pls,
> +    struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
> +    pthread_cond_t* common_condition)
> +{
> +    init_data->ctx = ctx;
> +    init_data->pls = pls;
> +    init_data->pls->stream_index = stream_index;
> +    init_data->common_condition = common_condition;
> +    init_data->common_mutex = common_mutex;
> +    init_data->result = -1;
> +
> +    if (pls == common_pls) {
> +        init_data->is_common = 1;
> +    }
> +    else if (common_pls) {
> +        init_data->common_pls = common_pls;
> +    }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> +    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> +    int ret;
> +
> +    thread_data->mutex = mutex;
> +    thread_data->work_pool = work_pool;
> +    thread_data->work_pool_size = work_pool_size;
> +
> +    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> +    if (ret == 0)
> +        thread_data->is_started = 1;
> +
> +    return ret;
> +}
> +
> +#endif
> +
>  static int dash_read_header(AVFormatContext *s)
>  {
>      DASHContext *c = s->priv_data;
> @@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s)
>          av_dict_set(&c->avio_opts, "seekable", "0", 0);
>      }
>
> -    if(c->n_videos)
> +    if (c->n_videos)
>          c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
>
> -    /* Open the demuxer for video and audio components if available */
> -    for (i = 0; i < c->n_videos; i++) {
> -        rep = c->videos[i];
> -        if (i > 0 && c->is_init_section_common_video) {
> -            ret = copy_init_section(rep, c->videos[0]);
> +    if (c->n_audios)
> +        c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +
> +    if (c->n_subtitles)
> +        c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +
> +    int threads = 0;
> +    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> +
> +#if HAVE_THREADS
> +    threads = FFMIN(nstreams, c->init_threads);
> +#endif
> +
> +    if (threads > 1)
> +    {
> +#if HAVE_THREADS
> +        // alloc data
> +        struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> +        if (!init_data)
> +            return AVERROR(ENOMEM);
> +
> +        struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> +        if (!thread_data)
> +            return AVERROR(ENOMEM);
> +
> +        // alloc mutex and conditions
> +        pthread_mutex_t work_pool_mutex;
> +
> +        pthread_mutex_t common_video_mutex;
> +        pthread_cond_t common_video_cond;
> +
> +        pthread_mutex_t common_audio_mutex;
> +        pthread_cond_t common_audio_cond;
> +
> +        pthread_mutex_t common_subtitle_mutex;
> +        pthread_cond_t common_subtitle_cond;
> +
> +        // init mutex and conditions
> +        ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +
> +        if (c->is_init_section_common_video) {
> +            ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
>              if (ret < 0)
> -                return ret;
> +                goto cleanup;
> +
> +            ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> +            if (ret < 0)
> +                goto cleanup;
>          }
> -        ret = open_demux_for_component(s, rep);
>
> -        if (ret)
> -            return ret;
> -        rep->stream_index = stream_index;
> -        ++stream_index;
> -    }
> +        if (c->is_init_section_common_audio) {
> +            ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> +            if (ret < 0)
> +                goto cleanup;
>
> -    if(c->n_audios)
> -        c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +            ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> +            if (ret < 0)
> +                goto cleanup;
> +        }
>
> -    for (i = 0; i < c->n_audios; i++) {
> -        rep = c->audios[i];
> -        if (i > 0 && c->is_init_section_common_audio) {
> -            ret = copy_init_section(rep, c->audios[0]);
> +        if (c->is_init_section_common_subtitle) {
> +            ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
>              if (ret < 0)
> -                return ret;
> +                goto cleanup;
> +
> +            ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> +            if (ret < 0)
> +                goto cleanup;
>          }
> -        ret = open_demux_for_component(s, rep);
>
> -        if (ret)
> -            return ret;
> -        rep->stream_index = stream_index;
> -        ++stream_index;
> -    }
> +        // init work pool data
> +        struct work_pool_data* current_data = init_data;
>
> -    if (c->n_subtitles)
> -        c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +        for (i = 0; i < c->n_videos; i++) {
> +            create_work_pool_data(s, stream_index, c->videos[i],
> +                c->is_init_section_common_video ? c->videos[0] : NULL,
> +                current_data, &common_video_mutex, &common_video_cond);
>
> -    for (i = 0; i < c->n_subtitles; i++) {
> -        rep = c->subtitles[i];
> -        if (i > 0 && c->is_init_section_common_subtitle) {
> -            ret = copy_init_section(rep, c->subtitles[0]);
> +            stream_index++;
> +            current_data++;
> +        }
> +
> +        for (i = 0; i < c->n_audios; i++) {
> +            create_work_pool_data(s, stream_index, c->audios[i],
> +                c->is_init_section_common_audio ? c->audios[0] : NULL,
> +                current_data, &common_audio_mutex, &common_audio_cond);
> +
> +            stream_index++;
> +            current_data++;
> +        }
> +
> +        for (i = 0; i < c->n_subtitles; i++) {
> +            create_work_pool_data(s, stream_index, c->subtitles[i],
> +                c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> +                current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> +            stream_index++;
> +            current_data++;
> +        }
> +
> +        // start threads
> +        struct thread_data* current_thread = thread_data;
> +        for (i = 0; i < threads; i++) {
> +            ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
>              if (ret < 0)
> -                return ret;
> +                goto cleanup;
> +
> +            current_thread++;
>          }
> -        ret = open_demux_for_component(s, rep);
>
> -        if (ret)
> -            return ret;
> -        rep->stream_index = stream_index;
> -        ++stream_index;
> +    cleanup:
> +        // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> +        int initResult = ret;
> +        int runResult = 0;
> +        int cleanupResult = 0;
> +
> +        // join threads
> +        current_thread = thread_data;
> +        for (i = 0; i < threads; i++) {
> +            if (current_thread->is_started) {
> +                ret = AVERROR(pthread_join(current_thread->thread, NULL));
> +                if (ret < 0)
> +                    cleanupResult = ret;
> +            }
> +            current_thread++;
> +        }
> +
> +        // finalize streams and collect results
> +        current_data = init_data;
> +        for (i = 0; i < nstreams; i++) {
> +            if (current_data->result < 0) {
> +                // thread ran into error: collect result
> +                runResult = current_data->result;
> +            }
> +            else {
> +                // thread success: create streams on AVFormatContext
> +                ret = end_open_demux_for_component(s, current_data->pls);
> +                if (ret < 0)
> +                    runResult = ret;
> +            }
> +            current_data++;
> +        }
> +
> +        // cleanup mutex and conditions
> +        ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +
> +        if (c->is_init_section_common_video) {
> +            ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +
> +            ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +        }
> +
> +        if (c->is_init_section_common_audio) {
> +            ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +
> +            ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +        }
> +
> +        if (c->is_init_section_common_subtitle) {
> +            ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +
> +            ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +        }
> +
> +        // return results if errors have occured in one of the phases
> +        if (initResult < 0)
> +            return initResult;
> +
> +        if (runResult < 0)
> +            return runResult;
> +
> +        if (cleanupResult < 0)
> +            return cleanupResult;
> +
> +#endif
>      }
> +    else
> +    {
> +        /* Open the demuxer for video and audio components if available */
> +        for (i = 0; i < c->n_videos; i++) {
> +            rep = c->videos[i];
> +            if (i > 0 && c->is_init_section_common_video) {
> +                ret = copy_init_section(rep, c->videos[0]);
> +                if (ret < 0)
> +                    return ret;
> +            }
> +            ret = open_demux_for_component(s, rep);
>
> -    if (!stream_index)
> -        return AVERROR_INVALIDDATA;
> +            if (ret)
> +                return ret;
> +            rep->stream_index = stream_index;
> +            ++stream_index;
> +        }
> +
> +        for (i = 0; i < c->n_audios; i++) {
> +            rep = c->audios[i];
> +            if (i > 0 && c->is_init_section_common_audio) {
> +                ret = copy_init_section(rep, c->audios[0]);
> +                if (ret < 0)
> +                    return ret;
> +            }
> +            ret = open_demux_for_component(s, rep);
> +
> +            if (ret)
> +                return ret;
> +            rep->stream_index = stream_index;
> +            ++stream_index;
> +        }
> +
> +        for (i = 0; i < c->n_subtitles; i++) {
> +            rep = c->subtitles[i];
> +            if (i > 0 && c->is_init_section_common_subtitle) {
> +                ret = copy_init_section(rep, c->subtitles[0]);
> +                if (ret < 0)
> +                    return ret;
> +            }
> +            ret = open_demux_for_component(s, rep);
> +
> +            if (ret)
> +                return ret;
> +            rep->stream_index = stream_index;
> +            ++stream_index;
> +        }
> +
> +        if (!stream_index)
> +            return AVERROR_INVALIDDATA;
> +    }
>
>      /* Create a program */
>      program = av_new_program(s, 0);
> @@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = {
>          {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
>          INT_MIN, INT_MAX, FLAGS},
>      { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> +    { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
>      {NULL}
>  };
>
> --
> 2.31.1.windows.1
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Lukas Fellechner Aug. 23, 2022, 7:09 p.m. UTC | #2
Gesendet: Dienstag, 23. August 2022 um 05:19 Uhr
Von: "Steven Liu" <lingjiujianke@gmail.com>
An: "FFmpeg development discussions and patches" <ffmpeg-devel@ffmpeg.org>
Betreff: Re: [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道:
>
> I look at the new functions likes begin_open_demux_for_component and
> end_open_demux_for_component maybe can separate patch.
> maybe you can submit two patch, one is make code clarify, the other
> one support multithreads,

Good idea. I split up the patch into three parts actually. Git seems to
be pretty bad in handling indentation changes, which made the first patch
look awful, although there were only very small changes.

So I pulled out the indentation change into a separate patch.
Not sure if that is a good idea, but it makes the other two patches
much more readable.
diff mbox series

Patch

diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..7eca3e3415 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@ 
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
 #include "internal.h"
 #include "avio_internal.h"
 #include "dash.h"
@@ -152,6 +153,8 @@  typedef struct DASHContext {
     int max_url_size;
     char *cenc_decryption_key;

+    int init_threads;
+
     /* Flags for init section*/
     int is_init_section_common_video;
     int is_init_section_common_audio;
@@ -1918,22 +1921,40 @@  fail:
     return ret;
 }

-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+    int ret = 0;
+
+    ret = begin_open_demux_for_component(s, pls);
+    if (ret < 0)
+        return ret;
+
+    ret = end_open_demux_for_component(s, pls);
+
+    return ret;
+}
+
+static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
 {
     int ret = 0;
-    int i;

     pls->parent = s;
-    pls->cur_seq_no  = calc_cur_seg_no(s, pls);
+    pls->cur_seq_no = calc_cur_seg_no(s, pls);

     if (!pls->last_seq_no) {
         pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
     }

     ret = reopen_demux_for_component(s, pls);
-    if (ret < 0) {
-        goto fail;
-    }
+
+    return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+    int ret = 0;
+    int i;
+
     for (i = 0; i < pls->ctx->nb_streams; i++) {
         AVStream *st = avformat_new_stream(s, NULL);
         AVStream *ist = pls->ctx->streams[i];
@@ -2015,6 +2036,135 @@  static void move_metadata(AVStream *st, const char *key, char **value)
     }
 }

+#if HAVE_THREADS
+
+struct work_pool_data
+{
+    AVFormatContext* ctx;
+    struct representation* pls;
+    struct representation* common_pls;
+    pthread_mutex_t* common_mutex;
+    pthread_cond_t* common_condition;
+    int is_common;
+    int is_started;
+    int result;
+};
+
+struct thread_data
+{
+    pthread_t thread;
+    pthread_mutex_t* mutex;
+    struct work_pool_data* work_pool;
+    int work_pool_size;
+    int is_started;
+};
+
+static void *worker_thread(void *ptr)
+{
+    int ret = 0;
+    int i;
+    struct thread_data* thread_data = (struct thread_data*)ptr;
+    struct work_pool_data* work_pool = NULL;
+    struct work_pool_data* data = NULL;
+    for (;;) {
+
+        // get next work item
+        pthread_mutex_lock(thread_data->mutex);
+        data = NULL;
+        work_pool = thread_data->work_pool;
+        for (i = 0; i < thread_data->work_pool_size; i++) {
+            if (!work_pool->is_started) {
+                data = work_pool;
+                data->is_started = 1;
+                break;
+            }
+            work_pool++;
+        }
+        pthread_mutex_unlock(thread_data->mutex);
+
+        if (!data) {
+            // no more work to do
+            return NULL;
+        }
+
+        // if we are common section provider, init and signal
+        if (data->is_common) {
+            data->pls->parent = data->ctx;
+            ret = update_init_section(data->pls);
+            if (ret < 0) {
+                pthread_cond_signal(data->common_condition);
+                goto end;
+            }
+            else
+                ret = AVERROR(pthread_cond_signal(data->common_condition));
+        }
+
+        // if we depend on common section provider, wait for signal and copy
+        if (data->common_pls) {
+            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+            if (ret < 0)
+                goto end;
+
+            if (!data->common_pls->init_sec_buf) {
+                goto end;
+                ret = AVERROR(EFAULT);
+            }
+
+            ret = copy_init_section(data->pls, data->common_pls);
+            if (ret < 0)
+                goto end;
+        }
+
+        ret = begin_open_demux_for_component(data->ctx, data->pls);
+        if (ret < 0)
+            goto end;
+
+    end:
+        data->result = ret;
+    }
+
+
+    return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
+    struct representation* pls, struct representation* common_pls,
+    struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
+    pthread_cond_t* common_condition)
+{
+    init_data->ctx = ctx;
+    init_data->pls = pls;
+    init_data->pls->stream_index = stream_index;
+    init_data->common_condition = common_condition;
+    init_data->common_mutex = common_mutex;
+    init_data->result = -1;
+
+    if (pls == common_pls) {
+        init_data->is_common = 1;
+    }
+    else if (common_pls) {
+        init_data->common_pls = common_pls;
+    }
+}
+
+static int start_thread(struct thread_data *thread_data,
+    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+    int ret;
+
+    thread_data->mutex = mutex;
+    thread_data->work_pool = work_pool;
+    thread_data->work_pool_size = work_pool_size;
+
+    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+    if (ret == 0)
+        thread_data->is_started = 1;
+
+    return ret;
+}
+
+#endif
+
 static int dash_read_header(AVFormatContext *s)
 {
     DASHContext *c = s->priv_data;
@@ -2040,63 +2190,252 @@  static int dash_read_header(AVFormatContext *s)
         av_dict_set(&c->avio_opts, "seekable", "0", 0);
     }

-    if(c->n_videos)
+    if (c->n_videos)
         c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);

-    /* Open the demuxer for video and audio components if available */
-    for (i = 0; i < c->n_videos; i++) {
-        rep = c->videos[i];
-        if (i > 0 && c->is_init_section_common_video) {
-            ret = copy_init_section(rep, c->videos[0]);
+    if (c->n_audios)
+        c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+    if (c->n_subtitles)
+        c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
+    int threads = 0;
+    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+    threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+    if (threads > 1)
+    {
+#if HAVE_THREADS
+        // alloc data
+        struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+        if (!init_data)
+            return AVERROR(ENOMEM);
+
+        struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+        if (!thread_data)
+            return AVERROR(ENOMEM);
+
+        // alloc mutex and conditions
+        pthread_mutex_t work_pool_mutex;
+
+        pthread_mutex_t common_video_mutex;
+        pthread_cond_t common_video_cond;
+
+        pthread_mutex_t common_audio_mutex;
+        pthread_cond_t common_audio_cond;
+
+        pthread_mutex_t common_subtitle_mutex;
+        pthread_cond_t common_subtitle_cond;
+
+        // init mutex and conditions
+        ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        if (c->is_init_section_common_video) {
+            ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
             if (ret < 0)
-                return ret;
+                goto cleanup;
+
+            ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+            if (ret < 0)
+                goto cleanup;
         }
-        ret = open_demux_for_component(s, rep);

-        if (ret)
-            return ret;
-        rep->stream_index = stream_index;
-        ++stream_index;
-    }
+        if (c->is_init_section_common_audio) {
+            ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+            if (ret < 0)
+                goto cleanup;

-    if(c->n_audios)
-        c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+            ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+            if (ret < 0)
+                goto cleanup;
+        }

-    for (i = 0; i < c->n_audios; i++) {
-        rep = c->audios[i];
-        if (i > 0 && c->is_init_section_common_audio) {
-            ret = copy_init_section(rep, c->audios[0]);
+        if (c->is_init_section_common_subtitle) {
+            ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
             if (ret < 0)
-                return ret;
+                goto cleanup;
+
+            ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+            if (ret < 0)
+                goto cleanup;
         }
-        ret = open_demux_for_component(s, rep);

-        if (ret)
-            return ret;
-        rep->stream_index = stream_index;
-        ++stream_index;
-    }
+        // init work pool data
+        struct work_pool_data* current_data = init_data;

-    if (c->n_subtitles)
-        c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+        for (i = 0; i < c->n_videos; i++) {
+            create_work_pool_data(s, stream_index, c->videos[i],
+                c->is_init_section_common_video ? c->videos[0] : NULL,
+                current_data, &common_video_mutex, &common_video_cond);

-    for (i = 0; i < c->n_subtitles; i++) {
-        rep = c->subtitles[i];
-        if (i > 0 && c->is_init_section_common_subtitle) {
-            ret = copy_init_section(rep, c->subtitles[0]);
+            stream_index++;
+            current_data++;
+        }
+
+        for (i = 0; i < c->n_audios; i++) {
+            create_work_pool_data(s, stream_index, c->audios[i],
+                c->is_init_section_common_audio ? c->audios[0] : NULL,
+                current_data, &common_audio_mutex, &common_audio_cond);
+
+            stream_index++;
+            current_data++;
+        }
+
+        for (i = 0; i < c->n_subtitles; i++) {
+            create_work_pool_data(s, stream_index, c->subtitles[i],
+                c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+                current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+            stream_index++;
+            current_data++;
+        }
+
+        // start threads
+        struct thread_data* current_thread = thread_data;
+        for (i = 0; i < threads; i++) {
+            ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
             if (ret < 0)
-                return ret;
+                goto cleanup;
+
+            current_thread++;
         }
-        ret = open_demux_for_component(s, rep);

-        if (ret)
-            return ret;
-        rep->stream_index = stream_index;
-        ++stream_index;
+    cleanup:
+        // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+        int initResult = ret;
+        int runResult = 0;
+        int cleanupResult = 0;
+
+        // join threads
+        current_thread = thread_data;
+        for (i = 0; i < threads; i++) {
+            if (current_thread->is_started) {
+                ret = AVERROR(pthread_join(current_thread->thread, NULL));
+                if (ret < 0)
+                    cleanupResult = ret;
+            }
+            current_thread++;
+        }
+
+        // finalize streams and collect results
+        current_data = init_data;
+        for (i = 0; i < nstreams; i++) {
+            if (current_data->result < 0) {
+                // thread ran into error: collect result
+                runResult = current_data->result;
+            }
+            else {
+                // thread success: create streams on AVFormatContext
+                ret = end_open_demux_for_component(s, current_data->pls);
+                if (ret < 0)
+                    runResult = ret;
+            }
+            current_data++;
+        }
+
+        // cleanup mutex and conditions
+        ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        if (c->is_init_section_common_video) {
+            ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+            if (ret < 0)
+                cleanupResult = ret;
+
+            ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+            if (ret < 0)
+                cleanupResult = ret;
+        }
+
+        if (c->is_init_section_common_audio) {
+            ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+            if (ret < 0)
+                cleanupResult = ret;
+
+            ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+            if (ret < 0)
+                cleanupResult = ret;
+        }
+
+        if (c->is_init_section_common_subtitle) {
+            ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+            if (ret < 0)
+                cleanupResult = ret;
+
+            ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+            if (ret < 0)
+                cleanupResult = ret;
+        }
+
+        // return results if errors have occured in one of the phases
+        if (initResult < 0)
+            return initResult;
+
+        if (runResult < 0)
+            return runResult;
+
+        if (cleanupResult < 0)
+            return cleanupResult;
+
+#endif
     }
+    else
+    {
+        /* Open the demuxer for video and audio components if available */
+        for (i = 0; i < c->n_videos; i++) {
+            rep = c->videos[i];
+            if (i > 0 && c->is_init_section_common_video) {
+                ret = copy_init_section(rep, c->videos[0]);
+                if (ret < 0)
+                    return ret;
+            }
+            ret = open_demux_for_component(s, rep);

-    if (!stream_index)
-        return AVERROR_INVALIDDATA;
+            if (ret)
+                return ret;
+            rep->stream_index = stream_index;
+            ++stream_index;
+        }
+
+        for (i = 0; i < c->n_audios; i++) {
+            rep = c->audios[i];
+            if (i > 0 && c->is_init_section_common_audio) {
+                ret = copy_init_section(rep, c->audios[0]);
+                if (ret < 0)
+                    return ret;
+            }
+            ret = open_demux_for_component(s, rep);
+
+            if (ret)
+                return ret;
+            rep->stream_index = stream_index;
+            ++stream_index;
+        }
+
+        for (i = 0; i < c->n_subtitles; i++) {
+            rep = c->subtitles[i];
+            if (i > 0 && c->is_init_section_common_subtitle) {
+                ret = copy_init_section(rep, c->subtitles[0]);
+                if (ret < 0)
+                    return ret;
+            }
+            ret = open_demux_for_component(s, rep);
+
+            if (ret)
+                return ret;
+            rep->stream_index = stream_index;
+            ++stream_index;
+        }
+
+        if (!stream_index)
+            return AVERROR_INVALIDDATA;
+    }

     /* Create a program */
     program = av_new_program(s, 0);
@@ -2349,6 +2688,7 @@  static const AVOption dash_options[] = {
         {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
         INT_MIN, INT_MAX, FLAGS},
     { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
     {NULL}
 };