diff mbox series

[FFmpeg-devel,01/14] vvcdec: add thread executor

Message ID 20230521130319.13813-2-nuomi2021@gmail.com
State Superseded
Headers show
Series add vvc decoder c code | expand

Checks

Context Check Description
andriy/make_x86 success Make finished
andriy/make_fate_x86 fail Make fate failed

Commit Message

Nuo Mi May 21, 2023, 1:03 p.m. UTC
The executor design pattern was introduced by Java
<https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html>
It was also adopted by Python
<https://docs.python.org/3/library/concurrent.futures.html>
Compared to handcrafted thread pool management, it greatly simplifies the thread code.
---
 libavcodec/Makefile           |   1 +
 libavcodec/vvc/Makefile       |   4 +
 libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
 libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
 4 files changed, 271 insertions(+)
 create mode 100644 libavcodec/vvc/Makefile
 create mode 100644 libavcodec/vvc/vvc_executor.c
 create mode 100644 libavcodec/vvc/vvc_executor.h

Comments

Rémi Denis-Courmont May 21, 2023, 1:15 p.m. UTC | #1
Le sunnuntaina 21. toukokuuta 2023, 16.03.06 EEST Nuo Mi a écrit :
> The executor design pattern was inroduced by java
> <https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/conc
> urrent/Executor.html> it also adapted by python
> <https://docs.python.org/3/library/concurrent.futures.html>
> Compared to handcrafted thread pool management, it greatly simplifies the
> thread code. ---
>  libavcodec/Makefile           |   1 +
>  libavcodec/vvc/Makefile       |   4 +
>  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
>  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
>  4 files changed, 271 insertions(+)
>  create mode 100644 libavcodec/vvc/Makefile
>  create mode 100644 libavcodec/vvc/vvc_executor.c
>  create mode 100644 libavcodec/vvc/vvc_executor.h
> 
> diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> index dab09f483a..b1fcbf71b2 100644
> --- a/libavcodec/Makefile
> +++ b/libavcodec/Makefile
> @@ -62,6 +62,7 @@ OBJS = ac3_parser.o                                       
>              \ xiph.o                                                      
>     \
> 
>  # subsystems
> +include $(SRC_PATH)/libavcodec/vvc/Makefile
>  OBJS-$(CONFIG_AANDCTTABLES)            += aandcttab.o
>  OBJS-$(CONFIG_AC3DSP)                  += ac3dsp.o ac3.o ac3tab.o
>  OBJS-$(CONFIG_ADTS_HEADER)             += adts_header.o
> mpeg4audio_sample_rates.o diff --git a/libavcodec/vvc/Makefile
> b/libavcodec/vvc/Makefile
> new file mode 100644
> index 0000000000..c4b93e0389
> --- /dev/null
> +++ b/libavcodec/vvc/Makefile
> @@ -0,0 +1,4 @@
> +clean::
> +	$(RM) $(CLEANSUFFIXES:%=libavcodec/vvc/%)
> +
> +OBJS-$(CONFIG_VVC_DECODER)          +=  vvc/vvc_executor.o
> diff --git a/libavcodec/vvc/vvc_executor.c b/libavcodec/vvc/vvc_executor.c
> new file mode 100644
> index 0000000000..f2afdf79ae
> --- /dev/null
> +++ b/libavcodec/vvc/vvc_executor.c
> @@ -0,0 +1,193 @@
> +/*
> + * VVC video Decoder
> + *
> + * Copyright (C) 2022 Nuo Mi
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
> USA + */
> +#include "libavutil/avutil.h"
> +#include "libavutil/thread.h"
> +
> +#include "vvc_executor.h"

This does not seem specific to VVC in any way, so the naming (and choice of 
folder placement) is rather weird.

