@@ -381,10 +381,13 @@ void avcodec_flush_buffers(AVCodecContext *avctx)
avci->draining = 0;
avci->draining_done = 0;
- av_frame_unref(avci->buffer_frame);
- av_packet_unref(avci->buffer_pkt);
+ if (avci->buffer_frame)
+ av_frame_unref(avci->buffer_frame);
+ if (avci->buffer_pkt)
+ av_packet_unref(avci->buffer_pkt);
- if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME)
+ if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME &&
+ !avci->is_frame_mt)
ff_thread_flush(avctx);
else if (ffcodec(avctx->codec)->flush)
ffcodec(avctx->codec)->flush(avctx);
@@ -84,16 +84,23 @@ void ff_thread_free(struct AVCodecContext *s);
void ff_thread_flush(struct AVCodecContext *avctx);
/**
- * Submit a new frame to a decoding thread.
- * Returns the next available frame in picture. *got_picture_ptr
- * will be 0 if none is available.
- * The return value on success is the size of the consumed packet for
- * compatibility with FFCodec.decode. This means the decoder
- * has to consume the full packet.
+ * Submit available packets for decoding to worker threads, return a
+ * decoded frame if available. Returns AVERROR(EAGAIN) if none is available.
*
- * Parameters are the same as FFCodec.decode.
+ * Parameters are the same as FFCodec.receive_frame.
*/
-int ff_thread_decode_frame(struct AVCodecContext *avctx, struct AVFrame *frame,
- int *got_picture_ptr, struct AVPacket *avpkt);
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame);
+
+/**
+ * Do the actual decoding and obtain a decoded frame from the decoder, if
+ * available. When frame threading is used, this is invoked by the worker
+ * threads, otherwise by the top layer directly. frame must hold no buffers.
+ */
+int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame);
+
+/** Get a packet for decoding. This gets invoked by the worker threads.
+ * Returns 0 if a pending packet was moved into pkt, AVERROR_EOF if none is
+ * pending and the context is draining, AVERROR(EAGAIN) otherwise. */
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt);
#endif // AVCODEC_AVCODEC_INTERNAL_H
@@ -207,6 +207,11 @@ fail:
return ret;
}
+#if !HAVE_THREADS /* stubs for the threading entry points called from this file */
+#define ff_thread_get_packet(avctx, pkt) (AVERROR_BUG) /* arguments are not evaluated */
+#define ff_thread_receive_frame(avctx, frame) (AVERROR_BUG) /* NOTE(review): presumably never reached without threads - confirm */
+#endif /* !HAVE_THREADS */
+
static int decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
{
AVCodecInternal *avci = avctx->internal;
@@ -240,6 +245,13 @@ int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
if (avci->draining)
return AVERROR_EOF;
+ /* If we are a worker thread, get the next packet from the threading
+ * context. Otherwise we are the main (user-facing) context, so we get the
+ * next packet from the input filterchain.
+ */
+ if (avctx->internal->is_frame_mt)
+ return ff_thread_get_packet(avctx, pkt);
+
while (1) {
int ret = decode_get_packet(avctx, pkt);
if (ret == AVERROR(EAGAIN) &&
@@ -413,15 +425,11 @@ static inline int decode_simple_internal(AVCodecContext *avctx, AVFrame *frame,
return AVERROR_EOF;
if (!pkt->data &&
- !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY ||
- avctx->active_thread_type & FF_THREAD_FRAME))
+ !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY))
return AVERROR_EOF;
got_frame = 0;
- if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME) {
- consumed = ff_thread_decode_frame(avctx, frame, &got_frame, pkt);
- } else {
frame->pict_type = dc->initial_pict_type;
frame->flags |= dc->intra_only_flag;
consumed = codec->cb.decode(avctx, frame, &got_frame, pkt);
@@ -436,7 +444,6 @@ FF_DISABLE_DEPRECATION_WARNINGS
FF_ENABLE_DEPRECATION_WARNINGS
#endif
}
- }
emms_c();
if (avctx->codec->type == AVMEDIA_TYPE_VIDEO) {
@@ -603,12 +610,12 @@ static int decode_simple_receive_frame(AVCodecContext *avctx, AVFrame *frame)
return 0;
}
-static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
{
AVCodecInternal *avci = avctx->internal;
DecodeContext *dc = decode_ctx(avci);
const FFCodec *const codec = ffcodec(avctx->codec);
- int ret, ok;
+ int ret;
av_assert0(!frame->buf[0]);
@@ -636,6 +643,20 @@ static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
if (ret == AVERROR_EOF)
avci->draining_done = 1;
+ return ret;
+}
+
+static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+{
+ AVCodecInternal *avci = avctx->internal;
+ DecodeContext *dc = decode_ctx(avci);
+ int ret, ok;
+
+ if (avctx->active_thread_type & FF_THREAD_FRAME)
+ ret = ff_thread_receive_frame(avctx, frame);
+ else
+ ret = ff_decode_receive_frame_internal(avctx, frame);
+
/* preserve ret */
ok = detect_colorspace(avctx, frame);
if (ok < 0) {
@@ -2151,7 +2172,8 @@ void ff_decode_flush_buffers(AVCodecContext *avctx)
dc->pts_correction_last_pts =
dc->pts_correction_last_dts = INT64_MIN;
- av_bsf_flush(avci->bsf);
+ if (avci->bsf)
+ av_bsf_flush(avci->bsf);
dc->nb_draining_errors = 0;
dc->draining_started = 0;
@@ -53,6 +53,13 @@ typedef struct AVCodecInternal {
*/
int is_copy;
+ /**
+ * This field is set to 1 when frame threading is being used and the parent
+ * AVCodecContext of this AVCodecInternal is a worker-thread context (i.e.
+ * one of those actually doing the decoding), 0 otherwise.
+ */
+ int is_frame_mt;
+
/**
* Audio encoders can set this flag during init to indicate that they
* want the small last frame to be padded to a multiple of pad_samples.
@@ -32,6 +32,7 @@
#include "hwaccel_internal.h"
#include "hwconfig.h"
#include "internal.h"
+#include "packet_internal.h"
#include "pthread_internal.h"
#include "refstruct.h"
#include "thread.h"
@@ -64,6 +65,12 @@ enum {
INITIALIZED, ///< Thread has been properly set up
};
+typedef struct DecodedFrames { // growable list of AVFrames used to buffer decoder output
+ AVFrame **f; ///< Array of frame pointers; entries [0, nb_f) hold decoded output.
+ size_t nb_f; ///< Number of leading entries of f that currently hold output.
+ size_t nb_f_allocated; ///< Number of AVFrames allocated in f, including spare ones past nb_f.
+} DecodedFrames;
+
typedef struct ThreadFrameProgress {
atomic_int progress[2];
} ThreadFrameProgress;
@@ -88,8 +95,10 @@ typedef struct PerThreadContext {
AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding).
- AVFrame *frame; ///< Output frame (for decoding) or input (for encoding).
- int got_frame; ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
+ /**
+ * Decoded frames from a single decode iteration.
+ */
+ DecodedFrames df;
int result; ///< The result of the last codec decode/encode() call.
atomic_int state;
@@ -130,14 +139,17 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond;
int async_lock;
+ DecodedFrames df;
+ int result;
+
+ /**
+ * Packet to be submitted to the next thread for decoding.
+ */
+ AVPacket *next_pkt;
+
int next_decoding; ///< The next context to submit a packet to.
int next_finished; ///< The next context to return output from.
- int delaying; /**<
- * Set for the first N packets, where N is the number of threads.
- * While it is set, ff_thread_en/decode_frame won't return any results.
- */
-
/* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
* order to transfer its ownership to the next decoding thread without the
* need for extra synchronization */
@@ -180,6 +192,52 @@ static void thread_set_name(PerThreadContext *p)
ff_thread_setname(name);
}
+// Get a free frame to decode into: returns df->f[df->nb_f], growing the array by one AVFrame if needed, or NULL on allocation failure. nb_f is not changed here.
+static AVFrame *decoded_frames_get_free(DecodedFrames *df)
+{
+ if (df->nb_f == df->nb_f_allocated) { // no spare frame past the used entries
+ AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1,
+ sizeof(*df->f));
+ if (!tmp)
+ return NULL; // df->f is left as it was
+ df->f = tmp;
+
+ df->f[df->nb_f] = av_frame_alloc();
+ if (!df->f[df->nb_f])
+ return NULL; // array was grown, but nb_f_allocated is not incremented
+
+ df->nb_f_allocated++; // counted only once the new slot holds an AVFrame
+ }
+
+ av_assert0(!df->f[df->nb_f]->buf[0]); // the spare frame must not hold a buffer
+
+ return df->f[df->nb_f];
+}
+
+static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst) // move the first stored frame into dst and remove it from the list
+{
+ AVFrame *tmp_frame = df->f[0]; // read unconditionally and nb_f - 1 is used below: df->nb_f must be > 0
+ av_frame_move_ref(dst, tmp_frame);
+ memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f)); // shift the remaining pointers down by one
+ df->f[--df->nb_f] = tmp_frame; // keep the AVFrame itself in the slot just past the used entries
+}
+
+static void decoded_frames_flush(DecodedFrames *df) // drop all stored output; the array and its AVFrames stay allocated
+{
+ for (size_t i = 0; i < df->nb_f; i++) // only the entries holding output are visited
+ av_frame_unref(df->f[i]);
+ df->nb_f = 0;
+}
+
+static void decoded_frames_free(DecodedFrames *df) // free every allocated AVFrame and the array, leaving df empty
+{
+ for (size_t i = 0; i < df->nb_f_allocated; i++) // covers spare frames past nb_f as well
+ av_frame_free(&df->f[i]);
+ av_freep(&df->f);
+ df->nb_f = 0;
+ df->nb_f_allocated = 0; // both counters reset so they match the freed array
+}
+
/**
* Codec worker thread.
*
@@ -197,6 +255,8 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
pthread_mutex_lock(&p->mutex);
while (1) {
+ int ret;
+
while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
pthread_cond_wait(&p->input_cond, &p->mutex);
@@ -220,18 +280,31 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
p->hwaccel_serializing = 1;
}
- av_frame_unref(p->frame);
- p->got_frame = 0;
- p->frame->pict_type = p->initial_pict_type;
- p->frame->flags |= p->intra_only_flag;
- p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+ ret = 0;
+ while (ret >= 0) {
+ AVFrame *frame;
- if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
- av_frame_unref(p->frame);
+ /* get the frame which will store the output */
+ frame = decoded_frames_get_free(&p->df);
+ if (!frame) {
+ p->result = AVERROR(ENOMEM);
+ goto alloc_fail;
+ }
+
+ /* do the actual decoding */
+ ret = ff_decode_receive_frame_internal(avctx, frame);
+ if (ret == 0)
+ p->df.nb_f++;
+ else if (ret < 0 && frame->buf[0])
+ av_frame_unref(frame);
+
+ p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
+alloc_fail:
if (p->hwaccel_serializing) {
/* wipe hwaccel state for thread-unsafe hwaccels to avoid stale
* pointers lying around;
@@ -426,18 +499,21 @@ static int update_context_from_user(AVCodecContext *dst, const AVCodecContext *s
}
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
- AVPacket *avpkt)
+ AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
PerThreadContext *prev_thread = fctx->prev_thread;
const AVCodec *codec = p->avctx->codec;
int ret;
- if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
- return 0;
-
pthread_mutex_lock(&p->mutex);
+ av_packet_unref(p->avpkt);
+ av_packet_move_ref(p->avpkt, in_pkt);
+
+ if (AVPACKET_IS_EMPTY(p->avpkt))
+ p->avctx->internal->draining = 1;
+
ret = update_context_from_user(p->avctx, user_avctx);
if (ret) {
pthread_mutex_unlock(&p->mutex);
@@ -448,7 +524,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
memory_order_relaxed);
if (prev_thread) {
- int err;
if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex);
while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
@@ -456,10 +531,16 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
pthread_mutex_unlock(&prev_thread->progress_mutex);
}
- err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
- if (err) {
- pthread_mutex_unlock(&p->mutex);
- return err;
+ /* codecs without delay might not be prepared to be called repeatedly here during
+ * flushing (vp3/theora), and also don't need to be, since from this point on, they
+ * will always return EOF anyway */
+ if (!p->avctx->internal->draining ||
+ (codec->capabilities & AV_CODEC_CAP_DELAY)) {
+ ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
+ if (ret) {
+ pthread_mutex_unlock(&p->mutex);
+ return ret;
+ }
}
}
@@ -471,70 +552,47 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
}
- av_packet_unref(p->avpkt);
- ret = av_packet_ref(p->avpkt, avpkt);
- if (ret < 0) {
- pthread_mutex_unlock(&p->mutex);
- av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
- return ret;
- }
-
atomic_store(&p->state, STATE_SETTING_UP);
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
fctx->prev_thread = p;
- fctx->next_decoding++;
+ fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
return 0;
}
-int ff_thread_decode_frame(AVCodecContext *avctx,
- AVFrame *picture, int *got_picture_ptr,
- AVPacket *avpkt)
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
- int finished = fctx->next_finished;
- PerThreadContext *p;
- int err;
+ int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
- /*
- * Submit a packet to the next decoding thread.
- */
+ /* submit packets to threads while there are no buffered results to return */
+ while (!fctx->df.nb_f && !fctx->result) {
+ PerThreadContext *p;
- p = &fctx->threads[fctx->next_decoding];
- err = submit_packet(p, avctx, avpkt);
- if (err)
- goto finish;
-
- /*
- * If we're still receiving the initial packets, don't return a frame.
- */
-
- if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
- fctx->delaying = 0;
-
- if (fctx->delaying) {
- *got_picture_ptr=0;
- if (avpkt->size) {
- err = avpkt->size;
+ /* get a packet to be submitted to the next thread */
+ av_packet_unref(fctx->next_pkt);
+ ret = ff_decode_get_packet(avctx, fctx->next_pkt);
+ if (ret < 0 && ret != AVERROR_EOF)
goto finish;
- }
- }
- /*
- * Return the next available frame from the oldest thread.
- * If we're at the end of the stream, then we have to skip threads that
- * didn't output a frame/error, because we don't want to accidentally signal
- * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
- */
+ ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+ fctx->next_pkt);
+ if (ret < 0)
+ goto finish;
- do {
- p = &fctx->threads[finished++];
+ /* do not return any frames until all threads have something to do */
+ if (fctx->next_decoding != fctx->next_finished &&
+ !avctx->internal->draining)
+ continue;
+
+ p = &fctx->threads[fctx->next_finished];
+ fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
@@ -543,35 +601,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
pthread_mutex_unlock(&p->progress_mutex);
}
- av_frame_move_ref(picture, p->frame);
- *got_picture_ptr = p->got_frame;
- picture->pkt_dts = p->avpkt->dts;
- err = p->result;
+ update_context_from_thread(avctx, p->avctx, 1);
+ fctx->result = p->result;
+ p->result = 0;
+ if (p->df.nb_f)
+ FFSWAP(DecodedFrames, fctx->df, p->df);
+ }
- /*
- * A later call with avkpt->size == 0 may loop over all threads,
- * including this one, searching for a frame/error to return before being
- * stopped by the "finished != fctx->next_finished" condition.
- * Make sure we don't mistakenly return the same frame/error again.
- */
- p->got_frame = 0;
- p->result = 0;
+ /* a thread may return multiple frames AND an error;
+ * we first return all the frames, then the error */
+ if (fctx->df.nb_f) {
+ decoded_frames_pop(&fctx->df, frame);
+ ret = 0;
+ } else {
+ ret = fctx->result;
+ fctx->result = 0;
+ }
- if (finished >= avctx->thread_count) finished = 0;
- } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished);
-
- update_context_from_thread(avctx, p->avctx, 1);
-
- if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
-
- fctx->next_finished = finished;
-
- /* return the size of the consumed packet if no error occurred */
- if (err >= 0)
- err = avpkt->size;
finish:
async_lock(fctx);
- return err;
+ return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
@@ -679,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
- p->got_frame = 0;
}
async_lock(fctx);
@@ -732,6 +780,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
}
ff_refstruct_unref(&ctx->internal->pool);
+ av_packet_free(&ctx->internal->in_pkt);
av_packet_free(&ctx->internal->last_pkt_props);
av_freep(&ctx->internal);
av_buffer_unref(&ctx->hw_frames_ctx);
@@ -739,7 +788,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
&ctx->nb_decoded_side_data);
}
- av_frame_free(&p->frame);
+ decoded_frames_free(&p->df);
ff_pthread_free(p, per_thread_offsets);
av_packet_free(&p->avpkt);
@@ -747,6 +796,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&p->avctx);
}
+ decoded_frames_free(&fctx->df);
+ av_packet_free(&fctx->next_pkt);
+
av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
@@ -815,13 +867,17 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
if (err < 0)
return err;
- if (!(p->frame = av_frame_alloc()) ||
- !(p->avpkt = av_packet_alloc()))
+ if (!(p->avpkt = av_packet_alloc()))
return AVERROR(ENOMEM);
+ copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
+ copy->internal->in_pkt = av_packet_alloc();
+ if (!copy->internal->in_pkt)
+ return AVERROR(ENOMEM);
+
copy->internal->last_pkt_props = av_packet_alloc();
if (!copy->internal->last_pkt_props)
return AVERROR(ENOMEM);
@@ -891,8 +947,11 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return err;
}
+ fctx->next_pkt = av_packet_alloc();
+ if (!fctx->next_pkt)
+ return AVERROR(ENOMEM);
+
fctx->async_lock = 1;
- fctx->delaying = 1;
if (codec->p.type == AVMEDIA_TYPE_VIDEO)
avctx->delay = avctx->thread_count - 1;
@@ -933,17 +992,18 @@ void ff_thread_flush(AVCodecContext *avctx)
}
fctx->next_decoding = fctx->next_finished = 0;
- fctx->delaying = 1;
fctx->prev_thread = NULL;
+
+ decoded_frames_flush(&fctx->df);
+ fctx->result = 0;
+
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
- // Make sure decode flush calls with size=0 won't return old frames
- p->got_frame = 0;
- av_frame_unref(p->frame);
+
+ decoded_frames_flush(&p->df);
p->result = 0;
- if (ffcodec(avctx->codec)->flush)
- ffcodec(avctx->codec)->flush(p->avctx);
+ avcodec_flush_buffers(p->avctx);
}
}
@@ -1039,3 +1099,15 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
return FF_THREAD_IS_COPY;
}
+
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt) // hand the packet pending in this context's PerThreadContext to pkt
+{
+ PerThreadContext *p = avctx->internal->thread_ctx; // thread_ctx is read as a PerThreadContext here
+
+ if (!AVPACKET_IS_EMPTY(p->avpkt)) { // a packet is pending in p->avpkt
+ av_packet_move_ref(pkt, p->avpkt);
+ return 0;
+ }
+
+ return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN); // nothing pending: EOF only if draining
+}