@@ -728,6 +728,46 @@ cleanup:
return ret;
}
+static void subtitle_free(void *opaque, uint8_t *data)
+{
+ AVSubtitle *sub = (AVSubtitle*)data;
+ avsubtitle_free(sub);
+ av_free(sub);
+}
+
+int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
+{
+ AVBufferRef *buf;
+ AVSubtitle *sub;
+ int ret;
+
+ if (copy) {
+ sub = av_mallocz(sizeof(*sub));
+ ret = sub ? copy_av_subtitle(sub, subtitle) : AVERROR(ENOMEM);
+ if (ret < 0) {
+ av_freep(&sub);
+ return ret;
+ }
+ } else {
+ sub = av_memdup(subtitle, sizeof(*subtitle));
+ if (!sub)
+ return AVERROR(ENOMEM);
+ memset(subtitle, 0, sizeof(*subtitle));
+ }
+
+ buf = av_buffer_create((uint8_t*)sub, sizeof(*sub),
+ subtitle_free, NULL, 0);
+ if (!buf) {
+ avsubtitle_free(sub);
+ av_freep(&sub);
+ return AVERROR(ENOMEM);
+ }
+
+ frame->buf[0] = buf;
+
+ return 0;
+}
+
static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
int ret = AVERROR_BUG;
@@ -1038,30 +1078,11 @@ static void decode_flush(InputFile *ifile)
{
for (int i = 0; i < ifile->nb_streams; i++) {
InputStream *ist = ifile->streams[i];
- int ret;
- if (ist->discard)
+ if (ist->discard || !ist->decoding_needed)
continue;
- do {
- ret = process_input_packet(ist, NULL, 1);
- } while (ret > 0);
-
- if (ist->decoding_needed) {
- /* report last frame duration to the demuxer thread */
- if (ist->par->codec_type == AVMEDIA_TYPE_AUDIO) {
- LastFrameDuration dur;
-
- dur.stream_idx = i;
- dur.duration = av_rescale_q(ist->nb_samples,
- (AVRational){ 1, ist->dec_ctx->sample_rate},
- ist->st->time_base);
-
- av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
- }
-
- avcodec_flush_buffers(ist->dec_ctx);
- }
+ dec_packet(ist, NULL, 1);
}
}
@@ -730,6 +730,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
int init_complex_filtergraph(FilterGraph *fg);
int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
+int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
/**
* Get our axiliary frame data attached to the frame, allocating it
@@ -941,4 +942,14 @@ extern const char * const opt_name_codec_tags[];
extern const char * const opt_name_frame_rates[];
extern const char * const opt_name_top_field_first[];
+static inline void pkt_move(void *dst, void *src)
+{
+ av_packet_move_ref(dst, src);
+}
+
+static inline void frame_move(void *dst, void *src)
+{
+ av_frame_move_ref(dst, src);
+}
+
#endif /* FFTOOLS_FFMPEG_H */
@@ -30,6 +30,7 @@
#include "libavfilter/buffersrc.h"
#include "ffmpeg.h"
+#include "thread_queue.h"
struct Decoder {
AVFrame *frame;
@@ -45,8 +46,50 @@ struct Decoder {
AVRational last_frame_tb;
int64_t last_filter_in_rescale_delta;
int last_frame_sample_rate;
+
+ pthread_t thread;
+ /**
+ * Queue for sending coded packets from the main thread to
+ * the decoder thread.
+ *
+ * An empty packet is sent to flush the decoder without terminating
+ * decoding.
+ */
+ ThreadQueue *queue_in;
+ /**
+ * Queue for sending decoded frames from the decoder thread
+ * to the main thread.
+ *
+ * An empty frame is sent to signal that a single packet has been fully
+ * processed.
+ */
+ ThreadQueue *queue_out;
};
+// data that is local to the decoder thread and not visible outside of it
+typedef struct DecThreadContext {
+ AVFrame *frame;
+ AVPacket *pkt;
+} DecThreadContext;
+
+static int dec_thread_stop(Decoder *d)
+{
+ void *ret;
+
+ if (!d->queue_in)
+ return 0;
+
+ tq_send_finish(d->queue_in, 0);
+ tq_receive_finish(d->queue_out, 0);
+
+ pthread_join(d->thread, &ret);
+
+ tq_free(&d->queue_in);
+ tq_free(&d->queue_out);
+
+ return (intptr_t)ret;
+}
+
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -54,6 +97,8 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
+ dec_thread_stop(dec);
+
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -383,8 +428,10 @@ out:
return ret;
}
-static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
+static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
+ AVFrame *frame)
{
+ Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
@@ -403,20 +450,30 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error decoding subtitles: %s\n",
av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
ist->decode_errors++;
+ return exit_on_error ? ret : 0;
}
- if (ret < 0 || !got_output) {
- if (!pkt)
- sub2video_flush(ist);
- return ret < 0 ? ret : AVERROR_EOF;
- }
+ if (!got_output)
+ return pkt ? 0 : AVERROR_EOF;
ist->frames_decoded++;
- return process_subtitle(ist, &subtitle);
+ // XXX the queue for transferring data back to the main thread runs
+ // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
+ // inside the frame
+ // eventually, subtitles should be switched to use AVFrames natively
+ ret = subtitle_wrap_frame(frame, &subtitle, 0);
+ if (ret < 0) {
+ avsubtitle_free(&subtitle);
+ return ret;
+ }
+
+ ret = tq_send(d->queue_out, 0, frame);
+ if (ret < 0)
+ av_frame_unref(frame);
+
+ return ret;
}
static int send_filter_eof(InputStream *ist)
@@ -434,7 +491,7 @@ static int send_filter_eof(InputStream *ist)
return 0;
}
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
+static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
{
Decoder *d = ist->decoder;
AVCodecContext *dec = ist->dec_ctx;
@@ -442,7 +499,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
int ret;
if (dec->codec_type == AVMEDIA_TYPE_SUBTITLE)
- return transcode_subtitles(ist, pkt);
+ return transcode_subtitles(ist, pkt, frame);
// With fate-indeo3-2, we're getting 0-sized packets before EOF for some
// reason. This seems like a semi-critical bug. Don't trigger EOF, and
@@ -457,23 +514,25 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret == AVERROR(EAGAIN)) {
av_log(ist, AV_LOG_FATAL, "A decoder returned an unexpected error code. "
"This is a bug, please report it.\n");
- exit_program(1);
+ return AVERROR_BUG;
}
av_log(ist, AV_LOG_ERROR, "Error submitting %s to decoder: %s\n",
pkt ? "packet" : "EOF", av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
- if (ret != AVERROR_EOF)
+ if (ret != AVERROR_EOF) {
ist->decode_errors++;
+ if (!exit_on_error)
+ ret = 0;
+ }
return ret;
}
while (1) {
- AVFrame *frame = d->frame;
FrameData *fd;
+ av_frame_unref(frame);
+
update_benchmark(NULL);
ret = avcodec_receive_frame(dec, frame);
update_benchmark("decode_%s %d.%d", type_desc,
@@ -483,30 +542,22 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
av_assert0(pkt); // should never happen during flushing
return 0;
} else if (ret == AVERROR_EOF) {
- /* after flushing, send an EOF on all the filter inputs attached to the stream */
- /* except when looping we need to flush but not to send an EOF */
- if (!no_eof) {
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- exit_program(1);
- }
- }
-
- return AVERROR_EOF;
+ return ret;
} else if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Decoding error: %s\n", av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
ist->decode_errors++;
- return ret;
+
+ if (exit_on_error)
+ return ret;
+
+ continue;
}
if (frame->decode_error_flags || (frame->flags & AV_FRAME_FLAG_CORRUPT)) {
av_log(ist, exit_on_error ? AV_LOG_FATAL : AV_LOG_WARNING,
"corrupt decoded frame\n");
if (exit_on_error)
- exit_program(1);
+ return AVERROR_INVALIDDATA;
}
@@ -514,7 +565,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
fd = frame_data(frame);
if (!fd) {
av_frame_unref(frame);
- report_and_exit(AVERROR(ENOMEM));
+ return AVERROR(ENOMEM);
}
fd->pts = frame->pts;
fd->tb = dec->pkt_timebase;
@@ -533,19 +584,254 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error while processing the decoded "
"data for stream #%d:%d\n", ist->file_index, ist->index);
- exit_program(1);
+ return ret;
}
}
ist->frames_decoded++;
- ret = send_frame_to_filters(ist, frame);
- av_frame_unref(frame);
+ ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
- exit_program(1);
+ return ret;
}
}
+static void dec_thread_set_name(const InputStream *ist)
+{
+ char name[16];
+ snprintf(name, sizeof(name), "dec%d:%d:%s", ist->file_index, ist->index,
+ ist->dec_ctx->codec->name);
+ ff_thread_setname(name);
+}
+
+static void dec_thread_uninit(DecThreadContext *dt)
+{
+ av_packet_free(&dt->pkt);
+ av_frame_free(&dt->frame);
+
+ memset(dt, 0, sizeof(*dt));
+}
+
+static int dec_thread_init(DecThreadContext *dt)
+{
+ memset(dt, 0, sizeof(*dt));
+
+ dt->frame = av_frame_alloc();
+ if (!dt->frame)
+ goto fail;
+
+ dt->pkt = av_packet_alloc();
+ if (!dt->pkt)
+ goto fail;
+
+ return 0;
+
+fail:
+ dec_thread_uninit(dt);
+ return AVERROR(ENOMEM);
+}
+
+static void *decoder_thread(void *arg)
+{
+ InputStream *ist = arg;
+ InputFile *ifile = input_files[ist->file_index];
+ Decoder *d = ist->decoder;
+ DecThreadContext dt;
+ int ret = 0, input_status = 0;
+
+ ret = dec_thread_init(&dt);
+ if (ret < 0)
+ goto finish;
+
+ dec_thread_set_name(ist);
+
+ while (!input_status) {
+ int dummy, flush_buffers;
+
+ input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
+ flush_buffers = input_status >= 0 && !dt.pkt->buf;
+ if (!dt.pkt->buf)
+ av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
+ flush_buffers ? "flush" : "EOF");
+
+ ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+
+ av_packet_unref(dt.pkt);
+ av_frame_unref(dt.frame);
+
+ if (ret == AVERROR_EOF) {
+ av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
+ flush_buffers ? "resetting" : "finishing");
+
+ if (!flush_buffers)
+ break;
+
+ /* report last frame duration to the demuxer thread */
+ if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
+ LastFrameDuration dur;
+
+ dur.stream_idx = ist->index;
+ dur.duration = av_rescale_q(ist->nb_samples,
+ (AVRational){ 1, ist->dec_ctx->sample_rate},
+ ist->st->time_base);
+
+ av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
+ }
+
+ avcodec_flush_buffers(ist->dec_ctx);
+ } else if (ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Error processing packet in decoder: %s\n",
+ av_err2str(ret));
+ break;
+ }
+
+ // signal to the consumer thread that the entire packet was processed
+ ret = tq_send(d->queue_out, 0, dt.frame);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ break;
+ }
+ }
+
+ // EOF is normal thread termination
+ if (ret == AVERROR_EOF)
+ ret = 0;
+
+finish:
+ tq_receive_finish(d->queue_in, 0);
+ tq_send_finish (d->queue_out, 0);
+
+ // make sure the demuxer does not get stuck waiting for audio durations
+ // that will never arrive
+ if (ifile->audio_duration_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
+ av_thread_message_queue_set_err_recv(ifile->audio_duration_queue, AVERROR_EOF);
+
+ dec_thread_uninit(&dt);
+
+ av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
+
+ return (void*)(intptr_t)ret;
+}
+
+static int dec_thread_start(InputStream *ist)
+{
+ Decoder *d = ist->decoder;
+ ObjPool *op;
+ int ret = 0;
+
+ op = objpool_alloc_packets();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ d->queue_in = tq_alloc(1, 1, op, pkt_move);
+ if (!d->queue_in) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ op = objpool_alloc_frames();
+ if (!op)
+ goto fail;
+
+ d->queue_out = tq_alloc(1, 4, op, frame_move);
+ if (!d->queue_out) {
+ objpool_free(&op);
+ goto fail;
+ }
+
+ ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
+ if (ret) {
+ ret = AVERROR(ret);
+ av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
+ av_err2str(ret));
+ goto fail;
+ }
+
+ return 0;
+fail:
+ if (ret >= 0)
+ ret = AVERROR(ENOMEM);
+
+ tq_free(&d->queue_in);
+ tq_free(&d->queue_out);
+ return ret;
+}
+
+int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
+{
+ Decoder *d = ist->decoder;
+ int ret = 0, thread_ret;
+
+ // thread already joined
+ if (!d->queue_in)
+ return AVERROR_EOF;
+
+ // send the packet/flush request/EOF to the decoder thread
+ if (pkt || no_eof) {
+ av_packet_unref(d->pkt);
+
+ if (pkt) {
+ ret = av_packet_ref(d->pkt, pkt);
+ if (ret < 0)
+ goto finish;
+ }
+
+ ret = tq_send(d->queue_in, 0, d->pkt);
+ if (ret < 0)
+ goto finish;
+ } else
+ tq_send_finish(d->queue_in, 0);
+
+ // retrieve all decoded data for the packet
+ while (1) {
+ int dummy;
+
+ ret = tq_receive(d->queue_out, &dummy, d->frame);
+ if (ret < 0)
+ goto finish;
+
+ // packet fully processed
+ if (!d->frame->buf[0])
+ return 0;
+
+ // process the decoded frame
+ if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
+ AVSubtitle *sub = (AVSubtitle*)d->frame->buf[0]->data;
+ ret = process_subtitle(ist, sub);
+ } else {
+ ret = send_frame_to_filters(ist, d->frame);
+ }
+ av_frame_unref(d->frame);
+ if (ret < 0)
+ goto finish;
+ }
+
+finish:
+ thread_ret = dec_thread_stop(d);
+ if (thread_ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
+ av_err2str(thread_ret));
+ ret = err_merge(ret, thread_ret);
+ }
+ // non-EOF errors here are all fatal
+ if (ret < 0 && ret != AVERROR_EOF)
+ report_and_exit(ret);
+
+ // signal EOF to our downstreams
+ if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE)
+ sub2video_flush(ist);
+ else {
+ ret = send_filter_eof(ist);
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
+ exit_program(1);
+ }
+ }
+
+ return AVERROR_EOF;
+}
+
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@@ -781,5 +1067,12 @@ int dec_open(InputStream *ist)
}
assert_avoptions(ist->decoder_opts);
+ ret = dec_thread_start(ist);
+ if (ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
+ av_err2str(ret));
+ return ret;
+ }
+
return 0;
}
@@ -475,11 +475,6 @@ static int thread_stop(Muxer *mux)
return (int)(intptr_t)ret;
}
-static void pkt_move(void *dst, void *src)
-{
- av_packet_move_ref(dst, src);
-}
-
static int thread_start(Muxer *mux)
{
AVFormatContext *fc = mux->fc;