> +
> +typedef struct ThreadInfo {
> +    int idx;
> +    VVCExecutor *e;
> +    pthread_t thread;
> +} ThreadInfo;
> +
> +struct VVCExecutor {
> +    VVCTaskCallbacks cb;
> +    ThreadInfo *threads;
> +    uint8_t *local_contexts;

It seems odd and needlessly complex to separate this from the thread info. It 
looks like you could simply append a pointer or a flexible array to ThreadInfo 
instead.

> +    int thread_count;
> +
> +    pthread_mutex_t lock;
> +    pthread_cond_t cond;
> +    int die;
> +    VVCTasklet *tasks;
> +};
> +
> +static void remove_task(VVCTasklet **prev, VVCTasklet *t)
> +{
> +    *prev  = t->next;
> +    t->next = NULL;
> +}
> +
> +static void add_task(VVCTasklet **prev, VVCTasklet *t)
> +{
> +    t->next = *prev;
> +    *prev   = t;
> +}
> +
> +static void *executor_worker_task(void *data)
> +{
> +    ThreadInfo *ti = (ThreadInfo*)data;
> +    VVCExecutor *e = ti->e;
> +    void *lc       = e->local_contexts + ti->idx *
> e->cb.local_context_size;
> +    VVCTasklet **prev;
> +    VVCTaskCallbacks *cb = &e->cb;
> +
> +    pthread_mutex_lock(&e->lock);
> +    while (1) {
> +        VVCTasklet* t = NULL;
> +        if (e->die) break;
> +
> +        for (prev = &e->tasks; *prev; prev = &(*prev)->next) {
> +            if (cb->ready(*prev, cb->user_data)) {
> +                t = *prev;
> +                break;
> +            }
> +        }
> +        if (t) {
> +            //found one task
> +            remove_task(prev, t);
> +            pthread_mutex_unlock(&e->lock);
> +            cb->run(t, lc, cb->user_data);
> +            pthread_mutex_lock(&e->lock);
> +        } else {
> +            //no task in one loop
> +            pthread_cond_wait(&e->cond, &e->lock);
> +        }
> +    }
> +    pthread_mutex_unlock(&e->lock);
> +    return NULL;
> +}
> +
> +VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *cb, int
> thread_count) +{
> +    VVCExecutor *e;
> +    int i, j, ret;
> +    if (!cb || !cb->user_data || !cb->ready || !cb->run ||
> !cb->priority_higher) +        return NULL;
> +    e = av_calloc(1, sizeof(*e));
> +    if (!e)
> +        return NULL;
> +    e->cb = *cb;
> +
> +    e->local_contexts = av_malloc(thread_count * e->cb.local_context_size);
> +    if (!e->local_contexts)
> +        goto free_executor;
> +
> +    e->threads = av_calloc(thread_count, sizeof(*e->threads));
> +    if (!e->threads)
> +        goto free_contexts;
> +    for (i = 0; i < thread_count; i++) {
> +        ThreadInfo *ti = e->threads + i;
> +        ti->e = e;
> +        ti->idx = i;
> +    }
> +
> +    ret = pthread_mutex_init(&e->lock, NULL);
> +    if (ret)
> +        goto free_threads;
> +
> +    ret = pthread_cond_init(&e->cond, NULL);
> +    if (ret)
> +        goto destroy_lock;
> +
> +    for (i = 0; i < thread_count; i++) {
> +        ThreadInfo *ti = e->threads + i;
> +        ret = pthread_create(&ti->thread, NULL, executor_worker_task, ti);
> +        if (ret)
> +            goto join_threads;
> +    }
> +    e->thread_count = thread_count;
> +    return e;
> +
> +join_threads:
> +    pthread_mutex_lock(&e->lock);
> +    e->die = 1;
> +    pthread_cond_broadcast(&e->cond);
> +    pthread_mutex_unlock(&e->lock);
> +    for (j = 0; j < i; j++)
> +        pthread_join(e->threads[j].thread, NULL);
> +    pthread_cond_destroy(&e->cond);
> +destroy_lock:
> +    pthread_mutex_destroy(&e->lock);
> +free_threads:
> +    av_free(e->threads);
> +free_contexts:
> +    av_free(e->local_contexts);
> +free_executor:
> +    free(e);
> +    return NULL;
> +}
> +
> +void ff_vvc_executor_free(VVCExecutor **executor)
> +{
> +    VVCExecutor *e;
> +    if (!executor || !*executor)
> +        return;
> +    e = *executor;
> +
> +    //singal die
> +    pthread_mutex_lock(&e->lock);
> +    e->die = 1;
> +    pthread_cond_broadcast(&e->cond);
> +    pthread_mutex_unlock(&e->lock);
> +
> +    for (int i = 0; i < e->thread_count; i++)
> +        pthread_join(e->threads[i].thread, NULL);
> +    pthread_cond_destroy(&e->cond);
> +    pthread_mutex_destroy(&e->lock);
> +
> +    av_free(e->threads);
> +    av_free(e->local_contexts);
> +
> +    av_freep(executor);
> +}
> +
> +void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t)
> +{
> +    VVCTaskCallbacks *cb = &e->cb;
> +    VVCTasklet **prev;
> +
> +    pthread_mutex_lock(&e->lock);
> +    for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev =
> &(*prev)->next) +        /* nothing */;
> +    add_task(prev, t);
> +    pthread_cond_signal(&e->cond);
> +    pthread_mutex_unlock(&e->lock);
> +}
> +
> +void ff_vvc_executor_wakeup(VVCExecutor *e)
> +{
> +    pthread_mutex_lock(&e->lock);
> +    pthread_cond_broadcast(&e->cond);
> +    pthread_mutex_unlock(&e->lock);

Signaling a condition variable without changing any state makes no sense.

> +}
Lynne May 21, 2023, 2:11 p.m. UTC | #2
May 21, 2023, 15:03 by nuomi2021@gmail.com:

> The executor design pattern was inroduced by java
> <https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html>
> it also adapted by python
> <https://docs.python.org/3/library/concurrent.futures.html>
> Compared to handcrafted thread pool management, it greatly simplifies the thread code.
> ---
>  libavcodec/Makefile           |   1 +
>  libavcodec/vvc/Makefile       |   4 +
>  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
>  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
>  4 files changed, 271 insertions(+)
>  create mode 100644 libavcodec/vvc/Makefile
>  create mode 100644 libavcodec/vvc/vvc_executor.c
>  create mode 100644 libavcodec/vvc/vvc_executor.h
>

I'd like a unified API, since for AV1, we'd want something similar.
Having a custom one for each codec is a bad idea and prone to bitrotting.
Nuo Mi May 21, 2023, 2:24 p.m. UTC | #3
Hi Rémi,
Thank you for the comments.

On Sun, May 21, 2023 at 9:15 PM Rémi Denis-Courmont <remi@remlab.net> wrote:

