@@ -135,6 +135,7 @@ typedef struct FrameThreadContext {
* Decoder contexts that get assigned to frame threads.
*/
ChildDecoder *decoders;
+ unsigned nb_decoders;
PerThreadContext *threads; ///< The contexts for each thread.
PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
@@ -150,6 +151,8 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond;
int async_lock;
+ const FFCodec *codec;
+
DecodedFrames df;
int result;
@@ -158,8 +161,9 @@ typedef struct FrameThreadContext {
*/
AVPacket *next_pkt;
- int next_decoding; ///< The next context to submit a packet to.
- int next_finished; ///< The next context to return output from.
+ int next_thread_submit;
+ int next_decoder;
+ int next_finished; ///< The next thread to return output from.
/* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
* order to transfer its ownership to the next decoding thread without the
@@ -194,11 +198,11 @@ static void async_unlock(FrameThreadContext *fctx)
static void thread_set_name(PerThreadContext *p)
{
- AVCodecContext *avctx = p->avctx;
- int idx = p - p->parent->threads;
+ const FrameThreadContext *fctx = p->parent;
+ int idx = p - fctx->threads;
char name[16];
- snprintf(name, sizeof(name), "av:%.7s:df%d", avctx->codec->name, idx);
+ snprintf(name, sizeof(name), "av:%.7s:df%d", fctx->codec->p.name, idx);
ff_thread_setname(name);
}
@@ -259,13 +263,13 @@ static void decoded_frames_free(DecodedFrames *df)
static attribute_align_arg void *frame_worker_thread(void *arg)
{
PerThreadContext *p = arg;
- AVCodecContext *avctx = p->avctx;
- const FFCodec *codec = ffcodec(avctx->codec);
+ const FFCodec *codec = p->parent->codec;
thread_set_name(p);
pthread_mutex_lock(&p->mutex);
while (1) {
+ AVCodecContext *avctx;
int ret;
while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
@@ -273,6 +277,8 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
if (p->die) break;
+ // a decoder context was assigned to us by the main thread
+ avctx = p->avctx;
if (!codec->update_thread_context) {
ret = ff_thread_finish_setup(avctx);
if (ret < 0) {
@@ -524,16 +530,30 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
- ChildDecoder *cd = p->avctx->internal->thread_ctx;
PerThreadContext *prev_thread = fctx->prev_thread;
- const AVCodec *codec = p->avctx->codec;
+ ChildDecoder *cd;
int ret;
+ if (prev_thread) {
+ if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
+ pthread_mutex_lock(&prev_thread->progress_mutex);
+ while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
+ pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
+ pthread_mutex_unlock(&prev_thread->progress_mutex);
+ }
+
+ }
+ cd = &fctx->decoders[fctx->next_decoder];
+ fctx->next_decoder = (fctx->next_decoder + 1) % (user_avctx->thread_count + 1);
+
pthread_mutex_lock(&p->mutex);
av_packet_unref(p->avpkt);
av_packet_move_ref(p->avpkt, in_pkt);
+ p->avctx = cd->ctx;
+ cd->thread = p;
+
if (AVPACKET_IS_EMPTY(p->avpkt))
p->avctx->internal->draining = 1;
@@ -546,27 +566,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
- if (prev_thread) {
- if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
- pthread_mutex_lock(&prev_thread->progress_mutex);
- while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
- pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
- pthread_mutex_unlock(&prev_thread->progress_mutex);
- }
-
- /* codecs without delay might not be prepared to be called repeatedly here during
- * flushing (vp3/theora), and also don't need to be, since from this point on, they
- * will always return EOF anyway */
- if (!p->avctx->internal->draining ||
- (codec->capabilities & AV_CODEC_CAP_DELAY)) {
- ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
- if (ret) {
- pthread_mutex_unlock(&p->mutex);
- return ret;
- }
- }
- }
-
/* transfer the stashed hwaccel state, if any */
av_assert0(!p->avctx->hwaccel || cd->hwaccel_threadsafe);
if (!cd->hwaccel_threadsafe) {
@@ -580,7 +579,7 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
pthread_mutex_unlock(&p->mutex);
fctx->prev_thread = p;
- fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
+ fctx->next_thread_submit = (fctx->next_thread_submit + 1) % user_avctx->thread_count;
return 0;
}
@@ -596,6 +595,7 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
/* submit packets to threads while there are no buffered results to return */
while (!fctx->df.nb_f && !fctx->result) {
+ ChildDecoder *cd;
PerThreadContext *p;
/* get a packet to be submitted to the next thread */
@@ -604,17 +604,18 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
if (ret < 0 && ret != AVERROR_EOF)
goto finish;
- ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+ ret = submit_packet(&fctx->threads[fctx->next_thread_submit], avctx,
fctx->next_pkt);
if (ret < 0)
goto finish;
/* do not return any frames until all threads have something to do */
- if (fctx->next_decoding != fctx->next_finished &&
+ if (fctx->next_thread_submit != fctx->next_finished &&
!avctx->internal->draining)
continue;
p = &fctx->threads[fctx->next_finished];
+ cd = p->avctx->internal->thread_ctx;
fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
@@ -629,6 +630,9 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
p->result = 0;
if (p->df.nb_f)
FFSWAP(DecodedFrames, fctx->df, p->df);
+
+ p->avctx = NULL;
+ cd->thread = NULL;
}
/* a thread may return multiple frames AND an error
@@ -696,18 +700,30 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
int ff_thread_finish_setup(AVCodecContext *avctx)
{
+ FrameThreadContext *fctx;
ChildDecoder *cd;
PerThreadContext *p;
+ int ret = 0, err;
if (!(avctx->active_thread_type & FF_THREAD_FRAME))
return 0;
cd = avctx->internal->thread_ctx;
p = cd->thread;
+ fctx = cd->parent;
cd->hwaccel_threadsafe = avctx->hwaccel &&
(ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
+ // transfer decoder state to an idle context that will then be submitted to
+ // the next worker thread
+ if (!avctx->internal->draining ||
+ (avctx->codec->capabilities & AV_CODEC_CAP_DELAY)) {
+ err = update_context_from_thread(fctx->decoders[fctx->next_decoder].ctx, avctx, 0);
+ if (err < 0)
+ ret = err;
+ }
+
if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
p->hwaccel_serializing = 1;
@@ -742,7 +758,7 @@ int ff_thread_finish_setup(AVCodecContext *avctx)
pthread_cond_broadcast(&p->progress_cond);
pthread_mutex_unlock(&p->progress_mutex);
- return 0;
+ return ret;
}
/// Waits for all threads to finish.
@@ -806,10 +822,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&fctx->threads);
// clean up child decoders
- for (int i = 0; i < thread_count && fctx->decoders; i++) {
+ for (int i = 0; i < fctx->nb_decoders && fctx->decoders; i++) {
ChildDecoder *cd = &fctx->decoders[i];
AVCodecContext *ctx = cd->ctx;
+ if (!ctx)
+ continue;
+
if (ctx->internal) {
if (codec->close && cd->needs_close)
codec->close(ctx);
@@ -853,8 +872,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&avctx->internal->thread_ctx);
}
-static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
- AVCodecContext *avctx,
+static av_cold int dec_ctx_init(ChildDecoder *cd, AVCodecContext *avctx,
const FFCodec *codec, int first)
{
AVCodecContext *copy;
@@ -867,10 +885,6 @@ static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
copy->decoded_side_data = NULL;
copy->nb_decoded_side_data = 0;
- /* From now on, this ChildDecoder will be cleaned up by
- * ff_frame_thread_free in case of errors. */
- (*decoders_to_free)++;
-
cd->ctx = copy;
copy->internal = ff_decode_internal_alloc();
@@ -934,7 +948,7 @@ static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
}
static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
- AVCodecContext *avctx, AVCodecContext *child)
+ AVCodecContext *avctx)
{
int ret = 0;
@@ -948,7 +962,6 @@ static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
atomic_init(&p->state, STATE_INPUT_READY);
p->parent = fctx;
- p->avctx = child;
ret = ff_pthread_init(p, per_thread_offsets);
if (ret < 0)
@@ -958,7 +971,7 @@ static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
if (!p->avpkt)
return AVERROR(ENOMEM);
- atomic_init(&p->debug_threads, (child->debug & FF_DEBUG_THREADS) != 0);
+ atomic_init(&p->debug_threads, (avctx->debug & FF_DEBUG_THREADS) != 0);
ret = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
if (ret < 0)
@@ -973,7 +986,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
int thread_count = avctx->thread_count;
const FFCodec *codec = ffcodec(avctx->codec);
FrameThreadContext *fctx;
- int err, decoders_to_free = 0;
+ int err;
if (!thread_count) {
int nb_cpus = av_cpu_count();
@@ -1004,24 +1017,28 @@ int ff_frame_thread_init(AVCodecContext *avctx)
if (!fctx->next_pkt)
return AVERROR(ENOMEM);
+ fctx->codec = codec;
+
fctx->async_lock = 1;
if (codec->p.type == AVMEDIA_TYPE_VIDEO)
avctx->delay = avctx->thread_count - 1;
- fctx->decoders = av_calloc(thread_count, sizeof(*fctx->decoders));
+ fctx->nb_decoders = thread_count + 1;
+ fctx->decoders = av_calloc(fctx->nb_decoders, sizeof(*fctx->decoders));
if (!fctx->decoders) {
+ fctx->nb_decoders = 0;
err = AVERROR(ENOMEM);
goto error;
}
- for (; decoders_to_free < thread_count; ) {
- ChildDecoder *cd = &fctx->decoders[decoders_to_free];
- int first = !decoders_to_free;
+ for (int i = 0; i < fctx->nb_decoders; i++) {
+ ChildDecoder *cd = &fctx->decoders[i];
+ int first = !i;
cd->parent = fctx;
- err = dec_ctx_init(cd, &decoders_to_free, avctx, codec, first);
+ err = dec_ctx_init(cd, avctx, codec, first);
if (err < 0)
goto error;
}
@@ -1035,9 +1052,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
for (int i = 0; i < thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
- fctx->decoders[i].thread = p;
-
- err = init_thread(p, fctx, avctx, fctx->decoders[i].ctx);
+ err = init_thread(p, fctx, avctx);
if (err < 0)
goto error;
}
@@ -1045,7 +1060,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return 0;
error:
- ff_frame_thread_free(avctx, decoders_to_free);
+ ff_frame_thread_free(avctx, thread_count);
return err;
}
@@ -1056,12 +1071,12 @@ void ff_thread_flush(AVCodecContext *avctx)
if (!fctx) return;
park_frame_worker_threads(fctx, avctx->thread_count);
- if (fctx->prev_thread) {
- if (fctx->prev_thread != &fctx->threads[0])
- update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0);
- }
+ if (fctx->prev_thread &&
+ fctx->prev_thread->avctx != fctx->decoders[0].ctx)
+ update_context_from_thread(fctx->decoders[0].ctx, fctx->prev_thread->avctx, 0);
- fctx->next_decoding = fctx->next_finished = 0;
+ fctx->next_thread_submit = fctx->next_finished = 0;
+ fctx->next_decoder = 0;
fctx->prev_thread = NULL;
decoded_frames_flush(&fctx->df);
@@ -1072,10 +1087,15 @@ void ff_thread_flush(AVCodecContext *avctx)
decoded_frames_flush(&p->df);
p->result = 0;
+ p->avctx = NULL;
}
- for (int i = 0; i < avctx->thread_count; i++)
- avcodec_flush_buffers(fctx->decoders[i].ctx);
+ for (int i = 0; i < fctx->nb_decoders; i++) {
+ ChildDecoder *cd = &fctx->decoders[i];
+
+ cd->thread = NULL;
+ avcodec_flush_buffers(cd->ctx);
+ }
}
int ff_thread_can_start_frame(AVCodecContext *avctx)