[FFmpeg-devel] avcodec/frame_thread_encoder: factorize frame threading

Message ID 20190407184705.14920-1-cus@passwd.hu
State New

Commit Message

Marton Balint April 7, 2019, 6:47 p.m. UTC
framethread.c is put into libavutil, but it has to be included directly to
avoid creating avpriv functions.

Functionality should be identical, with one slight difference: we close the
per-thread avcodec contexts in the main thread and not in the workers.

Signed-off-by: Marton Balint <cus@passwd.hu>
---
 libavcodec/frame_thread_encoder.c | 229 ++++++++++++++------------------------
 libavutil/framethread.c           | 207 ++++++++++++++++++++++++++++++++++
 2 files changed, 288 insertions(+), 148 deletions(-)
 create mode 100644 libavutil/framethread.c

Comments

Paul B Mahol April 7, 2019, 6:58 p.m. UTC | #1
On 4/7/19, Marton Balint <cus@passwd.hu> wrote:
> framethread.c is put into libavutil, but it has to be included directly to
> avoid creating avpriv functions.
>
> Functionality should be identical, there is one slight difference: we close
> the
> per-thread avcodec contexts in the main thread and not in the workers.
>

Why?
Marton Balint April 7, 2019, 7:04 p.m. UTC | #2
On Sun, 7 Apr 2019, Paul B Mahol wrote:

> On 4/7/19, Marton Balint <cus@passwd.hu> wrote:
>> framethread.c is put into libavutil, but it has to be included directly to
>> avoid creating avpriv functions.
>>
>> Functionality should be identical, there is one slight difference: we close
>> the
>> per-thread avcodec contexts in the main thread and not in the workers.
>>
>
> Why?

Factorization is good because threading becomes more readable and can be 
reused later in e.g. filter frame threading.
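
To illustrate the reuse argument, here is a minimal sketch of how two unrelated users could plug into one generic layer through the same pair of callbacks. The callback shapes match the ones passed to framethread_create() in the patch; everything else (EncodeJob, FilterJob, run_one, demo_encode, demo_filter) is hypothetical and only stands in for real encoder or filter code. run_one executes the task synchronously to keep the sketch deterministic, whereas the patch hands the task to a worker thread.

```c
#include <stdlib.h>

/* Same shape as the callbacks passed to framethread_create() in the patch. */
typedef int  (*task_func_t)(void *priv, int thread_id);
typedef void (*task_free_func_t)(void *priv);

/* Hypothetical encoder-side task. */
typedef struct EncodeJob { int frame_no; } EncodeJob;

static int encode_job_run(void *priv, int thread_id)
{
    EncodeJob *job = priv;
    (void)thread_id;
    return job->frame_no;       /* stands in for an encoder return code */
}

/* Hypothetical filter-side task. */
typedef struct FilterJob { int in; } FilterJob;

static int filter_job_run(void *priv, int thread_id)
{
    FilterJob *job = priv;
    (void)thread_id;
    return job->in * 2;         /* stands in for filtering one frame */
}

/* Stand-in for the generic layer: it only sees void * and two callbacks.
 * Assumption: runs synchronously here instead of on a worker thread. */
static int run_one(task_func_t run, task_free_func_t release, void *priv)
{
    int ret = run(priv, 0);
    release(priv);
    return ret;
}

static int demo_encode(int frame_no)
{
    EncodeJob *job = malloc(sizeof(*job));
    if (!job)
        return -1;
    job->frame_no = frame_no;
    return run_one(encode_job_run, free, job);
}

static int demo_filter(int in)
{
    FilterJob *job = malloc(sizeof(*job));
    if (!job)
        return -1;
    job->in = in;
    return run_one(filter_job_run, free, job);
}
```

The generic layer never needs to know which job type it is handling, which is what would let a filter share the threading code with the encoder.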

Per-thread contexts are closed and freed in the main thread because doing 
it in the worker would have required another callback in the thread 
context, and it seems pointless to do it in the workers because 
if you check the current code, avcodec_close is serialized using a mutex.
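
The teardown order described above can be sketched with plain pthreads. This is an illustration under stated assumptions, not the patch's code: ThreadCtx is a hypothetical stand-in for a per-thread AVCodecContext, and incrementing a counter stands in for encoding a frame. The workers only use their context, and the main thread frees every context after joining the workers, so no extra callback and no locking is needed for the close.

```c
#include <pthread.h>
#include <stdlib.h>

#define NB_THREADS        4
#define FRAMES_PER_THREAD 100

/* Hypothetical stand-in for a per-thread codec context. */
typedef struct ThreadCtx {
    int frames_done;
} ThreadCtx;

static void *worker(void *arg)
{
    ThreadCtx *ctx = arg;
    for (int i = 0; i < FRAMES_PER_THREAD; i++)
        ctx->frames_done++;     /* stands in for encoding one frame */
    return NULL;                /* no teardown in the worker */
}

/* Returns the number of contexts closed by the main thread, or -1 if
 * not every worker could be started. */
static int run_and_close_in_main(void)
{
    pthread_t threads[NB_THREADS];
    ThreadCtx *ctx[NB_THREADS] = { 0 };
    int started = 0, closed = 0;

    for (int i = 0; i < NB_THREADS; i++) {
        ctx[i] = calloc(1, sizeof(*ctx[i]));
        if (!ctx[i] || pthread_create(&threads[i], NULL, worker, ctx[i]))
            break;
        started++;
    }

    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    /* All workers have exited, so the main thread owns every context. */
    for (int i = 0; i < NB_THREADS; i++) {
        if (ctx[i] && ctx[i]->frames_done == FRAMES_PER_THREAD)
            closed++;
        free(ctx[i]);
    }
    return started == NB_THREADS ? closed : -1;
}
```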

Regards,
Marton
James Almer April 7, 2019, 7:13 p.m. UTC | #3
On 4/7/2019 3:47 PM, Marton Balint wrote:
> framethread.c is put into libavutil, but it has to be included directly to
> avoid creating avpriv functions.

If the reason behind this factorization is sharing the code between
modules across several libraries and the function signatures are
unlikely to change, then using avpriv_ functions is justified.
The structs will be internal to libavutil, so the only thing tied to the
ABI will be the avpriv symbols.

Including the entire c file will result in unnecessary binary bloat once
this code is used for example in libavfilter.
Marton Balint April 7, 2019, 7:22 p.m. UTC | #4
On Sun, 7 Apr 2019, James Almer wrote:

> On 4/7/2019 3:47 PM, Marton Balint wrote:
>> framethread.c is put into libavutil, but it has to be included directly to
>> avoid creating avpriv functions.
>
> If the reason behind this factorization is sharing the code between
> modules across several libraries and the function signatures are
> unlikely to change, then using avpriv_ functions is justified.
> The structs will be internal to libavutil, so the only thing tied to the
> ABI will be the avpriv symbols.
>
> Including the entire c file will result in unnecessary binary bloat once
> this code is used for example in libavfilter.

Yeah, I agree. I just did not want to make it avpriv until we have at 
least a second component using it, because I cannot foresee if the API 
will be sufficient or not.

Regards,
Marton
James Almer April 7, 2019, 7:33 p.m. UTC | #5
On 4/7/2019 4:22 PM, Marton Balint wrote:
> 
> 
> On Sun, 7 Apr 2019, James Almer wrote:
> 
>> On 4/7/2019 3:47 PM, Marton Balint wrote:
>>> framethread.c is put into libavutil, but it has to be included
>>> directly to
>>> avoid creating avpriv functions.
>>
>> If the reason behind this factorization is sharing the code between
>> modules across several libraries and the function signatures are
>> unlikely to change, then using avpriv_ functions is justified.
>> The structs will be internal to libavutil, so the only thing tied to the
>> ABI will be the avpriv symbols.
>>
>> Including the entire c file will result in unnecessary binary bloat once
>> this code is used for example in libavfilter.
> 
> Yeah, I agree. I just did not want to make it avpriv until we have at
> least a second component using it, because I cannot foresee if the API
> will be sufficient or not.

Fair enough, I guess it should be OK like this until it's needed
elsewhere, to avoid potentially having to live with useless symbols
until a major bump if they ultimately need to be changed. Although I
have to say it's kind of weird having a source file in the libavutil
folder that's not compiled into said library.

Wait to see if someone else comments.

Patch

diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
index 55756c4c54..72b5d72803 100644
--- a/libavcodec/frame_thread_encoder.c
+++ b/libavcodec/frame_thread_encoder.c
@@ -22,7 +22,6 @@ 
 
 #include "frame_thread_encoder.h"
 
-#include "libavutil/fifo.h"
 #include "libavutil/avassert.h"
 #include "libavutil/imgutils.h"
 #include "libavutil/opt.h"
@@ -31,93 +30,67 @@ 
 #include "internal.h"
 #include "thread.h"
 
-#define MAX_THREADS 64
-#define BUFFER_SIZE (2*MAX_THREADS)
+#include "libavutil/framethread.c"
 
-typedef struct{
-    void *indata;
-    void *outdata;
-    int64_t return_code;
-    unsigned index;
-} Task;
-
-typedef struct{
+typedef struct {
+    AVCodecContext *thread_avctx[FRAMETHREAD_MAX_THREADS];
     AVCodecContext *parent_avctx;
     pthread_mutex_t buffer_mutex;
+    FrameThread *framethread;
+} EncodeThreadContext;
+
+typedef struct {
+    EncodeThreadContext *ctx;
+    AVFrame *frame;
+    AVPacket *packet;
+} EncodeTaskContext;
+
+static void encode_task_free(void *task_priv)
+{
+    EncodeTaskContext *task = task_priv;
+    av_frame_free(&task->frame);
+    av_packet_free(&task->packet);
+}
 
-    AVFifoBuffer *task_fifo;
-    pthread_mutex_t task_fifo_mutex;
-    pthread_cond_t task_fifo_cond;
-
-    Task finished_tasks[BUFFER_SIZE];
-    pthread_mutex_t finished_task_mutex;
-    pthread_cond_t finished_task_cond;
-
-    unsigned task_index;
-    unsigned finished_task_index;
-
-    pthread_t worker[MAX_THREADS];
-    atomic_int exit;
-} ThreadContext;
-
-static void * attribute_align_arg worker(void *v){
-    AVCodecContext *avctx = v;
-    ThreadContext *c = avctx->internal->frame_thread_encoder;
-    AVPacket *pkt = NULL;
-
-    while (!atomic_load(&c->exit)) {
-        int got_packet, ret;
-        AVFrame *frame;
-        Task task;
-
-        if(!pkt) pkt = av_packet_alloc();
-        if(!pkt) continue;
-        av_init_packet(pkt);
-
-        pthread_mutex_lock(&c->task_fifo_mutex);
-        while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
-            if (atomic_load(&c->exit)) {
-                pthread_mutex_unlock(&c->task_fifo_mutex);
-                goto end;
-            }
-            pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
-        }
-        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
-        pthread_mutex_unlock(&c->task_fifo_mutex);
-        frame = task.indata;
+static int encode_task_func(void *task_priv, int thread_id)
+{
+    EncodeTaskContext *task = task_priv;
+    EncodeThreadContext *c = task->ctx;
+    AVPacket *pkt = av_packet_alloc();
+    int got_packet, ret;
 
-        ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet);
+    if (!pkt) {
         pthread_mutex_lock(&c->buffer_mutex);
-        av_frame_unref(frame);
+        av_frame_unref(task->frame);
         pthread_mutex_unlock(&c->buffer_mutex);
-        av_frame_free(&frame);
-        if(got_packet) {
-            int ret2 = av_packet_make_refcounted(pkt);
-            if (ret >= 0 && ret2 < 0)
-                ret = ret2;
-        } else {
-            pkt->data = NULL;
-            pkt->size = 0;
-        }
-        pthread_mutex_lock(&c->finished_task_mutex);
-        c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
-        c->finished_tasks[task.index].return_code = ret;
-        pthread_cond_signal(&c->finished_task_cond);
-        pthread_mutex_unlock(&c->finished_task_mutex);
+        av_frame_free(&task->frame);
+        return AVERROR(ENOMEM);
     }
-end:
-    av_free(pkt);
+
+    av_init_packet(pkt);
+
+    ret = avcodec_encode_video2(c->thread_avctx[thread_id], pkt, task->frame, &got_packet);
     pthread_mutex_lock(&c->buffer_mutex);
-    avcodec_close(avctx);
+    av_frame_unref(task->frame);
     pthread_mutex_unlock(&c->buffer_mutex);
-    av_freep(&avctx);
-    return NULL;
+    av_frame_free(&task->frame);
+    if(got_packet) {
+        int ret2 = av_packet_make_refcounted(pkt);
+        if (ret >= 0 && ret2 < 0)
+            ret = ret2;
+    } else {
+        pkt->data = NULL;
+        pkt->size = 0;
+    }
+    task->packet = pkt;
+
+    return ret;
 }
 
 int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
     int i=0;
-    ThreadContext *c;
-
+    int ret;
+    EncodeThreadContext *c;
 
     if(   !(avctx->thread_type & FF_THREAD_FRAME)
        || !(avctx->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY))
@@ -164,32 +137,29 @@  int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
 
     if(!avctx->thread_count) {
         avctx->thread_count = av_cpu_count();
-        avctx->thread_count = FFMIN(avctx->thread_count, MAX_THREADS);
+        avctx->thread_count = FFMIN(avctx->thread_count, FRAMETHREAD_MAX_THREADS);
     }
 
     if(avctx->thread_count <= 1)
         return 0;
 
-    if(avctx->thread_count > MAX_THREADS)
+    if(avctx->thread_count > FRAMETHREAD_MAX_THREADS)
         return AVERROR(EINVAL);
 
     av_assert0(!avctx->internal->frame_thread_encoder);
-    c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(ThreadContext));
+    c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(EncodeThreadContext));
     if(!c)
         return AVERROR(ENOMEM);
 
-    c->parent_avctx = avctx;
+    ret = framethread_create(&c->framethread, encode_task_func, encode_task_free, avctx->thread_count);
+    if (ret < 0) {
+        av_freep(&avctx->internal->frame_thread_encoder);
+        return ret;
+    }
 
-    c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
-    if(!c->task_fifo)
-        goto fail;
+    c->parent_avctx = avctx;
 
-    pthread_mutex_init(&c->task_fifo_mutex, NULL);
-    pthread_mutex_init(&c->finished_task_mutex, NULL);
     pthread_mutex_init(&c->buffer_mutex, NULL);
-    pthread_cond_init(&c->task_fifo_cond, NULL);
-    pthread_cond_init(&c->finished_task_cond, NULL);
-    atomic_init(&c->exit, 0);
 
     for(i=0; i<avctx->thread_count ; i++){
         AVDictionary *tmp = NULL;
@@ -223,9 +193,7 @@  int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
         av_dict_free(&tmp);
         av_assert0(!thread_avctx->internal->frame_thread_encoder);
         thread_avctx->internal->frame_thread_encoder = c;
-        if(pthread_create(&c->worker[i], NULL, worker, thread_avctx)) {
-            goto fail;
-        }
+        c->thread_avctx[i] = thread_avctx;
     }
 
     avctx->active_thread_type = FF_THREAD_FRAME;
@@ -239,53 +207,27 @@  fail:
 }
 
 void ff_frame_thread_encoder_free(AVCodecContext *avctx){
-    int i;
-    ThreadContext *c= avctx->internal->frame_thread_encoder;
+    EncodeThreadContext *c= avctx->internal->frame_thread_encoder;
 
-    pthread_mutex_lock(&c->task_fifo_mutex);
-    atomic_store(&c->exit, 1);
-    pthread_cond_broadcast(&c->task_fifo_cond);
-    pthread_mutex_unlock(&c->task_fifo_mutex);
-
-    for (i=0; i<avctx->thread_count; i++) {
-         pthread_join(c->worker[i], NULL);
-    }
-
-    while (av_fifo_size(c->task_fifo) > 0) {
-        Task task;
-        AVFrame *frame;
-        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
-        frame = task.indata;
-        av_frame_free(&frame);
-        task.indata = NULL;
-    }
-
-    for (i=0; i<BUFFER_SIZE; i++) {
-        if (c->finished_tasks[i].outdata != NULL) {
-            AVPacket *pkt = c->finished_tasks[i].outdata;
-            av_packet_free(&pkt);
-            c->finished_tasks[i].outdata = NULL;
-        }
-    }
-
-    pthread_mutex_destroy(&c->task_fifo_mutex);
-    pthread_mutex_destroy(&c->finished_task_mutex);
+    framethread_free(&c->framethread);
     pthread_mutex_destroy(&c->buffer_mutex);
-    pthread_cond_destroy(&c->task_fifo_cond);
-    pthread_cond_destroy(&c->finished_task_cond);
-    av_fifo_freep(&c->task_fifo);
+    for (int i = 0; i < avctx->thread_count; i++) {
+        avcodec_close(c->thread_avctx[i]);
+        av_freep(&c->thread_avctx[i]);
+    }
     av_freep(&avctx->internal->frame_thread_encoder);
 }
 
 int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
-    ThreadContext *c = avctx->internal->frame_thread_encoder;
-    Task task;
+    EncodeThreadContext *c = avctx->internal->frame_thread_encoder;
+    EncodeTaskContext *t;
     int ret;
 
     av_assert1(!*got_packet_ptr);
 
     if(frame){
         AVFrame *new = av_frame_alloc();
+        EncodeTaskContext *task;
         if(!new)
             return AVERROR(ENOMEM);
         ret = av_frame_ref(new, frame);
@@ -293,35 +235,26 @@  int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
             av_frame_free(&new);
             return ret;
         }
+        task = av_mallocz(sizeof(*task));
+        if (!task) {
+            av_frame_free(&new);
+            return AVERROR(ENOMEM);
+        }
+        task->ctx = c;
+        task->frame = new;
 
-        task.index = c->task_index;
-        task.indata = (void*)new;
-        pthread_mutex_lock(&c->task_fifo_mutex);
-        av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
-        pthread_cond_signal(&c->task_fifo_cond);
-        pthread_mutex_unlock(&c->task_fifo_mutex);
-
-        c->task_index = (c->task_index+1) % BUFFER_SIZE;
+        framethread_submit_task(c->framethread, task);
     }
 
-    pthread_mutex_lock(&c->finished_task_mutex);
-    if (c->task_index == c->finished_task_index ||
-        (frame && !c->finished_tasks[c->finished_task_index].outdata &&
-         (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
-            pthread_mutex_unlock(&c->finished_task_mutex);
-            return 0;
-        }
+    t = framethread_wait_task(c->framethread, !frame, &ret);
+    if (!t)
+        return 0;
 
-    while (!c->finished_tasks[c->finished_task_index].outdata) {
-        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
-    }
-    task = c->finished_tasks[c->finished_task_index];
-    *pkt = *(AVPacket*)(task.outdata);
+    *pkt = *t->packet;
     if(pkt->data)
         *got_packet_ptr = 1;
-    av_freep(&c->finished_tasks[c->finished_task_index].outdata);
-    c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
-    pthread_mutex_unlock(&c->finished_task_mutex);
+    av_freep(&t->packet);
+    av_freep(&t);
 
-    return task.return_code;
+    return ret;
 }
diff --git a/libavutil/framethread.c b/libavutil/framethread.c
new file mode 100644
index 0000000000..4cd94f4833
--- /dev/null
+++ b/libavutil/framethread.c
@@ -0,0 +1,207 @@ 
+/*
+ * Copyright (c) 2012 Michael Niedermayer <michaelni@gmx.at>
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdatomic.h>
+
+#include "libavutil/fifo.h"
+#include "libavutil/avassert.h"
+#include "libavutil/thread.h"
+
+#define FRAMETHREAD_MAX_THREADS 64
+#define BUFFER_SIZE (2*FRAMETHREAD_MAX_THREADS)
+
+typedef struct FrameThread FrameThread;
+
+typedef struct Task {
+    int64_t return_code;
+    unsigned index;
+    int finished;
+    void *priv;
+} Task;
+
+typedef struct Worker {
+    FrameThread *c;
+    int thread_id;
+    pthread_t worker;
+} Worker;
+
+typedef struct FrameThread {
+    AVFifoBuffer *task_fifo;
+    pthread_mutex_t task_fifo_mutex;
+    pthread_cond_t task_fifo_cond;
+
+    Task finished_tasks[BUFFER_SIZE];
+    pthread_mutex_t finished_task_mutex;
+    pthread_cond_t finished_task_cond;
+
+    Worker worker[FRAMETHREAD_MAX_THREADS];
+    unsigned task_index;
+    unsigned finished_task_index;
+
+    int (*task_func) (void *priv, int thread_id);
+    void (*task_free_func) (void *priv);
+    int thread_count;
+    atomic_int exit;
+} FrameThread;
+
+static void *attribute_align_arg thread_worker(void *w)
+{
+    Worker *worker = w;
+    FrameThread *c = worker->c;
+
+    while (!atomic_load(&c->exit)) {
+        Task task;
+
+        pthread_mutex_lock(&c->task_fifo_mutex);
+        while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
+            if (atomic_load(&c->exit)) {
+                pthread_mutex_unlock(&c->task_fifo_mutex);
+                goto end;
+            }
+            pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
+        }
+        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
+        pthread_mutex_unlock(&c->task_fifo_mutex);
+        task.return_code = c->task_func(task.priv, worker->thread_id);
+        task.finished = 1;
+        pthread_mutex_lock(&c->finished_task_mutex);
+        c->finished_tasks[task.index] = task;
+        pthread_cond_signal(&c->finished_task_cond);
+        pthread_mutex_unlock(&c->finished_task_mutex);
+    }
+  end:
+    return NULL;
+}
+
+static void framethread_free(FrameThread ** pc)
+{
+    FrameThread *c = *pc;
+
+    if (!c)
+        return;
+
+    pthread_mutex_lock(&c->task_fifo_mutex);
+    atomic_store(&c->exit, 1);
+    pthread_cond_broadcast(&c->task_fifo_cond);
+    pthread_mutex_unlock(&c->task_fifo_mutex);
+
+    for (int i = 0; i < c->thread_count; i++)
+        pthread_join(c->worker[i].worker, NULL);
+
+    while (av_fifo_size(c->task_fifo) > 0) {
+        Task task;
+        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
+        c->task_free_func(task.priv);
+    }
+
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+        if (c->finished_tasks[i].finished)
+            c->task_free_func(c->finished_tasks[i].priv);
+    }
+
+    pthread_mutex_destroy(&c->task_fifo_mutex);
+    pthread_mutex_destroy(&c->finished_task_mutex);
+    pthread_cond_destroy(&c->task_fifo_cond);
+    pthread_cond_destroy(&c->finished_task_cond);
+    av_fifo_freep(&c->task_fifo);
+
+    av_freep(pc);
+}
+
+static int framethread_create(FrameThread ** pctx,
+                              int (*task_func) (void *priv, int thread_id),
+                              void (*task_free_func) (void *priv), int nb_threads)
+{
+    FrameThread *c;
+
+    av_assert0(nb_threads >= 1);
+    av_assert0(nb_threads <= FRAMETHREAD_MAX_THREADS);
+
+    *pctx = c = av_mallocz(sizeof(*c));
+    if (!c)
+        return AVERROR(ENOMEM);
+
+    c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
+    if (!c->task_fifo) {
+        av_freep(pctx);
+        return AVERROR(ENOMEM);
+    }
+
+    c->task_func = task_func;
+    c->task_free_func = task_free_func;
+
+    pthread_mutex_init(&c->task_fifo_mutex, NULL);
+    pthread_mutex_init(&c->finished_task_mutex, NULL);
+
+    pthread_cond_init(&c->task_fifo_cond, NULL);
+    pthread_cond_init(&c->finished_task_cond, NULL);
+    atomic_init(&c->exit, 0);
+
+    for (int i = 0; i < nb_threads; i++) {
+        c->worker[i].thread_id = i;
+        c->worker[i].c = c;
+        if (pthread_create(&c->worker[i].worker, NULL, thread_worker, &c->worker[i]))
+            goto fail;
+        c->thread_count++;
+    }
+
+    return 0;
+
+  fail:
+    framethread_free(pctx);
+    return AVERROR(ENOMEM);
+}
+
+static void framethread_submit_task(FrameThread * c, void *priv)
+{
+    Task task = { 0 };
+    task.priv = priv;
+    task.index = c->task_index;
+    pthread_mutex_lock(&c->task_fifo_mutex);
+    av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
+    pthread_cond_signal(&c->task_fifo_cond);
+    pthread_mutex_unlock(&c->task_fifo_mutex);
+    c->task_index = (c->task_index + 1) % BUFFER_SIZE;
+}
+
+static void *framethread_wait_task(FrameThread * c, int flush, int *return_code)
+{
+    Task *task;
+    void *task_priv;
+
+    pthread_mutex_lock(&c->finished_task_mutex);
+    if (c->task_index == c->finished_task_index ||
+        (!flush && !c->finished_tasks[c->finished_task_index].finished &&
+         (c->task_index - c->finished_task_index) % BUFFER_SIZE <= c->thread_count)) {
+        pthread_mutex_unlock(&c->finished_task_mutex);
+        return NULL;
+    }
+    while (!c->finished_tasks[c->finished_task_index].finished) {
+        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
+    }
+    task = &c->finished_tasks[c->finished_task_index];
+    task->finished = 0;
+    *return_code = task->return_code;
+    task_priv = task->priv;
+
+    c->finished_task_index = (c->finished_task_index + 1) % BUFFER_SIZE;
+    pthread_mutex_unlock(&c->finished_task_mutex);
+    return task_priv;
+}
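
The early-return test at the top of framethread_wait_task() is easier to follow as a pure function. The sketch below restates that condition outside the patch; RING_SIZE mirrors BUFFER_SIZE, while in_flight and should_return_early are hypothetical helper names introduced only for illustration. It assumes, as in the patch, that both indices are unsigned and always stay in [0, RING_SIZE).

```c
#include <stdbool.h>

#define MAX_THREADS 64
#define RING_SIZE   (2 * MAX_THREADS)   /* mirrors BUFFER_SIZE in the patch */

/* Number of tasks submitted but not yet handed back to the caller.
 * RING_SIZE is a power of two, so it divides the range of unsigned and
 * the wraparound in the subtraction still yields the right distance. */
static unsigned in_flight(unsigned task_index, unsigned finished_index)
{
    return (task_index - finished_index) % RING_SIZE;
}

/* True when the caller should get "nothing yet" instead of blocking. */
static bool should_return_early(unsigned task_index, unsigned finished_index,
                                bool head_finished, bool flush,
                                unsigned thread_count)
{
    if (task_index == finished_index)
        return true;            /* nothing is pending at all */
    /* While not flushing, keep feeding the workers as long as the oldest
     * task is unfinished and no more than thread_count tasks are queued. */
    return !flush && !head_finished &&
           in_flight(task_index, finished_index) <= thread_count;
}
```

With 4 threads, for example, the caller is not blocked while up to 4 tasks are outstanding and the oldest one is still running; a fifth outstanding task, a finished head task, or a flush makes the call wait for or return the oldest result.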