> Le sunnuntaina 21. toukokuuta 2023, 16.03.06 EEST Nuo Mi a écrit :
> > The executor design pattern was inroduced by java
> > <
> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/conc
> > urrent/Executor.html> it also adapted by python
> > <https://docs.python.org/3/library/concurrent.futures.html>
> > Compared to handcrafted thread pool management, it greatly simplifies the
> > thread code. ---
> >  libavcodec/Makefile           |   1 +
> >  libavcodec/vvc/Makefile       |   4 +
> >  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
> >  4 files changed, 271 insertions(+)
> >  create mode 100644 libavcodec/vvc/Makefile
> >  create mode 100644 libavcodec/vvc/vvc_executor.c
> >  create mode 100644 libavcodec/vvc/vvc_executor.h
> >
> > diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> > index dab09f483a..b1fcbf71b2 100644
> > --- a/libavcodec/Makefile
> > +++ b/libavcodec/Makefile
> > @@ -62,6 +62,7 @@ OBJS = ac3_parser.o
>
> >              \ xiph.o
>
> >     \
> >
> >  # subsystems
> > +include $(SRC_PATH)/libavcodec/vvc/Makefile
> >  OBJS-$(CONFIG_AANDCTTABLES)            += aandcttab.o
> >  OBJS-$(CONFIG_AC3DSP)                  += ac3dsp.o ac3.o ac3tab.o
> >  OBJS-$(CONFIG_ADTS_HEADER)             += adts_header.o
> > mpeg4audio_sample_rates.o diff --git a/libavcodec/vvc/Makefile
> > b/libavcodec/vvc/Makefile
> > new file mode 100644
> > index 0000000000..c4b93e0389
> > --- /dev/null
> > +++ b/libavcodec/vvc/Makefile
> > @@ -0,0 +1,4 @@
> > +clean::
> > +     $(RM) $(CLEANSUFFIXES:%=libavcodec/vvc/%)
> > +
> > +OBJS-$(CONFIG_VVC_DECODER)          +=  vvc/vvc_executor.o
> > diff --git a/libavcodec/vvc/vvc_executor.c
> b/libavcodec/vvc/vvc_executor.c
> > new file mode 100644
> > index 0000000000..f2afdf79ae
> > --- /dev/null
> > +++ b/libavcodec/vvc/vvc_executor.c
> > @@ -0,0 +1,193 @@
> > +/*
> > + * VVC video Decoder
> > + *
> > + * Copyright (C) 2022 Nuo Mi
> > + *
> > + * This file is part of FFmpeg.
> > + *
> > + * FFmpeg is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU Lesser General Public
> > + * License as published by the Free Software Foundation; either
> > + * version 2.1 of the License, or (at your option) any later version.
> > + *
> > + * FFmpeg is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > + * Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public
> > + * License along with FFmpeg; if not, write to the Free Software
> > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301
> > USA + */
> > +#include "libavutil/avutil.h"
> > +#include "libavutil/thread.h"
> > +
> > +#include "vvc_executor.h"
>
> This does not seem specific to VVC in any way, so the naming (and choice
> of
> folder placement) is rather weird.
>
> > +
> > +typedef struct ThreadInfo {
> > +    int idx;
> > +    VVCExecutor *e;
> > +    pthread_t thread;
> > +} ThreadInfo;
> > +
> > +struct VVCExecutor {
> > +    VVCTaskCallbacks cb;
> > +    ThreadInfo *threads;
> > +    uint8_t *local_contexts;
>
> It seems odd and needless complex to separate this from the thread info.
> It
> looks like you could simply append a pointer or a flexible array to
> ThreadInfo
> instead.
>
 Do you mean in VVCTasklet? No, we do not want to expose ThreadInfo details
to users.


> > +    int thread_count;
> > +
> > +    pthread_mutex_t lock;
> > +    pthread_cond_t cond;
> > +    int die;
> > +    VVCTasklet *tasks;
> > +};
> > +
> > +static void remove_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    *prev  = t->next;
> > +    t->next = NULL;
> > +}
> > +
> > +static void add_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    t->next = *prev;
> > +    *prev   = t;
> > +}
> > +
> > +static void *executor_worker_task(void *data)
> > +{
> > +    ThreadInfo *ti = (ThreadInfo*)data;
> > +    VVCExecutor *e = ti->e;
> > +    void *lc       = e->local_contexts + ti->idx *
> > e->cb.local_context_size;
> > +    VVCTasklet **prev;
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    while (1) {
> > +        VVCTasklet* t = NULL;
> > +        if (e->die) break;
> > +
> > +        for (prev = &e->tasks; *prev; prev = &(*prev)->next) {
> > +            if (cb->ready(*prev, cb->user_data)) {
> > +                t = *prev;
> > +                break;
> > +            }
> > +        }
> > +        if (t) {
> > +            //found one task
> > +            remove_task(prev, t);
> > +            pthread_mutex_unlock(&e->lock);
> > +            cb->run(t, lc, cb->user_data);
> > +            pthread_mutex_lock(&e->lock);
> > +        } else {
> > +            //no task in one loop
> > +            pthread_cond_wait(&e->cond, &e->lock);
> > +        }
> > +    }
> > +    pthread_mutex_unlock(&e->lock);
> > +    return NULL;
> > +}
> > +
> > +VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *cb, int
> > thread_count) +{
> > +    VVCExecutor *e;
> > +    int i, j, ret;
> > +    if (!cb || !cb->user_data || !cb->ready || !cb->run ||
> > !cb->priority_higher) +        return NULL;
> > +    e = av_calloc(1, sizeof(*e));
> > +    if (!e)
> > +        return NULL;
> > +    e->cb = *cb;
> > +
> > +    e->local_contexts = av_malloc(thread_count *
> e->cb.local_context_size);
> > +    if (!e->local_contexts)
> > +        goto free_executor;
> > +
> > +    e->threads = av_calloc(thread_count, sizeof(*e->threads));
> > +    if (!e->threads)
> > +        goto free_contexts;
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ti->e = e;
> > +        ti->idx = i;
> > +    }
> > +
> > +    ret = pthread_mutex_init(&e->lock, NULL);
> > +    if (ret)
> > +        goto free_threads;
> > +
> > +    ret = pthread_cond_init(&e->cond, NULL);
> > +    if (ret)
> > +        goto destroy_lock;
> > +
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ret = pthread_create(&ti->thread, NULL, executor_worker_task,
> ti);
> > +        if (ret)
> > +            goto join_threads;
> > +    }
> > +    e->thread_count = thread_count;
> > +    return e;
> > +
> > +join_threads:
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +    for (j = 0; j < i; j++)
> > +        pthread_join(e->threads[j].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +destroy_lock:
> > +    pthread_mutex_destroy(&e->lock);
> > +free_threads:
> > +    av_free(e->threads);
> > +free_contexts:
> > +    av_free(e->local_contexts);
> > +free_executor:
> > +    free(e);
> > +    return NULL;
> > +}
> > +
> > +void ff_vvc_executor_free(VVCExecutor **executor)
> > +{
> > +    VVCExecutor *e;
> > +    if (!executor || !*executor)
> > +        return;
> > +    e = *executor;
> > +
> > +    //singal die
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +
> > +    for (int i = 0; i < e->thread_count; i++)
> > +        pthread_join(e->threads[i].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +    pthread_mutex_destroy(&e->lock);
> > +
> > +    av_free(e->threads);
> > +    av_free(e->local_contexts);
> > +
> > +    av_freep(executor);
> > +}
> > +
> > +void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t)
> > +{
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +    VVCTasklet **prev;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev
> =
> > &(*prev)->next) +        /* nothing */;
> > +    add_task(prev, t);
> > +    pthread_cond_signal(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +}
> > +
> > +void ff_vvc_executor_wakeup(VVCExecutor *e)
> > +{
> > +    pthread_mutex_lock(&e->lock);
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
>
> Signaling a condition variable without changing any state makes no sense.
>
It's for error handling.

Imagine that all threads except the last one are waiting for ready jobs.
The last thread hits an error and reports it to the caller.
The caller then needs to wake up the waiting threads to flush the task list.


> > +}
>
> --
> Rémi Denis-Courmont
> http://www.remlab.net/
>
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
Nuo Mi May 21, 2023, 2:44 p.m. UTC | #4
On Sun, May 21, 2023 at 9:15 PM Rémi Denis-Courmont <remi@remlab.net> wrote:

