diff mbox series

[FFmpeg-devel,v3,2/3] lavf/dashdec: Multithreaded DASH initialization

Message ID 20220823190326.249-3-lukas.fellechner@gmx.net
State New
Headers show
Series lavf/dashdec: Multithreaded DASH initialization | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Lukas Fellechner Aug. 23, 2022, 7:03 p.m. UTC
This patch adds an "init_threads" option, specifying the max
number of threads to use. Multiple worker threads are spun up
to massively bring down init times.
---
 libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 350 insertions(+), 1 deletion(-)

--
2.31.1.windows.1

Comments

Andreas Rheinhardt Aug. 31, 2022, 2:54 a.m. UTC | #1
Lukas Fellechner:
> This patch adds an "init_threads" option, specifying the max
> number of threads to use. Multiple worker threads are spun up
> to massively bring down init times.
> ---
>  libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 350 insertions(+), 1 deletion(-)
> 
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index e82da45e43..20f2557ea3 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
>  #include "libavutil/opt.h"
>  #include "libavutil/time.h"
>  #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
>  #include "internal.h"
>  #include "avio_internal.h"
>  #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
>      int max_url_size;
>      char *cenc_decryption_key;
> 
> +    int init_threads;
> +
>      /* Flags for init section*/
>      int is_init_section_common_video;
>      int is_init_section_common_audio;
> @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
>      }
>  }
> 
> +#if HAVE_THREADS
> +
> +struct work_pool_data
> +{
> +    AVFormatContext *ctx;
> +    struct representation *pls;
> +    struct representation *common_pls;
> +    pthread_mutex_t *common_mutex;
> +    pthread_cond_t *common_condition;
> +    int is_common;
> +    int is_started;
> +    int result;
> +};
> +
> +struct thread_data

This is against our naming conventions: CamelCase for struct tags and
typedefs, lowercase names with underscore for variable names.

