@@ -20,6 +20,7 @@
#include <stdint.h>
#include "ffmpeg.h"
+#include "thread_queue.h"
#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
@@ -43,6 +44,7 @@ struct Encoder {
// packet for receiving encoded output
AVPacket *pkt;
+ AVFrame *sub_frame;
// combined size of all the packets received from the encoder
uint64_t data_size;
@@ -51,8 +53,48 @@ struct Encoder {
uint64_t packets_encoded;
int opened;
+ int finished;
+
+ pthread_t thread;
+ /**
+ * Queue for sending frames from the main thread to
+ * the encoder thread.
+ */
+ ThreadQueue *queue_in;
+ /**
+ * Queue for sending encoded packets from the encoder thread
+ * to the main thread.
+ *
+ * An empty packet is sent to signal that a previously sent
+ * frame has been fully processed.
+ */
+ ThreadQueue *queue_out;
};
+// data that is local to the decoder thread and not visible outside of it
+typedef struct EncoderThread {
+ AVFrame *frame;
+ AVPacket *pkt;
+} EncoderThread;
+
+static int enc_thread_stop(Encoder *e)
+{
+ void *ret;
+
+ if (!e->queue_in)
+ return 0;
+
+ tq_send_finish(e->queue_in, 0);
+ tq_receive_finish(e->queue_out, 0);
+
+ pthread_join(e->thread, &ret);
+
+ tq_free(&e->queue_in);
+ tq_free(&e->queue_out);
+
+ return (int)(intptr_t)ret;
+}
+
void enc_free(Encoder **penc)
{
Encoder *enc = *penc;
@@ -60,7 +102,10 @@ void enc_free(Encoder **penc)
if (!enc)
return;
+ enc_thread_stop(enc);
+
av_frame_free(&enc->sq_frame);
+ av_frame_free(&enc->sub_frame);
av_packet_free(&enc->pkt);
@@ -77,6 +122,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec)
if (!enc)
return AVERROR(ENOMEM);
+ if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
+ enc->sub_frame = av_frame_alloc();
+ if (!enc->sub_frame)
+ goto fail;
+ }
+
enc->pkt = av_packet_alloc();
if (!enc->pkt)
goto fail;
@@ -165,6 +216,52 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
return 0;
}
+static void *encoder_thread(void *arg);
+
+static int enc_thread_start(OutputStream *ost)
+{
+ Encoder *e = ost->enc;
+ ObjPool *op;
+ int ret = 0;
+
+ op = objpool_alloc_frames();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ e->queue_in = tq_alloc(1, 1, op, frame_move);
+ if (!e->queue_in) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ op = objpool_alloc_packets();
+ if (!op)
+ goto fail;
+
+ e->queue_out = tq_alloc(1, 4, op, pkt_move);
+ if (!e->queue_out) {
+ objpool_free(&op);
+ goto fail;
+ }
+
+ ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
+ if (ret) {
+ ret = AVERROR(ret);
+ av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
+ av_err2str(ret));
+ goto fail;
+ }
+
+ return 0;
+fail:
+ if (ret >= 0)
+ ret = AVERROR(ENOMEM);
+
+ tq_free(&e->queue_in);
+ tq_free(&e->queue_out);
+ return ret;
+}
+
int enc_open(OutputStream *ost, const AVFrame *frame)
{
InputStream *ist = ost->ist;
@@ -387,6 +484,13 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
+ ret = enc_thread_start(ost);
+ if (ret < 0) {
+ av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
+ av_err2str(ret));
+ return ret;
+ }
+
ret = of_stream_init(of, ost);
if (ret < 0)
return ret;
@@ -400,19 +504,18 @@ static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
if (of->recording_time != INT64_MAX &&
av_compare_ts(ts, tb, of->recording_time, AV_TIME_BASE_Q) >= 0) {
- close_output_stream(ost);
return 0;
}
return 1;
}
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
+static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *sub,
+ AVPacket *pkt)
{
Encoder *e = ost->enc;
int subtitle_out_max_size = 1024 * 1024;
int subtitle_out_size, nb, i, ret;
AVCodecContext *enc;
- AVPacket *pkt = e->pkt;
int64_t pts;
if (sub->pts == AV_NOPTS_VALUE) {
@@ -442,8 +545,9 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
for (i = 0; i < nb; i++) {
AVSubtitle local_sub = *sub;
+ // XXX
if (!check_recording_time(ost, pts, AV_TIME_BASE_Q))
- return 0;
+ return AVERROR_EOF;
ret = av_new_packet(pkt, subtitle_out_max_size);
if (ret < 0)
@@ -484,9 +588,11 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
}
pkt->dts = pkt->pts;
- ret = of_output_packet(of, ost, pkt);
- if (ret < 0)
+ ret = tq_send(e->queue_out, 0, pkt);
+ if (ret < 0) {
+ av_packet_unref(pkt);
return ret;
+ }
}
return 0;
@@ -624,11 +730,11 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
return 0;
}
-static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
+static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
+ AVPacket *pkt)
{
Encoder *e = ost->enc;
AVCodecContext *enc = ost->enc_ctx;
- AVPacket *pkt = e->pkt;
const char *type_desc = av_get_media_type_string(enc->codec_type);
const char *action = frame ? "encode" : "flush";
int ret;
@@ -678,11 +784,9 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
if (ret == AVERROR(EAGAIN)) {
av_assert0(frame); // should never happen during flushing
return 0;
- } else if (ret == AVERROR_EOF) {
- ret = of_output_packet(of, ost, NULL);
- return ret < 0 ? ret : AVERROR_EOF;
} else if (ret < 0) {
- av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc);
+ if (ret != AVERROR_EOF)
+ av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc);
return ret;
}
@@ -706,33 +810,36 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
}
- if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Subtitle heartbeat logic failed in %s! (%s)\n",
- __func__, av_err2str(ret));
- return ret;
- }
+ // XXX
+ //if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
+ // av_log(NULL, AV_LOG_ERROR,
+ // "Subtitle heartbeat logic failed in %s! (%s)\n",
+ // __func__, av_err2str(ret));
+ // return ret;
+ //}
e->data_size += pkt->size;
e->packets_encoded++;
- ret = of_output_packet(of, ost, pkt);
- if (ret < 0)
+ ret = tq_send(e->queue_out, 0, pkt);
+ if (ret < 0) {
+ av_packet_unref(pkt);
return ret;
+ }
}
av_assert0(0);
}
static int submit_encode_frame(OutputFile *of, OutputStream *ost,
- AVFrame *frame)
+ AVFrame *frame, AVPacket *pkt)
{
Encoder *e = ost->enc;
int ret;
if (ost->sq_idx_encode < 0)
- return encode_frame(of, ost, frame);
+ return encode_frame(of, ost, frame, pkt);
if (frame) {
ret = av_frame_ref(e->sq_frame, frame);
@@ -761,22 +868,22 @@ static int submit_encode_frame(OutputFile *of, OutputStream *ost,
return (ret == AVERROR(EAGAIN)) ? 0 : ret;
}
- ret = encode_frame(of, ost, enc_frame);
+ ret = encode_frame(of, ost, enc_frame, pkt);
if (enc_frame)
av_frame_unref(enc_frame);
if (ret < 0) {
- if (ret == AVERROR_EOF)
- close_output_stream(ost);
+ // XXX
+ //if (ret == AVERROR_EOF)
+ // close_output_stream(ost);
return ret;
}
}
}
static int do_audio_out(OutputFile *of, OutputStream *ost,
- AVFrame *frame)
+ AVFrame *frame, AVPacket *pkt)
{
AVCodecContext *enc = ost->enc_ctx;
- int ret;
if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) &&
enc->ch_layout.nb_channels != frame->ch_layout.nb_channels) {
@@ -785,11 +892,12 @@ static int do_audio_out(OutputFile *of, OutputStream *ost,
return 0;
}
+ // XXX
if (!check_recording_time(ost, frame->pts, frame->time_base))
- return 0;
+ return AVERROR_EOF;
- ret = submit_encode_frame(of, ost, frame);
- return (ret < 0 && ret != AVERROR_EOF) ? ret : 0;
+ // XXX check EOF handling
+ return submit_encode_frame(of, ost, frame, pkt);
}
static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -839,13 +947,14 @@ force_keyframe:
}
/* May modify/reset frame */
-static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture)
+static int do_video_out(OutputFile *of, OutputStream *ost,
+ AVFrame *in_picture, AVPacket *pkt)
{
- int ret;
AVCodecContext *enc = ost->enc_ctx;
+ // XXX
if (!check_recording_time(ost, in_picture->pts, ost->enc_ctx->time_base))
- return 0;
+ return AVERROR_EOF;
in_picture->quality = enc->global_quality;
in_picture->pict_type = forced_kf_apply(ost, &ost->kf, enc->time_base, in_picture);
@@ -857,26 +966,210 @@ static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture)
}
#endif
- ret = submit_encode_frame(of, ost, in_picture);
- return (ret == AVERROR_EOF) ? 0 : ret;
+ // XXX check EOF handling
+ return submit_encode_frame(of, ost, in_picture, pkt);
+}
+
+static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
+{
+ OutputFile *of = output_files[ost->file_index];
+ enum AVMediaType type = ost->type;
+ int ret;
+
+ if (type == AVMEDIA_TYPE_SUBTITLE) {
+ // no flushing for subtitles
+ return frame ?
+ do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+ }
+
+ // XXX
+ if (frame) {
+ ret = (type == AVMEDIA_TYPE_VIDEO) ? do_video_out(of, ost, frame, pkt) :
+ do_audio_out(of, ost, frame, pkt);
+ if (ret < 0)
+ return ret;
+ }
+
+ return frame ? 0 : submit_encode_frame(of, ost, NULL, pkt);
+}
+
+static void enc_thread_set_name(const OutputStream *ost)
+{
+ char name[16];
+ snprintf(name, sizeof(name), "enc%d:%d:%s", ost->file_index, ost->index,
+ ost->enc_ctx->codec->name);
+ ff_thread_setname(name);
+}
+
+static void enc_thread_uninit(EncoderThread *et)
+{
+ av_packet_free(&et->pkt);
+ av_frame_free(&et->frame);
+
+ memset(et, 0, sizeof(*et));
+}
+
+static int enc_thread_init(EncoderThread *et)
+{
+ memset(et, 0, sizeof(*et));
+
+ et->frame = av_frame_alloc();
+ if (!et->frame)
+ goto fail;
+
+ et->pkt = av_packet_alloc();
+ if (!et->pkt)
+ goto fail;
+
+ return 0;
+
+fail:
+ enc_thread_uninit(et);
+ return AVERROR(ENOMEM);
+}
+
+static void *encoder_thread(void *arg)
+{
+ OutputStream *ost = arg;
+ OutputFile *of = output_files[ost->file_index];
+ Encoder *e = ost->enc;
+ EncoderThread et;
+ int ret = 0, input_status = 0;
+
+ ret = enc_thread_init(&et);
+ if (ret < 0)
+ goto finish;
+
+ enc_thread_set_name(ost);
+
+ while (!input_status) {
+ int dummy;
+
+ input_status = tq_receive(e->queue_in, &dummy, et.frame);
+ if (input_status < 0)
+ av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
+
+ ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
+
+ av_packet_unref(et.pkt);
+ av_frame_unref(et.frame);
+
+ if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ av_log(ost, AV_LOG_VERBOSE, "Encoder returned EOF, finishing\n");
+ else
+ av_log(ost, AV_LOG_ERROR, "Error encoding a frame: %s\n",
+ av_err2str(ret));
+ break;
+ }
+
+ // signal to the consumer thread that the frame was encoded
+ ret = tq_send(e->queue_out, 0, et.pkt);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(ost, AV_LOG_ERROR,
+ "Error communicating with the main thread\n");
+ break;
+ }
+ }
+
+ // EOF is normal thread termination
+ if (ret == AVERROR_EOF)
+ ret = 0;
+
+finish:
+ if (ost->sq_idx_encode >= 0)
+ sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
+
+ tq_receive_finish(e->queue_in, 0);
+ tq_send_finish (e->queue_out, 0);
+
+ enc_thread_uninit(&et);
+
+ av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
+
+ return (void*)(intptr_t)ret;
}
int enc_frame(OutputStream *ost, AVFrame *frame)
{
OutputFile *of = output_files[ost->file_index];
- int ret;
+ Encoder *e = ost->enc;
+ int ret, thread_ret;
ret = enc_open(ost, frame);
if (ret < 0)
return ret;
- return ost->enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO ?
- do_video_out(of, ost, frame) : do_audio_out(of, ost, frame);
+ // thread already joined
+ // XXX check EOF handling
+ if (!e->queue_in)
+ return AVERROR_EOF;
+
+ // send the frame/EOF to the encoder thread
+ if (frame) {
+ ret = tq_send(e->queue_in, 0, frame);
+ if (ret < 0)
+ goto finish;
+ } else
+ tq_send_finish(e->queue_in, 0);
+
+ // retrieve all encoded data for the frame
+ while (1) {
+ int dummy;
+
+ ret = tq_receive(e->queue_out, &dummy, e->pkt);
+ if (ret < 0)
+ break;
+
+ // frame fully encoded
+ if (!e->pkt->data && !e->pkt->side_data_elems)
+ return 0;
+
+ // process the encoded packet
+ ret = of_output_packet(of, ost, e->pkt);
+ if (ret < 0)
+ goto finish;
+ }
+
+finish:
+ thread_ret = enc_thread_stop(e);
+ if (thread_ret < 0) {
+ av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
+ av_err2str(thread_ret));
+ ret = err_merge(ret, thread_ret);
+ }
+
+ if (ret < 0 && ret != AVERROR_EOF)
+ return ret;
+
+ // signal EOF to the muxer
+ return of_output_packet(of, ost, NULL);
+}
+
+int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
+{
+ Encoder *e = ost->enc;
+ AVFrame *f = e->sub_frame;
+ int ret;
+
+ // XXX the queue for transferring data to the encoder thread runs
+ // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
+ // that inside the frame
+ // eventually, subtitles should be switched to use AVFrames natively
+ ret = subtitle_wrap_frame(f, sub, 1);
+ if (ret < 0)
+ return ret;
+
+ ret = enc_frame(ost, f);
+ av_frame_unref(f);
+
+ return ret;
}
int enc_flush(void)
{
- int ret;
+ int ret = 0;
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
OutputFile *of = output_files[ost->file_index];
@@ -887,16 +1180,19 @@ int enc_flush(void)
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
Encoder *e = ost->enc;
AVCodecContext *enc = ost->enc_ctx;
- OutputFile *of = output_files[ost->file_index];
+ int err;
if (!enc || !e->opened ||
(enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
continue;
- ret = submit_encode_frame(of, ost, NULL);
- if (ret != AVERROR_EOF)
- return ret;
+ err = enc_frame(ost, NULL);
+ // XXX check EOF handling
+ if (err != AVERROR_EOF && ret < 0)
+ ret = err_merge(ret, err);
+
+ av_assert0(!e->queue_in);
}
- return 0;
+ return ret;
}