> Le sunnuntaina 21. toukokuuta 2023, 16.03.06 EEST Nuo Mi a écrit :
> > The executor design pattern was inroduced by java
> > <
> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/conc
> > urrent/Executor.html> it also adapted by python
> > <https://docs.python.org/3/library/concurrent.futures.html>
> > Compared to handcrafted thread pool management, it greatly simplifies the
> > thread code. ---
> >  libavcodec/Makefile           |   1 +
> >  libavcodec/vvc/Makefile       |   4 +
> >  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
> >  4 files changed, 271 insertions(+)
> >  create mode 100644 libavcodec/vvc/Makefile
> >  create mode 100644 libavcodec/vvc/vvc_executor.c
> >  create mode 100644 libavcodec/vvc/vvc_executor.h
> >
> > diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> > index dab09f483a..b1fcbf71b2 100644
> > --- a/libavcodec/Makefile
> > +++ b/libavcodec/Makefile
> > @@ -62,6 +62,7 @@ OBJS = ac3_parser.o
>
> >              \ xiph.o
>
> >     \
> >
> >  # subsystems
> > +include $(SRC_PATH)/libavcodec/vvc/Makefile
> >  OBJS-$(CONFIG_AANDCTTABLES)            += aandcttab.o
> >  OBJS-$(CONFIG_AC3DSP)                  += ac3dsp.o ac3.o ac3tab.o
> >  OBJS-$(CONFIG_ADTS_HEADER)             += adts_header.o
> > mpeg4audio_sample_rates.o diff --git a/libavcodec/vvc/Makefile
> > b/libavcodec/vvc/Makefile
> > new file mode 100644
> > index 0000000000..c4b93e0389
> > --- /dev/null
> > +++ b/libavcodec/vvc/Makefile
> > @@ -0,0 +1,4 @@
> > +clean::
> > +     $(RM) $(CLEANSUFFIXES:%=libavcodec/vvc/%)
> > +
> > +OBJS-$(CONFIG_VVC_DECODER)          +=  vvc/vvc_executor.o
> > diff --git a/libavcodec/vvc/vvc_executor.c
> b/libavcodec/vvc/vvc_executor.c
> > new file mode 100644
> > index 0000000000..f2afdf79ae
> > --- /dev/null
> > +++ b/libavcodec/vvc/vvc_executor.c
> > @@ -0,0 +1,193 @@
> > +/*
> > + * VVC video Decoder
> > + *
> > + * Copyright (C) 2022 Nuo Mi
> > + *
> > + * This file is part of FFmpeg.
> > + *
> > + * FFmpeg is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU Lesser General Public
> > + * License as published by the Free Software Foundation; either
> > + * version 2.1 of the License, or (at your option) any later version.
> > + *
> > + * FFmpeg is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > + * Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public
> > + * License along with FFmpeg; if not, write to the Free Software
> > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301
> > USA + */
> > +#include "libavutil/avutil.h"
> > +#include "libavutil/thread.h"
> > +
> > +#include "vvc_executor.h"
>
> This does not seem specific to VVC in any way, so the naming (and choice
> of
> folder placement) is rather weird.
>
The utility can be used by filters, encoders, or any other component.
I will move it to libavcodec first. Somebody can promote it to libavutil if
they find it useful.