> +{
> +    pthread_t thread;
> +    pthread_mutex_t *mutex;
> +    struct work_pool_data *work_pool;
> +    int work_pool_size;
> +    int is_started;
> +    int has_error;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> +    int ret = 0;
> +    int i;
> +    struct thread_data *thread_data = (struct thread_data*)ptr;
> +    struct work_pool_data *work_pool = NULL;
> +    struct work_pool_data *data = NULL;
> +    for (;;) {
> +
> +        // get next work item unless there was an error
> +        pthread_mutex_lock(thread_data->mutex);
> +        data = NULL;
> +        if (!thread_data->has_error) {
> +            work_pool = thread_data->work_pool;
> +            for (i = 0; i < thread_data->work_pool_size; i++) {
> +                if (!work_pool->is_started) {
> +                    data = work_pool;
> +                    data->is_started = 1;
> +                    break;
> +                }
> +                work_pool++;
> +            }
> +        }
> +        pthread_mutex_unlock(thread_data->mutex);
> +
> +        if (!data) {
> +            // no more work to do
> +            return NULL;
> +        }
> +
> +        // if we are common section provider, init and signal
> +        if (data->is_common) {
> +            data->pls->parent = data->ctx;
> +            ret = update_init_section(data->pls);
> +            if (ret < 0) {
> +                pthread_cond_signal(data->common_condition);
> +                goto end;
> +            }
> +            else
> +                ret = AVERROR(pthread_cond_signal(data->common_condition));
> +        }
> +
> +        // if we depend on common section provider, wait for signal and copy
> +        if (data->common_pls) {
> +            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> +            if (ret < 0)
> +                goto end;
> +
> +            if (!data->common_pls->init_sec_buf) {
> +                goto end;
> +                ret = AVERROR(EFAULT);
> +            }
> +
> +            ret = copy_init_section(data->pls, data->common_pls);
> +            if (ret < 0)
> +                goto end;
> +        }
> +
> +        ret = begin_open_demux_for_component(data->ctx, data->pls);
> +        if (ret < 0)
> +            goto end;
> +
> +    end:
> +        data->result = ret;
> +
> +        // notify error to other threads and exit
> +        if (ret < 0) {
> +            pthread_mutex_lock(thread_data->mutex);
> +            thread_data->has_error = 1;
> +            pthread_mutex_unlock(thread_data->mutex);
> +            return NULL;
> +        }
> +    }
> +
> +
> +    return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
> +    struct representation *pls, struct representation *common_pls,
> +    struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
> +    pthread_cond_t *common_condition)
> +{
> +    init_data->ctx = ctx;
> +    init_data->pls = pls;
> +    init_data->pls->stream_index = stream_index;
> +    init_data->common_condition = common_condition;
> +    init_data->common_mutex = common_mutex;
> +    init_data->result = -1;
> +
> +    if (pls == common_pls) {
> +        init_data->is_common = 1;
> +    }
> +    else if (common_pls) {
> +        init_data->common_pls = common_pls;
> +    }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> +    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> +    int ret;
> +
> +    thread_data->mutex = mutex;
> +    thread_data->work_pool = work_pool;
> +    thread_data->work_pool_size = work_pool_size;
> +
> +    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> +    if (ret == 0)
> +        thread_data->is_started = 1;
> +
> +    return ret;
> +}
> +
> +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> +{
> +    DASHContext *c = s->priv_data;
> +    int ret = 0;
> +    int stream_index = 0;
> +    int i;

We allow "for (int i = 0;"

> +
> +    // alloc data
> +    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> +    if (!init_data)
> +        return AVERROR(ENOMEM);
> +
> +    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> +    if (!thread_data)
> +        return AVERROR(ENOMEM);

1. init_data leaks here on error.
2. In fact, it seems to me that neither init_data nor thread_data is
freed anywhere.

> +
> +    // alloc mutex and conditions
> +    pthread_mutex_t work_pool_mutex;
> +
> +    pthread_mutex_t common_video_mutex;
> +    pthread_cond_t common_video_cond;
> +
> +    pthread_mutex_t common_audio_mutex;
> +    pthread_cond_t common_audio_cond;
> +
> +    pthread_mutex_t common_subtitle_mutex;
> +    pthread_cond_t common_subtitle_cond;
> +
> +    // init mutex and conditions
> +    ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> +    if (ret < 0)
> +        goto cleanup;
> +
> +    if (c->is_init_section_common_video) {
> +        ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +
> +        ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +    }
> +
> +    if (c->is_init_section_common_audio) {
> +        ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +
> +        ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +    }
> +
> +    if (c->is_init_section_common_subtitle) {
> +        ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +
> +        ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> +        if (ret < 0)
> +            goto cleanup;
> +    }
> +
> +    // init work pool data
> +    struct work_pool_data* current_data = init_data;
> +
> +    for (i = 0; i < c->n_videos; i++) {
> +        create_work_pool_data(s, stream_index, c->videos[i],
> +            c->is_init_section_common_video ? c->videos[0] : NULL,
> +            current_data, &common_video_mutex, &common_video_cond);
> +
> +        stream_index++;
> +        current_data++;
> +    }
> +
> +    for (i = 0; i < c->n_audios; i++) {
> +        create_work_pool_data(s, stream_index, c->audios[i],
> +            c->is_init_section_common_audio ? c->audios[0] : NULL,
> +            current_data, &common_audio_mutex, &common_audio_cond);
> +
> +        stream_index++;
> +        current_data++;
> +    }
> +
> +    for (i = 0; i < c->n_subtitles; i++) {
> +        create_work_pool_data(s, stream_index, c->subtitles[i],
> +            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> +            current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> +        stream_index++;
> +        current_data++;
> +    }

This is very repetitive.

> +
> +    // start threads
> +    struct thread_data *current_thread = thread_data;
> +    for (i = 0; i < threads; i++) {
> +        ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> +        if (ret < 0)
> +            goto cleanup;
> +
> +        current_thread++;
> +    }
> +
> +cleanup:
> +    // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> +    int initResult = ret;
> +    int runResult = 0;
> +    int cleanupResult = 0;
> +
> +    // join threads
> +    current_thread = thread_data;
> +    for (i = 0; i < threads; i++) {
> +        if (current_thread->is_started) {
> +            ret = AVERROR(pthread_join(current_thread->thread, NULL));
> +            if (ret < 0)
> +                cleanupResult = ret;
> +        }
> +        current_thread++;
> +    }
> +
> +    // finalize streams and collect results
> +    current_data = init_data;
> +    for (i = 0; i < nstreams; i++) {
> +        if (current_data->result < 0) {
> +            // thread ran into error: collect result and break
> +            runResult = current_data->result;
> +            break;
> +        }
> +        else {
> +            // thread success: create streams on AVFormatContext
> +            ret = end_open_demux_for_component(s, current_data->pls);
> +            if (ret < 0)
> +                runResult = ret;
> +        }
> +        current_data++;
> +    }
> +
> +    // cleanup mutex and conditions
> +    ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> +    if (ret < 0)
> +        cleanupResult = ret;
> +
> +    if (c->is_init_section_common_video) {
> +        ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +
> +        ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +    }
> +
> +    if (c->is_init_section_common_audio) {
> +        ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +
> +        ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +    }
> +
> +    if (c->is_init_section_common_subtitle) {
> +        ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +
> +        ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> +        if (ret < 0)
> +            cleanupResult = ret;
> +    }
> +
> +    // return results if errors have occured in one of the phases
> +    if (initResult < 0)
> +        return initResult;
> +
> +    if (runResult < 0)
> +        return runResult;
> +
> +    if (cleanupResult < 0)
> +        return cleanupResult;
> +
> +    return 0;
> +}
> +
> +#endif
> +
>  static int dash_read_header(AVFormatContext *s)
>  {
>      DASHContext *c = s->priv_data;
> @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
>      if (c->n_subtitles)
>          c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> 
> +    int threads = 0;
> +    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> +
> +#if HAVE_THREADS
> +    threads = FFMIN(nstreams, c->init_threads);
> +#endif
> +
> +    if (threads > 1)
> +    {
> +#if HAVE_THREADS
> +        ret = init_streams_multithreaded(s, nstreams, threads);
> +        if (ret < 0)
> +            return ret;
> +#endif
> +    }
> +    else
> +    {
>      /* Open the demuxer for video and audio components if available */
>      for (i = 0; i < c->n_videos; i++) {
>          rep = c->videos[i];
> @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
> 
>      if (!stream_index)
>          return AVERROR_INVALIDDATA;
> +    }
> 
>      /* Create a program */
>      program = av_new_program(s, 0);
> @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
>          OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
>          {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
>          INT_MIN, INT_MAX, FLAGS},
> -    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> +    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
> +        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> +    { "init_threads", "Number of threads to use for initializing the DASH stream",
> +        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
>      {NULL}
>  };
> 
> --
> 2.31.1.windows.1
> 

1. We actually have an API to process multiple tasks by different
threads: Look at libavutil/slicethread.h. Why can't you reuse that?
2. In case initialization of one of the conditions/mutexes fails, you
are nevertheless destroying them; you are even destroying completely
uninitialized mutexes. This is undefined behaviour. Checking the result
of it does not fix this.

- Andreas
Steven Liu Aug. 31, 2022, 7:25 a.m. UTC | #2
Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道:
>
> Lukas Fellechner:
> > This patch adds an "init_threads" option, specifying the max
> > number of threads to use. Multiple worker threads are spun up
> > to massively bring down init times.
> > ---
> >  libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 350 insertions(+), 1 deletion(-)
> >
> > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> > index e82da45e43..20f2557ea3 100644
> > --- a/libavformat/dashdec.c
> > +++ b/libavformat/dashdec.c
> > @@ -24,6 +24,7 @@
> >  #include "libavutil/opt.h"
> >  #include "libavutil/time.h"
> >  #include "libavutil/parseutils.h"
> > +#include "libavutil/thread.h"
> >  #include "internal.h"
> >  #include "avio_internal.h"
> >  #include "dash.h"
> > @@ -152,6 +153,8 @@ typedef struct DASHContext {
> >      int max_url_size;
> >      char *cenc_decryption_key;
> >
> > +    int init_threads;
> > +
> >      /* Flags for init section*/
> >      int is_init_section_common_video;
> >      int is_init_section_common_audio;
> > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> >      }
> >  }
> >
> > +#if HAVE_THREADS
> > +
> > +struct work_pool_data
> > +{
> > +    AVFormatContext *ctx;
> > +    struct representation *pls;
> > +    struct representation *common_pls;
> > +    pthread_mutex_t *common_mutex;
> > +    pthread_cond_t *common_condition;
> > +    int is_common;
> > +    int is_started;
> > +    int result;
> > +};
> > +
> > +struct thread_data
>
> This is against our naming conventions: CamelCase for struct tags and
> typedefs, lowercase names with underscore for variable names.
>
> > +{
> > +    pthread_t thread;
> > +    pthread_mutex_t *mutex;
> > +    struct work_pool_data *work_pool;
> > +    int work_pool_size;
> > +    int is_started;
> > +    int has_error;
> > +};
> > +
> > +static void *worker_thread(void *ptr)
> > +{
> > +    int ret = 0;
> > +    int i;
> > +    struct thread_data *thread_data = (struct thread_data*)ptr;
> > +    struct work_pool_data *work_pool = NULL;
> > +    struct work_pool_data *data = NULL;
> > +    for (;;) {
> > +
> > +        // get next work item unless there was an error
> > +        pthread_mutex_lock(thread_data->mutex);
> > +        data = NULL;
> > +        if (!thread_data->has_error) {
> > +            work_pool = thread_data->work_pool;
> > +            for (i = 0; i < thread_data->work_pool_size; i++) {
> > +                if (!work_pool->is_started) {
> > +                    data = work_pool;
> > +                    data->is_started = 1;
> > +                    break;
> > +                }
> > +                work_pool++;
> > +            }
> > +        }
> > +        pthread_mutex_unlock(thread_data->mutex);
> > +
> > +        if (!data) {
> > +            // no more work to do
> > +            return NULL;
> > +        }
> > +
> > +        // if we are common section provider, init and signal
> > +        if (data->is_common) {
> > +            data->pls->parent = data->ctx;
> > +            ret = update_init_section(data->pls);
> > +            if (ret < 0) {
> > +                pthread_cond_signal(data->common_condition);
> > +                goto end;
> > +            }
> > +            else
> > +                ret = AVERROR(pthread_cond_signal(data->common_condition));
> > +        }
> > +
> > +        // if we depend on common section provider, wait for signal and copy
> > +        if (data->common_pls) {
> > +            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> > +            if (ret < 0)
> > +                goto end;
> > +
> > +            if (!data->common_pls->init_sec_buf) {
> > +                goto end;
> > +                ret = AVERROR(EFAULT);
> > +            }
> > +
> > +            ret = copy_init_section(data->pls, data->common_pls);
> > +            if (ret < 0)
> > +                goto end;
> > +        }
> > +
> > +        ret = begin_open_demux_for_component(data->ctx, data->pls);
> > +        if (ret < 0)
> > +            goto end;
> > +
> > +    end:
> > +        data->result = ret;
> > +
> > +        // notify error to other threads and exit
> > +        if (ret < 0) {
> > +            pthread_mutex_lock(thread_data->mutex);
> > +            thread_data->has_error = 1;
> > +            pthread_mutex_unlock(thread_data->mutex);
> > +            return NULL;
> > +        }
> > +    }
> > +
> > +
> > +    return NULL;
> > +}
> > +
> > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
> > +    struct representation *pls, struct representation *common_pls,
> > +    struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
> > +    pthread_cond_t *common_condition)
> > +{
> > +    init_data->ctx = ctx;
> > +    init_data->pls = pls;
> > +    init_data->pls->stream_index = stream_index;
> > +    init_data->common_condition = common_condition;
> > +    init_data->common_mutex = common_mutex;
> > +    init_data->result = -1;
> > +
> > +    if (pls == common_pls) {
> > +        init_data->is_common = 1;
> > +    }
> > +    else if (common_pls) {
> > +        init_data->common_pls = common_pls;
> > +    }
> > +}
> > +
> > +static int start_thread(struct thread_data *thread_data,
> > +    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> > +{
> > +    int ret;
> > +
> > +    thread_data->mutex = mutex;
> > +    thread_data->work_pool = work_pool;
> > +    thread_data->work_pool_size = work_pool_size;
> > +
> > +    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> > +    if (ret == 0)
> > +        thread_data->is_started = 1;
> > +
> > +    return ret;
> > +}
> > +
> > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> > +{
> > +    DASHContext *c = s->priv_data;
> > +    int ret = 0;
> > +    int stream_index = 0;
> > +    int i;
>
> We allow "for (int i = 0;"
>
> > +
> > +    // alloc data
> > +    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> > +    if (!init_data)
> > +        return AVERROR(ENOMEM);
> > +
> > +    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> > +    if (!thread_data)
> > +        return AVERROR(ENOMEM);
>
> 1. init_data leaks here on error.
> 2. In fact, it seems to me that both init_data and thread_data are
> nowhere freed.
>
> > +
> > +    // alloc mutex and conditions
> > +    pthread_mutex_t work_pool_mutex;
> > +
> > +    pthread_mutex_t common_video_mutex;
> > +    pthread_cond_t common_video_cond;
> > +
> > +    pthread_mutex_t common_audio_mutex;
> > +    pthread_cond_t common_audio_cond;
> > +
> > +    pthread_mutex_t common_subtitle_mutex;
> > +    pthread_cond_t common_subtitle_cond;
> > +
> > +    // init mutex and conditions
> > +    ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> > +    if (ret < 0)
> > +        goto cleanup;
> > +
> > +    if (c->is_init_section_common_video) {
> > +        ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    if (c->is_init_section_common_audio) {
> > +        ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    if (c->is_init_section_common_subtitle) {
> > +        ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    // init work pool data
> > +    struct work_pool_data* current_data = init_data;
> > +
> > +    for (i = 0; i < c->n_videos; i++) {
> > +        create_work_pool_data(s, stream_index, c->videos[i],
> > +            c->is_init_section_common_video ? c->videos[0] : NULL,
> > +            current_data, &common_video_mutex, &common_video_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_audios; i++) {
> > +        create_work_pool_data(s, stream_index, c->audios[i],
> > +            c->is_init_section_common_audio ? c->audios[0] : NULL,
> > +            current_data, &common_audio_mutex, &common_audio_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_subtitles; i++) {
> > +        create_work_pool_data(s, stream_index, c->subtitles[i],
> > +            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> > +            current_data, &common_subtitle_mutex, &common_subtitle_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
>
> This is very repetitive.
>
> > +
> > +    // start threads
> > +    struct thread_data *current_thread = thread_data;
> > +    for (i = 0; i < threads; i++) {
> > +        ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        current_thread++;
> > +    }
> > +
> > +cleanup:
> > +    // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> > +    int initResult = ret;
> > +    int runResult = 0;
> > +    int cleanupResult = 0;
> > +
> > +    // join threads
> > +    current_thread = thread_data;
> > +    for (i = 0; i < threads; i++) {
> > +        if (current_thread->is_started) {
> > +            ret = AVERROR(pthread_join(current_thread->thread, NULL));
> > +            if (ret < 0)
> > +                cleanupResult = ret;
> > +        }
> > +        current_thread++;
> > +    }
> > +
> > +    // finalize streams and collect results
> > +    current_data = init_data;
> > +    for (i = 0; i < nstreams; i++) {
> > +        if (current_data->result < 0) {
> > +            // thread ran into error: collect result and break
> > +            runResult = current_data->result;
> > +            break;
> > +        }
> > +        else {
> > +            // thread success: create streams on AVFormatContext
> > +            ret = end_open_demux_for_component(s, current_data->pls);
> > +            if (ret < 0)
> > +                runResult = ret;
> > +        }
> > +        current_data++;
> > +    }
> > +
> > +    // cleanup mutex and conditions
> > +    ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> > +    if (ret < 0)
> > +        cleanupResult = ret;
> > +
> > +    if (c->is_init_section_common_video) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    if (c->is_init_section_common_audio) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    if (c->is_init_section_common_subtitle) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    // return results if errors have occured in one of the phases
> > +    if (initResult < 0)
> > +        return initResult;
> > +
> > +    if (runResult < 0)
> > +        return runResult;
> > +
> > +    if (cleanupResult < 0)
> > +        return cleanupResult;
> > +
> > +    return 0;
> > +}
> > +
> > +#endif
> > +
> >  static int dash_read_header(AVFormatContext *s)
> >  {
> >      DASHContext *c = s->priv_data;
> > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
> >      if (c->n_subtitles)
> >          c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> >
> > +    int threads = 0;
> > +    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> > +
> > +#if HAVE_THREADS
> > +    threads = FFMIN(nstreams, c->init_threads);
> > +#endif
> > +
> > +    if (threads > 1)
> > +    {
> > +#if HAVE_THREADS
> > +        ret = init_streams_multithreaded(s, nstreams, threads);
> > +        if (ret < 0)
> > +            return ret;
> > +#endif
> > +    }
> > +    else
> > +    {
> >      /* Open the demuxer for video and audio components if available */
> >      for (i = 0; i < c->n_videos; i++) {
> >          rep = c->videos[i];
> > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
> >
> >      if (!stream_index)
> >          return AVERROR_INVALIDDATA;
> > +    }
> >
> >      /* Create a program */
> >      program = av_new_program(s, 0);
> > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
> >          OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
> >          {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> >          INT_MIN, INT_MAX, FLAGS},
> > -    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > +    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
> > +        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > +    { "init_threads", "Number of threads to use for initializing the DASH stream",
> > +        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> >      {NULL}
> >  };
> >
> > --
> > 2.31.1.windows.1
> >
>
> 1. We actually have an API to process multiple tasks by different
> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
I have seen that it is usually used in avfilters for slice multi-threading,
or am I misunderstanding something?

> 2. In case initialization of one of the conditions/mutexes fails, you
> are nevertheless destroying them; you are even destroying completely
> uninitialized mutexes. This is undefined behaviour. Checking the result
> of it does not fix this.
>
> - Andreas


Thanks
Steven
Andreas Rheinhardt Aug. 31, 2022, 12:17 p.m. UTC | #3
Steven Liu:
> Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道:
>> 1. We actually have an API to process multiple tasks by different
>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> I have seen that it is usually used in avfilters for slice multi-threading,
> or am I misunderstanding something?
> 

It is also used by our slice-threaded decoders. In fact, it is
used everywhere we do slice threading.

- Andreas
Lukas Fellechner Sept. 4, 2022, 9:29 p.m. UTC | #4
Andreas Rheinhardt andreas.rheinhardt at outlook.com
Wed Aug 31 05:54:12 EEST 2022
>
> > +#if HAVE_THREADS
> > +
> > +struct work_pool_data
> > +{
> > +    AVFormatContext *ctx;
> > +    struct representation *pls;
> > +    struct representation *common_pls;
> > +    pthread_mutex_t *common_mutex;
> > +    pthread_cond_t *common_condition;
> > +    int is_common;
> > +    int is_started;
> > +    int result;
> > +};
> > +
> > +struct thread_data
>
> This is against our naming conventions: CamelCase for struct tags and
> typedefs, lowercase names with underscore for variable names.

In the code files I looked at, CamelCase is only used for typedef structs.
All structs without typedef are lower case with underscores, so I aligned
with that, originally.

I will make this a typedef struct and use CamelCase for next patch.

> > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> > +{
> > +    DASHContext *c = s->priv_data;
> > +    int ret = 0;
> > +    int stream_index = 0;
> > +    int i;
>
> We allow "for (int i = 0;"

Oh, I did not know that, and I did not see it being used here anywhere.
Will use in next patch, it's my preferred style, actually.

> > +
> > +    // alloc data
> > +    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> > +    if (!init_data)
> > +        return AVERROR(ENOMEM);
> > +
> > +    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> > +    if (!thread_data)
> > +        return AVERROR(ENOMEM);
>
> 1. init_data leaks here on error.
> 2. In fact, it seems to me that both init_data and thread_data are
> nowhere freed.

True, I must have lost the av_free call at some point.

> > +    // init work pool data
> > +    struct work_pool_data* current_data = init_data;
> > +
> > +    for (i = 0; i < c->n_videos; i++) {
> > +        create_work_pool_data(s, stream_index, c->videos[i],
> > +            c->is_init_section_common_video ? c->videos[0] : NULL,
> > +            current_data, &common_video_mutex, &common_video_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_audios; i++) {
> > +        create_work_pool_data(s, stream_index, c->audios[i],
> > +            c->is_init_section_common_audio ? c->audios[0] : NULL,
> > +            current_data, &common_audio_mutex, &common_audio_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_subtitles; i++) {
> > +        create_work_pool_data(s, stream_index, c->subtitles[i],
> > +            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> > +            current_data, &common_subtitle_mutex, &common_subtitle_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
>
> This is very repetitive.

Will improve for next patch.

> 1. We actually have an API to process multiple tasks by different
> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> 2. In case initialization of one of the conditions/mutexes fails, you
> are nevertheless destroying them; you are even destroying completely
> uninitialized mutexes. This is undefined behaviour. Checking the result
> of it does not fix this.
>
> - Andreas

1. The slicethread implementation is pretty hard to understand.
I was not sure if it can be used for normal parallelization, because
it looked more like some kind of thread pool for continuous data
processing. But after taking a second look, I think I can use it here.
I will try and see if it works well.

2. I was not aware that this is undefined behavior. Will switch to
PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
which return a safely initialized mutex/cond.

I also noticed one cross-thread issue that I will solve in the next patch.
Andreas Rheinhardt Sept. 4, 2022, 10:50 p.m. UTC | #5
Lukas Fellechner:
> Andreas Rheinhardt andreas.rheinhardt at outlook.com
> Wed Aug 31 05:54:12 EEST 2022
>>
>>> +#if HAVE_THREADS
>>> +
>>> +struct work_pool_data
>>> +{
>>> +    AVFormatContext *ctx;
>>> +    struct representation *pls;
>>> +    struct representation *common_pls;
>>> +    pthread_mutex_t *common_mutex;
>>> +    pthread_cond_t *common_condition;
>>> +    int is_common;
>>> +    int is_started;
>>> +    int result;
>>> +};
>>> +
>>> +struct thread_data
>>
>> This is against our naming conventions: CamelCase for struct tags and
>> typedefs, lowercase names with underscore for variable names.
> 
> In the code files I looked at, CamelCase is only used for typedef structs.
> All structs without typedef are lower case with underscores, so I aligned
> with that, originally.
> 
> I will make this a typedef struct and use CamelCase for next patch.
> 
>>> +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
>>> +{
>>> +    DASHContext *c = s->priv_data;
>>> +    int ret = 0;
>>> +    int stream_index = 0;
>>> +    int i;
>>
>> We allow "for (int i = 0;"
> 
> Oh, I did not know that, and I did not see it being used here anywhere.
> Will use in next patch, it's my preferred style, actually.
> 
>>> +
>>> +    // alloc data
>>> +    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
>>> +    if (!init_data)
>>> +        return AVERROR(ENOMEM);
>>> +
>>> +    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
>>> +    if (!thread_data)
>>> +        return AVERROR(ENOMEM);
>>
>> 1. init_data leaks here on error.
>> 2. In fact, it seems to me that both init_data and thread_data are
>> nowhere freed.
> 
> True, I must have lost the av_free call at some point.
> 
>>> +    // init work pool data
>>> +    struct work_pool_data* current_data = init_data;
>>> +
>>> +    for (i = 0; i < c->n_videos; i++) {
>>> +        create_work_pool_data(s, stream_index, c->videos[i],
>>> +            c->is_init_section_common_video ? c->videos[0] : NULL,
>>> +            current_data, &common_video_mutex, &common_video_cond);
>>> +
>>> +        stream_index++;
>>> +        current_data++;
>>> +    }
>>> +
>>> +    for (i = 0; i < c->n_audios; i++) {
>>> +        create_work_pool_data(s, stream_index, c->audios[i],
>>> +            c->is_init_section_common_audio ? c->audios[0] : NULL,
>>> +            current_data, &common_audio_mutex, &common_audio_cond);
>>> +
>>> +        stream_index++;
>>> +        current_data++;
>>> +    }
>>> +
>>> +    for (i = 0; i < c->n_subtitles; i++) {
>>> +        create_work_pool_data(s, stream_index, c->subtitles[i],
>>> +            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
>>> +            current_data, &common_subtitle_mutex, &common_subtitle_cond);
>>> +
>>> +        stream_index++;
>>> +        current_data++;
>>> +    }
>>
>> This is very repetitive.
> 
> Will improve for next patch.
> 
>> 1. We actually have an API to process multiple tasks by different
>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>> 2. In case initialization of one of the conditions/mutexes fails, you
>> are nevertheless destroying them; you are even destroying completely
>> uninitialized mutexes. This is undefined behaviour. Checking the result
>> of it does not fix this.
>>
>> - Andreas
> 
> 1. The slicethread implementation is pretty hard to understand.
> I was not sure if it can be used for normal parallelization, because
> it looked more like some kind of thread pool for continuous data
> processing. But after taking a second look, I think I can use it here.
> I will try and see if it works well.
> 
> 2. I was not aware that this is undefined behavior. Will switch to
> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
> which return a safely initialized mutex/cond.
> 

"The behavior is undefined if the value specified by the mutex argument
to pthread_mutex_destroy() does not refer to an initialized mutex."
(From
https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)

Furthermore: "In cases where default mutex attributes are appropriate,
the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
The effect shall be equivalent to dynamic initialization by a call to
pthread_mutex_init() with parameter attr specified as NULL, except that
no error checks are performed." The last sentence sounds as if one would
then have to check mutex locking.

Moreover, older pthread standards did not allow using
PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
whether we can use that. Also our pthreads-wrapper on top of
OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
nowhere in the codebase).

> I also noticed one cross-thread issue that I will solve in the next patch.
Lukas Fellechner Sept. 5, 2022, 10:15 a.m. UTC | #6
> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
> An: ffmpeg-devel@ffmpeg.org
> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
> Lukas Fellechner:
> > Andreas Rheinhardt andreas.rheinhardt at outlook.com
> > Wed Aug 31 05:54:12 EEST 2022
> >>
> >
> >> 1. We actually have an API to process multiple tasks by different
> >> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> >> 2. In case initialization of one of the conditions/mutexes fails, you
> >> are nevertheless destroying them; you are even destroying completely
> >> uninitialized mutexes. This is undefined behaviour. Checking the result
> >> of it does not fix this.
> >>
> >> - Andreas
> >
> > 1. The slicethread implementation is pretty hard to understand.
> > I was not sure if it can be used for normal parallelization, because
> > it looked more like some kind of thread pool for continuous data
> > processing. But after taking a second look, I think I can use it here.
> > I will try and see if it works well.
> >
> > 2. I was not aware that this is undefined behavior. Will switch to
> > PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
> > which return a safely initialized mutex/cond.
> >
> 
> "The behavior is undefined if the value specified by the mutex argument
> to pthread_mutex_destroy() does not refer to an initialized mutex."
> (From
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
> 
> Furthermore: "In cases where default mutex attributes are appropriate,
> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
> The effect shall be equivalent to dynamic initialization by a call to
> pthread_mutex_init() with parameter attr specified as NULL, except that
> no error checks are performed." The last sentence sounds as if one would
> then have to check mutex locking.
> 
> Moreover, older pthread standards did not allow to use
> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
> whether we can use that. Also our pthreads-wrapper on top of
> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
> nowhere in the codebase).

I missed that detail about the initializer macro. Thank you for clearing
that up.

After looking more into the threads implementation in ffmpeg, I wonder if I
really need to check any results of init/destroy or other functions.
In slicethread.c, there is zero checking on any of the lock functions.
The pthreads-based implementation does internally check the results of all
function calls and calls abort() in case of errors ("strict_" wrappers).
The Win32 implementation uses SRW locks which cannot even return errors.
And the OS/2 implementation returns 0 on all calls as well.

So right now, I think that I should continue with normal _init() calls
(no macros) and drop all error checking, just like slicethread does.
Are you fine with that approach?
Andreas Rheinhardt Sept. 5, 2022, 10:45 a.m. UTC | #7
Lukas Fellechner:
>> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
>> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>> An: ffmpeg-devel@ffmpeg.org
>> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>> Lukas Fellechner:
>>> Andreas Rheinhardt andreas.rheinhardt at outlook.com
>>> Wed Aug 31 05:54:12 EEST 2022
>>>>
>>>
>>>> 1. We actually have an API to process multiple tasks by different
>>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>>>> 2. In case initialization of one of the conditions/mutexes fails, you
>>>> are nevertheless destroying them; you are even destroying completely
>>>> uninitialized mutexes. This is undefined behaviour. Checking the result
>>>> of it does not fix this.
>>>>
>>>> - Andreas
>>>
>>> 1. The slicethread implementation is pretty hard to understand.
>>> I was not sure if it can be used for normal parallelization, because
>>> it looked more like some kind of thread pool for continuous data
>>> processing. But after taking a second look, I think I can use it here.
>>> I will try and see if it works well.
>>>
>>> 2. I was not aware that this is undefined behavior. Will switch to
>>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
>>> which return a safely initialized mutex/cond.
>>>
>>
>> "The behavior is undefined if the value specified by the mutex argument
>> to pthread_mutex_destroy() does not refer to an initialized mutex."
>> (From
>> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
>>
>> Furthermore: "In cases where default mutex attributes are appropriate,
>> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
>> The effect shall be equivalent to dynamic initialization by a call to
>> pthread_mutex_init() with parameter attr specified as NULL, except that
>> no error checks are performed." The last sentence sounds as if one would
>> then have to check mutex locking.
>>
>> Moreover, older pthread standards did not allow to use
>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>> whether we can use that. Also our pthreads-wrapper on top of
>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>> nowhere in the codebase).
> 
> I missed that detail about the initializer macro. Thank you for clearing
> that up.
> 
> After looking more into threads implementation in ffmpeg, I wonder if I
> really need to check any results of init/destroy or other functions.
> In slicethreads.c, there is zero checking on any of the lock functions.
> The pthreads-based implementation does internally check the results of all
> function calls and calls abort() in case of errors ("strict_" wrappers).
> The Win32 implementation uses SRW locks which cannot even return errors. 
> And the OS2 implementation returns 0 on all calls as well.
> 
> So right now, I think that I should continue with normal _init() calls
> (no macros) and drop all error checking, just like slicethread does.
> Are you fine with that approach?

Zero checking is our old approach; the new approach checks for errors
and ensures that only mutexes/condition variables that have been
properly initialized are destroyed. See ff_pthread_init/free in
libavcodec/pthread.c (you can't use this in libavformat, because these
functions are local to libavcodec).

- Andreas
Lukas Fellechner Sept. 5, 2022, 2:28 p.m. UTC | #8
>Gesendet: Montag, 05. September 2022 um 12:45 Uhr
>Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>An: ffmpeg-devel@ffmpeg.org
>Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>Lukas Fellechner:
>>> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
>>> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>>> An: ffmpeg-devel@ffmpeg.org
>>> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>>> Lukas Fellechner:
>>>> Andreas Rheinhardt andreas.rheinhardt at outlook.com
>>>> Wed Aug 31 05:54:12 EEST 2022
>>>>>
>>>>
>>>>> 1. We actually have an API to process multiple tasks by different
>>>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>>>>> 2. In case initialization of one of the conditions/mutexes fails, you
>>>>> are nevertheless destroying them; you are even destroying completely
>>>>> uninitialized mutexes. This is undefined behaviour. Checking the result
>>>>> of it does not fix this.
>>>>>
>>>>> - Andreas
>>>>
>>>> 1. The slicethread implementation is pretty hard to understand.
>>>> I was not sure if it can be used for normal parallelization, because
>>>> it looked more like some kind of thread pool for continuous data
>>>> processing. But after taking a second look, I think I can use it here.
>>>> I will try and see if it works well.
>>>>
>>>> 2. I was not aware that this is undefined behavior. Will switch to
>>>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
>>>> which return a safely initialized mutex/cond.
>>>>
>>>
>>> "The behavior is undefined if the value specified by the mutex argument
>>> to pthread_mutex_destroy() does not refer to an initialized mutex."
>>> (From
>>> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
>>>
>>> Furthermore: "In cases where default mutex attributes are appropriate,
>>> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
>>> The effect shall be equivalent to dynamic initialization by a call to
>>> pthread_mutex_init() with parameter attr specified as NULL, except that
>>> no error checks are performed." The last sentence sounds as if one would
>>> then have to check mutex locking.
>>>
>>> Moreover, older pthread standards did not allow to use
>>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>>> whether we can use that. Also our pthreads-wrapper on top of
>>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>>> nowhere in the codebase).
>>
>> I missed that detail about the initializer macro. Thank you for clearing
>> that up.
>>
>> After looking more into threads implementation in ffmpeg, I wonder if I
>> really need to check any results of init/destroy or other functions.
>> In slicethreads.c, there is zero checking on any of the lock functions.
>> The pthreads-based implementation does internally check the results of all
>> function calls and calls abort() in case of errors ("strict_" wrappers).
>> The Win32 implementation uses SRW locks which cannot even return errors.
>> And the OS2 implementation returns 0 on all calls as well.
>>
>> So right now, I think that I should continue with normal _init() calls
>> (no macros) and drop all error checking, just like slicethread does.
>> Are you fine with that approach?
>
> Zero checking is our old approach; the new approach checks for errors
> and ensures that only mutexes/condition variables that have been
> properly initialized are destroyed. See ff_pthread_init/free in
> libavcodec/pthread.c (you can't use this in libavformat, because these
> functions are local to libavcodec).
>
> - Andreas

I see. I will try to do clean error checking then.
Lukas Fellechner Sept. 11, 2022, 8:35 p.m. UTC | #9
Gesendet: Montag, 05. September 2022 um 12:45 Uhr
Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
An: ffmpeg-devel@ffmpeg.org
Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner:
>>> Moreover, older pthread standards did not allow to use
>>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>>> whether we can use that. Also our pthreads-wrapper on top of
>>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>>> nowhere in the codebase).
>>
>> I missed that detail about the initializer macro. Thank you for clearing
>> that up.
>>
>> After looking more into threads implementation in ffmpeg, I wonder if I
>> really need to check any results of init/destroy or other functions.
>> In slicethreads.c, there is zero checking on any of the lock functions.
>> The pthreads-based implementation does internally check the results of all
>> function calls and calls abort() in case of errors ("strict_" wrappers).
>> The Win32 implementation uses SRW locks which cannot even return errors.
>> And the OS2 implementation returns 0 on all calls as well.
>>
>> So right now, I think that I should continue with normal _init() calls
>> (no macros) and drop all error checking, just like slicethread does.
>> Are you fine with that approach?
> 
> Zero checking is our old approach; the new approach checks for errors
> and ensures that only mutexes/condition variables that have been
> properly initialized are destroyed. See ff_pthread_init/free in
> libavcodec/pthread.c (you can't use this in libavformat, because these
> functions are local to libavcodec).
> 
> - Andreas

I was able to switch to using the slicethread implementation. It has a
very minor delay on init, because it waits for all threads to fully start
up before continuing. But it is only a few ms and not worth adding a new
implementation just for that.

I also changed the initialization and release of mutexes and condition
variables, with full return code checking and safe release.

There was one cross-thread issue I needed to address.

A multi-hour test (connecting in an endless loop) did not show
any issues after fixing the cross-thread access to avio_opts.

Please see the v4 patch for all the changes.

- Lukas
diff mbox series

Patch

diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..20f2557ea3 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@ 
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
 #include "internal.h"
 #include "avio_internal.h"
 #include "dash.h"
@@ -152,6 +153,8 @@  typedef struct DASHContext {
     int max_url_size;
     char *cenc_decryption_key;

+    int init_threads;
+
     /* Flags for init section*/
     int is_init_section_common_video;
     int is_init_section_common_audio;
@@ -2033,6 +2036,331 @@  static void move_metadata(AVStream *st, const char *key, char **value)
     }
 }

+#if HAVE_THREADS
+
+struct work_pool_data
+{
+    AVFormatContext *ctx;
+    struct representation *pls;
+    struct representation *common_pls;
+    pthread_mutex_t *common_mutex;
+    pthread_cond_t *common_condition;
+    int is_common;
+    int is_started;
+    int result;
+};
+
+struct thread_data
+{
+    pthread_t thread;
+    pthread_mutex_t *mutex;
+    struct work_pool_data *work_pool;
+    int work_pool_size;
+    int is_started;
+    int has_error;
+};
+
+static void *worker_thread(void *ptr)
+{
+    int ret = 0;
+    int i;
+    struct thread_data *thread_data = (struct thread_data*)ptr;
+    struct work_pool_data *work_pool = NULL;
+    struct work_pool_data *data = NULL;
+    for (;;) {
+
+        // get next work item unless there was an error
+        pthread_mutex_lock(thread_data->mutex);
+        data = NULL;
+        if (!thread_data->has_error) {
+            work_pool = thread_data->work_pool;
+            for (i = 0; i < thread_data->work_pool_size; i++) {
+                if (!work_pool->is_started) {
+                    data = work_pool;
+                    data->is_started = 1;
+                    break;
+                }
+                work_pool++;
+            }
+        }
+        pthread_mutex_unlock(thread_data->mutex);
+
+        if (!data) {
+            // no more work to do
+            return NULL;
+        }
+
+        // if we are common section provider, init and signal
+        if (data->is_common) {
+            data->pls->parent = data->ctx;
+            ret = update_init_section(data->pls);
+            if (ret < 0) {
+                pthread_cond_signal(data->common_condition);
+                goto end;
+            }
+            else
+                ret = AVERROR(pthread_cond_signal(data->common_condition));
+        }
+
+        // if we depend on common section provider, wait for signal and copy
+        if (data->common_pls) {
+            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+            if (ret < 0)
+                goto end;
+
+            if (!data->common_pls->init_sec_buf) {
+                goto end;
+                ret = AVERROR(EFAULT);
+            }
+
+            ret = copy_init_section(data->pls, data->common_pls);
+            if (ret < 0)
+                goto end;
+        }
+
+        ret = begin_open_demux_for_component(data->ctx, data->pls);
+        if (ret < 0)
+            goto end;
+
+    end:
+        data->result = ret;
+
+        // notify error to other threads and exit
+        if (ret < 0) {
+            pthread_mutex_lock(thread_data->mutex);
+            thread_data->has_error = 1;
+            pthread_mutex_unlock(thread_data->mutex);
+            return NULL;
+        }
+    }
+
+
+    return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
+    struct representation *pls, struct representation *common_pls,
+    struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
+    pthread_cond_t *common_condition)
+{
+    init_data->ctx = ctx;
+    init_data->pls = pls;
+    init_data->pls->stream_index = stream_index;
+    init_data->common_condition = common_condition;
+    init_data->common_mutex = common_mutex;
+    init_data->result = -1;
+
+    if (pls == common_pls) {
+        init_data->is_common = 1;
+    }
+    else if (common_pls) {
+        init_data->common_pls = common_pls;
+    }
+}
+
+static int start_thread(struct thread_data *thread_data,
+    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+    int ret;
+
+    thread_data->mutex = mutex;
+    thread_data->work_pool = work_pool;
+    thread_data->work_pool_size = work_pool_size;
+
+    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+    if (ret == 0)
+        thread_data->is_started = 1;
+
+    return ret;
+}
+
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+    DASHContext *c = s->priv_data;
+    int ret = 0;
+    int stream_index = 0;
+    int i;
+
+    // alloc data
+    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+    if (!init_data)
+        return AVERROR(ENOMEM);
+
+    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+    if (!thread_data)
+        return AVERROR(ENOMEM);
+
+    // alloc mutex and conditions
+    pthread_mutex_t work_pool_mutex;
+
+    pthread_mutex_t common_video_mutex;
+    pthread_cond_t common_video_cond;
+
+    pthread_mutex_t common_audio_mutex;
+    pthread_cond_t common_audio_cond;
+
+    pthread_mutex_t common_subtitle_mutex;
+    pthread_cond_t common_subtitle_cond;
+
+    // init mutex and conditions
+    ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+    if (ret < 0)
+        goto cleanup;
+
+    if (c->is_init_section_common_video) {
+        ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    if (c->is_init_section_common_audio) {
+        ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    if (c->is_init_section_common_subtitle) {
+        ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    // init work pool data
+    struct work_pool_data* current_data = init_data;
+
+    for (i = 0; i < c->n_videos; i++) {
+        create_work_pool_data(s, stream_index, c->videos[i],
+            c->is_init_section_common_video ? c->videos[0] : NULL,
+            current_data, &common_video_mutex, &common_video_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    for (i = 0; i < c->n_audios; i++) {
+        create_work_pool_data(s, stream_index, c->audios[i],
+            c->is_init_section_common_audio ? c->audios[0] : NULL,
+            current_data, &common_audio_mutex, &common_audio_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    for (i = 0; i < c->n_subtitles; i++) {
+        create_work_pool_data(s, stream_index, c->subtitles[i],
+            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+            current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    // start threads
+    struct thread_data *current_thread = thread_data;
+    for (i = 0; i < threads; i++) {
+        ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
+        if (ret < 0)
+            goto cleanup;
+
+        current_thread++;
+    }
+
+cleanup:
+    // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+    int initResult = ret;
+    int runResult = 0;
+    int cleanupResult = 0;
+
+    // join threads
+    current_thread = thread_data;
+    for (i = 0; i < threads; i++) {
+        if (current_thread->is_started) {
+            ret = AVERROR(pthread_join(current_thread->thread, NULL));
+            if (ret < 0)
+                cleanupResult = ret;
+        }
+        current_thread++;
+    }
+
+    // finalize streams and collect results
+    current_data = init_data;
+    for (i = 0; i < nstreams; i++) {
+        if (current_data->result < 0) {
+            // thread ran into error: collect result and break
+            runResult = current_data->result;
+            break;
+        }
+        else {
+            // thread success: create streams on AVFormatContext
+            ret = end_open_demux_for_component(s, current_data->pls);
+            if (ret < 0)
+                runResult = ret;
+        }
+        current_data++;
+    }
+
+    // cleanup mutex and conditions
+    ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+    if (ret < 0)
+        cleanupResult = ret;
+
+    if (c->is_init_section_common_video) {
+        ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    if (c->is_init_section_common_audio) {
+        ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    if (c->is_init_section_common_subtitle) {
+        ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    // return results if errors have occurred in one of the phases
+    if (initResult < 0)
+        return initResult;
+
+    if (runResult < 0)
+        return runResult;
+
+    if (cleanupResult < 0)
+        return cleanupResult;
+
+    return 0;
+}
+
+#endif
+
 static int dash_read_header(AVFormatContext *s)
 {
     DASHContext *c = s->priv_data;
@@ -2067,6 +2395,23 @@  static int dash_read_header(AVFormatContext *s)
     if (c->n_subtitles)
         c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);

+    int threads = 0;
+    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+    threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+    if (threads > 1)
+    {
+#if HAVE_THREADS
+        ret = init_streams_multithreaded(s, nstreams, threads);
+        if (ret < 0)
+            return ret;
+#endif
+    }
+    else
+    {
     /* Open the demuxer for video and audio components if available */
     for (i = 0; i < c->n_videos; i++) {
         rep = c->videos[i];
@@ -2115,6 +2460,7 @@  static int dash_read_header(AVFormatContext *s)

     if (!stream_index)
         return AVERROR_INVALIDDATA;
+    }

     /* Create a program */
     program = av_new_program(s, 0);
@@ -2366,7 +2712,10 @@  static const AVOption dash_options[] = {
         OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
         {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
         INT_MIN, INT_MAX, FLAGS},
-    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "init_threads", "Number of threads to use for initializing the DASH stream",
+        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
     {NULL}
 };