@@ -48,6 +48,11 @@ typedef struct ThreadInfo {
ExecutorThread thread;
} ThreadInfo;
+typedef struct Queue {
+ FFTask *head;
+ FFTask *tail;
+} Queue;
+
struct FFExecutor {
FFTaskCallbacks cb;
int thread_count;
@@ -60,29 +65,39 @@ struct FFExecutor {
AVCond cond;
int die;
- FFTask *tasks;
+ Queue *q;
};
-static FFTask* remove_task(FFTask **prev, FFTask *t)
+static FFTask* remove_task(Queue *q)
{
- *prev = t->next;
- t->next = NULL;
+ FFTask *t = q->head;
+ if (t) {
+ q->head = t->next;
+ t->next = NULL;
+ if (!q->head)
+ q->tail = NULL;
+ }
return t;
}
-static void add_task(FFTask **prev, FFTask *t)
+static void add_task(Queue *q, FFTask *t)
{
- t->next = *prev;
- *prev = t;
+ t->next = NULL;
+ if (!q->head)
+ q->tail = q->head = t;
+ else
+ q->tail = q->tail->next = t;
}
static int run_one_task(FFExecutor *e, void *lc)
{
FFTaskCallbacks *cb = &e->cb;
- FFTask **prev = &e->tasks;
+ FFTask *t = NULL;
+
+ for (int i = 0; i < e->cb.priorities && !t; i++)
+ t = remove_task(e->q + i);
- if (*prev) {
- FFTask *t = remove_task(prev, *prev);
+ if (t) {
if (e->thread_count > 0)
ff_mutex_unlock(&e->lock);
cb->run(t, lc, cb->user_data);
@@ -132,6 +147,7 @@ static void executor_free(FFExecutor *e, const int has_lock, const int has_cond)
ff_mutex_destroy(&e->lock);
av_free(e->threads);
+ av_free(e->q);
av_free(e->local_contexts);
av_free(e);
@@ -141,7 +157,7 @@ FFExecutor* ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
{
FFExecutor *e;
int has_lock = 0, has_cond = 0;
- if (!cb || !cb->user_data || !cb->run || !cb->priority_higher)
+ if (!cb || !cb->user_data || !cb->run || !cb->priorities)
return NULL;
e = av_mallocz(sizeof(*e));
@@ -153,6 +169,10 @@ FFExecutor* ff_executor_alloc(const FFTaskCallbacks *cb, int thread_count)
if (!e->local_contexts)
goto free_executor;
+ e->q = av_calloc(e->cb.priorities, sizeof(Queue));
+ if (!e->q)
+ goto free_executor;
+
e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
if (!e->threads)
goto free_executor;
@@ -192,16 +212,10 @@ void ff_executor_free(FFExecutor **executor)
void ff_executor_execute(FFExecutor *e, FFTask *t)
{
- FFTaskCallbacks *cb = &e->cb;
- FFTask **prev;
-
if (e->thread_count)
ff_mutex_lock(&e->lock);
- if (t) {
- for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
- /* nothing */;
- add_task(prev, t);
- }
+ if (t)
+ add_task(e->q + t->priority % e->cb.priorities, t);
if (e->thread_count) {
ff_cond_signal(&e->cond);
ff_mutex_unlock(&e->lock);
@@ -32,6 +32,7 @@ typedef struct FFTask FFTask;
struct FFTask {
FFTask *next;
+ int priority; // task priority, must be >= 0 and < FFTaskCallbacks.priorities
};
typedef struct FFTaskCallbacks {
@@ -39,8 +40,8 @@ typedef struct FFTaskCallbacks {
int local_context_size;
- // return 1 if a's priority > b's priority
- int (*priority_higher)(const FFTask *a, const FFTask *b);
+ // number of priority levels; task priorities range from 0 to priorities - 1, and level 0 is run first
+ int priorities;
// run the task
int (*run)(FFTask *t, void *local_context, void *user_data);
@@ -103,13 +103,28 @@ typedef struct VVCFrameThread {
AVCond cond;
} VVCFrameThread;
+#define PRIORITY_LOWEST 2
static void add_task(VVCContext *s, VVCTask *t)
{
- VVCFrameThread *ft = t->fc->ft;
+ VVCFrameThread *ft = t->fc->ft;
+ FFTask *task = &t->u.task;
+ const int priorities[] = {
+ 0, // VVC_TASK_STAGE_INIT,
+ 0, // VVC_TASK_STAGE_PARSE,
+ // For an 8K clip, a completed CTU line in the reference frame may trigger 64 or more inter tasks.
+ // We assign these tasks the lowest priority so that we are not overwhelmed with inter tasks.
+ PRIORITY_LOWEST, // VVC_TASK_STAGE_INTER
+ 1, // VVC_TASK_STAGE_RECON,
+ 1, // VVC_TASK_STAGE_LMCS,
+ 1, // VVC_TASK_STAGE_DEBLOCK_V,
+ 1, // VVC_TASK_STAGE_DEBLOCK_H,
+ 1, // VVC_TASK_STAGE_SAO,
+ 1, // VVC_TASK_STAGE_ALF,
+ };
atomic_fetch_add(&ft->nb_scheduled_tasks, 1);
-
- ff_executor_execute(s->executor, &t->u.task);
+ task->priority = priorities[t->stage];
+ ff_executor_execute(s->executor, task);
}
static void task_init(VVCTask *t, VVCTaskStage stage, VVCFrameContext *fc, const int rx, const int ry)
@@ -372,31 +387,6 @@ static int task_is_stage_ready(VVCTask *t, int add)
return task_has_target_score(t, stage, score);
}
-#define CHECK(a, b) \
- do { \
- if ((a) != (b)) \
- return (a) < (b); \
- } while (0)
-
-static int task_priority_higher(const FFTask *_a, const FFTask *_b)
-{
- const VVCTask *a = (const VVCTask*)_a;
- const VVCTask *b = (const VVCTask*)_b;
-
-
- if (a->stage <= VVC_TASK_STAGE_PARSE || b->stage <= VVC_TASK_STAGE_PARSE) {
- CHECK(a->stage, b->stage);
- CHECK(a->fc->decode_order, b->fc->decode_order); //decode order
- CHECK(a->ry, b->ry);
- return a->rx < b->rx;
- }
-
- CHECK(a->fc->decode_order, b->fc->decode_order); //decode order
- CHECK(a->rx + a->ry + a->stage, b->rx + b->ry + b->stage); //zigzag with type
- CHECK(a->rx + a->ry, b->rx + b->ry); //zigzag
- return a->ry < b->ry;
-}
-
static void check_colocation(VVCContext *s, VVCTask *t)
{
const VVCFrameContext *fc = t->fc;
@@ -681,7 +671,7 @@ FFExecutor* ff_vvc_executor_alloc(VVCContext *s, const int thread_count)
FFTaskCallbacks callbacks = {
s,
sizeof(VVCLocalContext),
- task_priority_higher,
+ PRIORITY_LOWEST + 1,
task_run,
};
return ff_executor_alloc(&callbacks, thread_count);