>
> > +
> > +typedef struct ThreadInfo {
> > +    int idx;
> > +    VVCExecutor *e;
> > +    pthread_t thread;
> > +} ThreadInfo;
> > +
> > +struct VVCExecutor {
> > +    VVCTaskCallbacks cb;
> > +    ThreadInfo *threads;
> > +    uint8_t *local_contexts;
>
> It seems odd and needless complex to separate this from the thread info.
> It
> looks like you could simply append a pointer or a flexible array to
> ThreadInfo
> instead.
>
> > +    int thread_count;
> > +
> > +    pthread_mutex_t lock;
> > +    pthread_cond_t cond;
> > +    int die;
> > +    VVCTasklet *tasks;
> > +};
> > +
> > +static void remove_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    *prev  = t->next;
> > +    t->next = NULL;
> > +}
> > +
> > +static void add_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    t->next = *prev;
> > +    *prev   = t;
> > +}
> > +
> > +static void *executor_worker_task(void *data)
> > +{
> > +    ThreadInfo *ti = (ThreadInfo*)data;
> > +    VVCExecutor *e = ti->e;
> > +    void *lc       = e->local_contexts + ti->idx *
> > e->cb.local_context_size;
> > +    VVCTasklet **prev;
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    while (1) {
> > +        VVCTasklet* t = NULL;
> > +        if (e->die) break;
> > +
> > +        for (prev = &e->tasks; *prev; prev = &(*prev)->next) {
> > +            if (cb->ready(*prev, cb->user_data)) {
> > +                t = *prev;
> > +                break;
> > +            }
> > +        }
> > +        if (t) {
> > +            //found one task
> > +            remove_task(prev, t);
> > +            pthread_mutex_unlock(&e->lock);
> > +            cb->run(t, lc, cb->user_data);
> > +            pthread_mutex_lock(&e->lock);
> > +        } else {
> > +            //no task in one loop
> > +            pthread_cond_wait(&e->cond, &e->lock);
> > +        }
> > +    }
> > +    pthread_mutex_unlock(&e->lock);
> > +    return NULL;
> > +}
> > +
> > +VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *cb, int
> > thread_count) +{
> > +    VVCExecutor *e;
> > +    int i, j, ret;
> > +    if (!cb || !cb->user_data || !cb->ready || !cb->run ||
> > !cb->priority_higher) +        return NULL;
> > +    e = av_calloc(1, sizeof(*e));
> > +    if (!e)
> > +        return NULL;
> > +    e->cb = *cb;
> > +
> > +    e->local_contexts = av_malloc(thread_count *
> e->cb.local_context_size);
> > +    if (!e->local_contexts)
> > +        goto free_executor;
> > +
> > +    e->threads = av_calloc(thread_count, sizeof(*e->threads));
> > +    if (!e->threads)
> > +        goto free_contexts;
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ti->e = e;
> > +        ti->idx = i;
> > +    }
> > +
> > +    ret = pthread_mutex_init(&e->lock, NULL);
> > +    if (ret)
> > +        goto free_threads;
> > +
> > +    ret = pthread_cond_init(&e->cond, NULL);
> > +    if (ret)
> > +        goto destroy_lock;
> > +
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ret = pthread_create(&ti->thread, NULL, executor_worker_task,
> ti);
> > +        if (ret)
> > +            goto join_threads;
> > +    }
> > +    e->thread_count = thread_count;
> > +    return e;
> > +
> > +join_threads:
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +    for (j = 0; j < i; j++)
> > +        pthread_join(e->threads[j].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +destroy_lock:
> > +    pthread_mutex_destroy(&e->lock);
> > +free_threads:
> > +    av_free(e->threads);
> > +free_contexts:
> > +    av_free(e->local_contexts);
> > +free_executor:
> > +    free(e);
> > +    return NULL;
> > +}
> > +
> > +void ff_vvc_executor_free(VVCExecutor **executor)
> > +{
> > +    VVCExecutor *e;
> > +    if (!executor || !*executor)
> > +        return;
> > +    e = *executor;
> > +
> > +    //singal die
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +
> > +    for (int i = 0; i < e->thread_count; i++)
> > +        pthread_join(e->threads[i].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +    pthread_mutex_destroy(&e->lock);
> > +
> > +    av_free(e->threads);
> > +    av_free(e->local_contexts);
> > +
> > +    av_freep(executor);
> > +}
> > +
> > +void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t)
> > +{
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +    VVCTasklet **prev;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev
> =
> > &(*prev)->next) +        /* nothing */;
> > +    add_task(prev, t);
> > +    pthread_cond_signal(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +}
> > +
> > +void ff_vvc_executor_wakeup(VVCExecutor *e)
> > +{
> > +    pthread_mutex_lock(&e->lock);
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
>
> Signaling a condition variable without changing any state makes no sense.
>
> > +}
>
> --
> Rémi Denis-Courmont
> http://www.remlab.net/
>
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
Nuo Mi May 21, 2023, 3:03 p.m. UTC | #5
On Sun, May 21, 2023 at 10:11 PM Lynne <dev@lynne.ee> wrote:

> May 21, 2023, 15:03 by nuomi2021@gmail.com:
>
> > The executor design pattern was inroduced by java
> > <
> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html
> >
> > it also adapted by python
> > <https://docs.python.org/3/library/concurrent.futures.html>
> > Compared to handcrafted thread pool management, it greatly simplifies
> the thread code.
> > ---
> >  libavcodec/Makefile           |   1 +
> >  libavcodec/vvc/Makefile       |   4 +
> >  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
> >  4 files changed, 271 insertions(+)
> >  create mode 100644 libavcodec/vvc/Makefile
> >  create mode 100644 libavcodec/vvc/vvc_executor.c
> >  create mode 100644 libavcodec/vvc/vvc_executor.h
> >
>
> I'd like a unified API, since for AV1, we'd want something similar.
> Having a custom one for each codec is a bad idea and prone to bitrotting.
>
Hi Lynne,
Thanks for the comment.

dav1d has not been merged into mainline yet.
It uses the same idea, but all of its code is hand-crafted; it does not use any
existing patterns.
I guess the executor would simplify dav1d's code.
Let us find out when it is sent to FFmpeg for review.

FFmpeg's current thread management has limitations:
1. Slice threads are bound to a frame; they cannot be used by another frame even
when the current frame has no jobs.
2. It is a one-shot mechanism. It cannot support multi-stage decoders, so it
cannot fully use multiple cores.
This is why we need a new one.
Lynne May 21, 2023, 3:06 p.m. UTC | #6
May 21, 2023, 17:06 by nuomi2021@gmail.com:

> On Sun, May 21, 2023 at 10:11 PM Lynne <dev@lynne.ee> wrote:
>
>> May 21, 2023, 15:03 by nuomi2021@gmail.com:
>>
>> > The executor design pattern was inroduced by java
>> > <
>> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html
>> >
>> > it also adapted by python
>> > <https://docs.python.org/3/library/concurrent.futures.html>
>> > Compared to handcrafted thread pool management, it greatly simplifies
>> the thread code.
>> > ---
>> >  libavcodec/Makefile           |   1 +
>> >  libavcodec/vvc/Makefile       |   4 +
>> >  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
>> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
>> >  4 files changed, 271 insertions(+)
>> >  create mode 100644 libavcodec/vvc/Makefile
>> >  create mode 100644 libavcodec/vvc/vvc_executor.c
>> >  create mode 100644 libavcodec/vvc/vvc_executor.h
>> >
>>
>> I'd like a unified API, since for AV1, we'd want something similar.
>> Having a custom one for each codec is a bad idea and prone to bitrotting.
>>
> Hi Lynne,
> Thanks for the comment.
>
> dav1d has not merged to mainline yet.
> It uses the same idea, but all codes are hand-crafted. Did not use any
> existing patterns.
> I guess the executor will simplify the dav1d's code.
> Let us find out when it is sent to FFmpeg for review.
>
> Current ffmpeg's thread management has limitations:
> 1. Slice thread bound to frame, it can't not us by another frame even
> current frame has no job.
> 2. It is a one-shot mechanism. Can't support multi-stage decoder. So it
> can't fully use the multiple cores.
> This is why we need a new one.
>

I don't mind a new one, but I'd like it to not be specific to VVC from the
start so it can be reused and improved.
Nuo Mi May 21, 2023, 3:16 p.m. UTC | #7
On Sun, May 21, 2023 at 11:07 PM Lynne <dev@lynne.ee> wrote:

> May 21, 2023, 17:06 by nuomi2021@gmail.com:
>
> > On Sun, May 21, 2023 at 10:11 PM Lynne <dev@lynne.ee> wrote:
> >
> >> May 21, 2023, 15:03 by nuomi2021@gmail.com:
> >>
> >> > The executor design pattern was inroduced by java
> >> > <
> >>
> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html
> >> >
> >> > it also adapted by python
> >> > <https://docs.python.org/3/library/concurrent.futures.html>
> >> > Compared to handcrafted thread pool management, it greatly simplifies
> >> the thread code.
> >> > ---
> >> >  libavcodec/Makefile           |   1 +
> >> >  libavcodec/vvc/Makefile       |   4 +
> >> >  libavcodec/vvc/vvc_executor.c | 193
> ++++++++++++++++++++++++++++++++++
> >> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
> >> >  4 files changed, 271 insertions(+)
> >> >  create mode 100644 libavcodec/vvc/Makefile
> >> >  create mode 100644 libavcodec/vvc/vvc_executor.c
> >> >  create mode 100644 libavcodec/vvc/vvc_executor.h
> >> >
> >>
> >> I'd like a unified API, since for AV1, we'd want something similar.
> >> Having a custom one for each codec is a bad idea and prone to
> bitrotting.
> >>
> > Hi Lynne,
> > Thanks for the comment.
> >
> > dav1d has not merged to mainline yet.
> > It uses the same idea, but all codes are hand-crafted. Did not use any
> > existing patterns.
> > I guess the executor will simplify the dav1d's code.
> > Let us find out when it is sent to FFmpeg for review.
> >
> > Current ffmpeg's thread management has limitations:
> > 1. Slice threads are bound to a frame; they can't be used by another frame
> > even if the current frame has no job.
> > 2. It is a one-shot mechanism. Can't support multi-stage decoder. So it
> > can't fully use the multiple cores.
> > This is why we need a new one.
> >
>
> I don't mind a new one, I'd like for it to not be specific for VVC from the
> start so it can be reused and improved.
>
Sure. Will move it to libavcodec.


> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
Rémi Denis-Courmont May 22, 2023, 4:29 p.m. UTC | #8
Le sunnuntaina 21. toukokuuta 2023, 17.24.56 EEST Nuo Mi a écrit :
> > > +
> > > +typedef struct ThreadInfo {
> > > +    int idx;
> > > +    VVCExecutor *e;
> > > +    pthread_t thread;
> > > +} ThreadInfo;
> > > +
> > > +struct VVCExecutor {
> > > +    VVCTaskCallbacks cb;
> > > +    ThreadInfo *threads;
> > > +    uint8_t *local_contexts;
> > 
> > It seems odd and needless complex to separate this from the thread info.
> > It
> > looks like you could simply append a pointer or a flexible array to
> > ThreadInfo
> > instead.
> 
>  Do you mean in VVCTasklet? No, we do not want to expose ThreadInfo details
> to users.

First, it feels very weird for a generic thread pool mechanism to have opaque 
per-thread state at all. AFAIK, any client state should be in the jobs, not 
the threads.

But even if... then the per-thread state should be in the thread structure, 
and the thread index should be unnecessary.

> > Signaling a condition variable without changing any state makes no sense.
> 
> It's for error handling.

Yeah, no.

> Imaging last all thread are waiting for ready jobs except the last one.
> The last one thread has some error, it reports an error to the caller.
> The caller needs to wake up threads to flush the task list.

That's not what this code does.

And you don't need to wake up all threads to flush the task list. There are two 
situations where it makes sense to wake more than one thread:
- When you want all to terminate to destroy the pool.
- When you have atomically queued so many jobs that you want to wake up idle 
runners at once, rather than signal them one at a time. (This is really only 
an optimisation, and only makes sense if you have really a lot of threads.)
Nuo Mi May 23, 2023, 6:23 a.m. UTC | #9
On Tue, May 23, 2023 at 12:29 AM Rémi Denis-Courmont <remi@remlab.net>
wrote:

> Le sunnuntaina 21. toukokuuta 2023, 17.24.56 EEST Nuo Mi a écrit :
> > > > +
> > > > +typedef struct ThreadInfo {
> > > > +    int idx;
> > > > +    VVCExecutor *e;
> > > > +    pthread_t thread;
> > > > +} ThreadInfo;
> > > > +
> > > > +struct VVCExecutor {
> > > > +    VVCTaskCallbacks cb;
> > > > +    ThreadInfo *threads;
> > > > +    uint8_t *local_contexts;
> > >
> > > It seems odd and needless complex to separate this from the thread
> info.
> > > It
> > > looks like you could simply append a pointer or a flexible array to
> > > ThreadInfo
> > > instead.
> >
> >  Do you mean in VVCTasklet? No, we do not want to expose ThreadInfo
> details
> > to users.
>
> First, it feels very weird for a generic thread pool mechanism to have
> opaque
> per-thread state at all. AFAIK, any client state should be in the jobs,
> not
> the threads.
>
Decoders need some temporary buffers. We call them the local context.
It is stateless: its contents are not valid before or after a job. This
is why we allocate it per thread instead of per job.

>
> But even if... then the per-thread state should be in the thread
> structure,
> and the thread index should be unnecessary.
>
I can move local_context to ThreadInfo and remove idx if you prefer.
But this will require multiple allocations for the local contexts, which makes
the allocation-failure handling more complicated.

>
> > > Signaling a condition variable without changing any state makes no
> sense.
> >
> > It's for error handling.
>
> Yeah, no.
>
> > Imaging last all thread are waiting for ready jobs except the last one.
> > The last one thread has some error, it reports an error to the caller.
> > The caller needs to wake up threads to flush the task list.
>

> That's not what this code does.
>
Currently, it is only used for missed slices: all tasks for the frame will abort.
In the future, it may be extended to error resilience and early quit for B frames.


> And you don't need to wake up all threads to flush the task list. There
> are two
> situations where it makes sense to wake more than one thread:
> - When you want all to terminate to destroy the pool.
> - When you have atomically queued so many jobs that you want to wake up
> idle
> runners at once, rather than signal them one at a time. (This is really
> only
> an optimisation, and only makes sense if you have really a lot of threads.)
>
It's more than this. The conditions of many blocks may change at once, which
can't be handled efficiently by one thread.
For example, if the CPU usage is high, you may want to skip all queued B frames
and their tasks.
Waking up all threads will speed up the process and free frame slots as quickly
as possible.

Thank you.

>
> --
> レミ・デニ-クールモン
> http://www.remlab.net/
>
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
diff mbox series

Patch

diff --git a/libavcodec/Makefile b/libavcodec/Makefile
index dab09f483a..b1fcbf71b2 100644
--- a/libavcodec/Makefile
+++ b/libavcodec/Makefile
@@ -62,6 +62,7 @@  OBJS = ac3_parser.o                                                     \
        xiph.o                                                           \
 
 # subsystems
+include $(SRC_PATH)/libavcodec/vvc/Makefile
 OBJS-$(CONFIG_AANDCTTABLES)            += aandcttab.o
 OBJS-$(CONFIG_AC3DSP)                  += ac3dsp.o ac3.o ac3tab.o
 OBJS-$(CONFIG_ADTS_HEADER)             += adts_header.o mpeg4audio_sample_rates.o
diff --git a/libavcodec/vvc/Makefile b/libavcodec/vvc/Makefile
new file mode 100644
index 0000000000..c4b93e0389
--- /dev/null
+++ b/libavcodec/vvc/Makefile
@@ -0,0 +1,4 @@ 
+clean::
+	$(RM) $(CLEANSUFFIXES:%=libavcodec/vvc/%)
+
+OBJS-$(CONFIG_VVC_DECODER)          +=  vvc/vvc_executor.o
diff --git a/libavcodec/vvc/vvc_executor.c b/libavcodec/vvc/vvc_executor.c
new file mode 100644
index 0000000000..f2afdf79ae
--- /dev/null
+++ b/libavcodec/vvc/vvc_executor.c
@@ -0,0 +1,193 @@ 
+/*
+ * VVC video Decoder
+ *
+ * Copyright (C) 2022 Nuo Mi
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+#include "libavutil/avutil.h"
+#include "libavutil/thread.h"
+
+#include "vvc_executor.h"
+
+typedef struct ThreadInfo { /* per-worker bookkeeping; the executor keeps an array of these */
+    int idx;            /* index of this worker; selects its slice of VVCExecutor.local_contexts */
+    VVCExecutor *e;     /* executor this worker belongs to */
+    pthread_t thread;   /* thread handle, needed to join the worker */
+} ThreadInfo;
+
+struct VVCExecutor { /* thread pool plus the list of tasks waiting to be run */
+    VVCTaskCallbacks cb;        /* private copy of the callbacks given at allocation */
+    ThreadInfo *threads;        /* array with one entry per worker */
+    uint8_t *local_contexts;    /* one block of cb.local_context_size bytes per worker, indexed by ThreadInfo.idx */
+    int thread_count;           /* number of running workers; set only after all of them were created */
+
+    pthread_mutex_t lock;       /* held whenever die or tasks is read or written */
+    pthread_cond_t cond;        /* workers wait here when no task is ready */
+    int die;                    /* set to 1 to make every worker leave its loop */
+    VVCTasklet *tasks;          /* head of the singly linked list of queued tasks */
+};
+
+static void remove_task(VVCTasklet **prev, VVCTasklet *t) /* unlink t; prev is the link (list head or a next field) pointing at t */
+{
+    *prev  = t->next; /* bypass t */
+    t->next = NULL;   /* t no longer references the list */
+}
+
+static void add_task(VVCTasklet **prev, VVCTasklet *t) /* insert t in front of whatever *prev currently points to */
+{
+    t->next = *prev; /* t adopts the old successor (possibly NULL) */
+    *prev   = t;     /* the link now leads to t */
+}
+
+static void *executor_worker_task(void *data) /* thread entry point: run ready tasks until e->die is set; data is the worker's ThreadInfo */
+{
+    ThreadInfo *ti = (ThreadInfo*)data;
+    VVCExecutor *e = ti->e;
+    void *lc       = e->local_contexts + ti->idx * e->cb.local_context_size; /* this worker's slice of the shared context buffer */
+    VVCTasklet **prev;
+    VVCTaskCallbacks *cb = &e->cb;
+
+    pthread_mutex_lock(&e->lock);
+    while (1) {
+        VVCTasklet* t = NULL;
+        if (e->die) break; /* checked with the lock held */
+
+        for (prev = &e->tasks; *prev; prev = &(*prev)->next) { /* pick the first task in list order that reports ready */
+            if (cb->ready(*prev, cb->user_data)) {
+                t = *prev;
+                break;
+            }
+        }
+        if (t) {
+            // found a ready task: unlink it, then run it without holding the lock
+            remove_task(prev, t);
+            pthread_mutex_unlock(&e->lock);
+            cb->run(t, lc, cb->user_data); /* the return value of run() is ignored */
+            pthread_mutex_lock(&e->lock);
+        } else {
+            // no ready task in this pass: sleep until the condition is signalled
+            pthread_cond_wait(&e->cond, &e->lock);
+        }
+    }
+    pthread_mutex_unlock(&e->lock);
+    return NULL;
+}
+
+VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *cb, int thread_count)
+{
+    VVCExecutor *e;
+    int i, j, ret; /* i: workers created so far; j: join index on the error path */
+    if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
+        return NULL; /* user_data and all three callbacks are mandatory */
+    e = av_calloc(1, sizeof(*e));
+    if (!e)
+        return NULL;
+    e->cb = *cb; /* keep a private copy of the callbacks */
+
+    e->local_contexts = av_malloc_array(thread_count, e->cb.local_context_size); /* overflow-checked; one context per worker */
+    if (!e->local_contexts)
+        goto free_executor;
+
+    e->threads = av_calloc(thread_count, sizeof(*e->threads));
+    if (!e->threads)
+        goto free_contexts;
+    for (i = 0; i < thread_count; i++) {
+        ThreadInfo *ti = e->threads + i;
+        ti->e = e;
+        ti->idx = i;
+    }
+
+    ret = pthread_mutex_init(&e->lock, NULL);
+    if (ret)
+        goto free_threads;
+
+    ret = pthread_cond_init(&e->cond, NULL);
+    if (ret)
+        goto destroy_lock;
+
+    for (i = 0; i < thread_count; i++) {
+        ThreadInfo *ti = e->threads + i;
+        ret = pthread_create(&ti->thread, NULL, executor_worker_task, ti);
+        if (ret)
+            goto join_threads; /* i now counts the workers that did start */
+    }
+    e->thread_count = thread_count;
+    return e;
+
+join_threads: /* stop and join the workers created before the failure */
+    pthread_mutex_lock(&e->lock);
+    e->die = 1;
+    pthread_cond_broadcast(&e->cond);
+    pthread_mutex_unlock(&e->lock);
+    for (j = 0; j < i; j++)
+        pthread_join(e->threads[j].thread, NULL);
+    pthread_cond_destroy(&e->cond);
+destroy_lock:
+    pthread_mutex_destroy(&e->lock);
+free_threads:
+    av_free(e->threads);
+free_contexts:
+    av_free(e->local_contexts);
+free_executor:
+    av_free(e); /* e came from av_calloc(), so it must go back through av_free(), not libc free() */
+    return NULL;
+}
+
+void ff_vvc_executor_free(VVCExecutor **executor)
+{
+    VVCExecutor *e;
+    if (!executor || !*executor)
+        return; /* nothing to free */
+    e = *executor;
+
+    // signal die: set the flag under the lock and wake every waiting worker
+    pthread_mutex_lock(&e->lock);
+    e->die = 1;
+    pthread_cond_broadcast(&e->cond);
+    pthread_mutex_unlock(&e->lock);
+
+    for (int i = 0; i < e->thread_count; i++)
+        pthread_join(e->threads[i].thread, NULL);
+    pthread_cond_destroy(&e->cond); /* all workers have been joined by now */
+    pthread_mutex_destroy(&e->lock);
+
+    av_free(e->threads);
+    av_free(e->local_contexts); /* tasks still linked in e->tasks are not touched here */
+
+    av_freep(executor); /* frees the executor itself and resets the caller's pointer */
+}
+
+void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t) /* queue t by priority and wake one worker */
+{
+    VVCTaskCallbacks *cb = &e->cb;
+    VVCTasklet **prev;
+
+    pthread_mutex_lock(&e->lock);
+    for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
+        /* nothing: the loop only advances prev past entries ranked above t */;
+    add_task(prev, t); /* t lands before the first entry that is not ranked above it */
+    pthread_cond_signal(&e->cond); /* one waiting worker is enough for one new task */
+    pthread_mutex_unlock(&e->lock);
+}
+
+void ff_vvc_executor_wakeup(VVCExecutor *e) /* wake every waiting worker; no executor state is changed here */
+{
+    pthread_mutex_lock(&e->lock);
+    pthread_cond_broadcast(&e->cond); /* each woken worker rescans the task list with ready() */
+    pthread_mutex_unlock(&e->lock);
+}
diff --git a/libavcodec/vvc/vvc_executor.h b/libavcodec/vvc/vvc_executor.h
new file mode 100644
index 0000000000..79c00d58ca
--- /dev/null
+++ b/libavcodec/vvc/vvc_executor.h
@@ -0,0 +1,73 @@ 
+/*
+ * Copyright (C) 2022 Nuo Mi
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef AVCODEC_VVC_EXECUTOR_H
+#define AVCODEC_VVC_EXECUTOR_H
+
+typedef struct VVCExecutor VVCExecutor;
+typedef struct VVCTasklet VVCTasklet;
+
+struct VVCTasklet { // list node the executor uses to queue a task
+    VVCTasklet *next; // next queued task, or NULL; written by the executor when queuing and dequeuing
+};
+
+typedef struct VVCTaskCallbacks {
+    void *user_data; // opaque pointer handed back to ready() and run(); the allocator rejects NULL
+
+    int local_context_size; // size in bytes of the context block allocated for each worker thread
+
+    // return nonzero if a's priority > b's priority; a then stays ahead of b in the task list
+    int (*priority_higher)(const VVCTasklet *a, const VVCTasklet *b);
+
+    // return nonzero if the task is ready to run; called by workers while the executor lock is held
+    int (*ready)(const VVCTasklet *t, void *user_data);
+
+    // run the task with the calling worker's local context; the executor ignores the return value
+    int (*run)(VVCTasklet *t, void *local_context, void *user_data);
+} VVCTaskCallbacks;
+
+/**
+ * Alloc executor and start its worker threads
+ * @param callbacks callback structure for executor; it is copied, user_data and all callbacks must be set
+ * @param thread_count worker thread number
+ * @return return the executor, or NULL if callbacks is incomplete or an allocation or thread creation fails
+ */
+VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *callbacks, int thread_count);
+
+/**
+ * Free executor: stop and join all worker threads, then release the executor
+ * @param e  address of the executor pointer; NULL or a pointer to NULL is a no-op, otherwise *e is freed and reset
+ */
+void ff_vvc_executor_free(VVCExecutor **e);
+
+/**
+ * Add task to executor: insert it into the task list by priority and signal one worker
+ * @param e pointer to executor
+ * @param t pointer to task; it stays linked into the executor's list until a worker picks it up
+ */
+void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t);
+
+/**
+ * Wakeup all threads waiting for a task so that they check the task list again
+ * @param e pointer to executor
+ */
+void ff_vvc_executor_wakeup(VVCExecutor *e);
+
+#endif //AVCODEC_VVC_EXECUTOR_H