diff mbox series

[FFmpeg-devel,20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread

Message ID 20230919191044.18873-21-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate | expand

Commit Message

Anton Khirnov Sept. 19, 2023, 7:10 p.m. UTC
As for the analogous decoding change, this is only a preparatory step to
a fully threaded architecture and does not yet make encoding truly
parallel. The main thread will currently submit a frame and wait until
it has been fully processed by the encoder before moving on. That will
change in future commits after filters are moved to threads and a
thread-aware scheduler is added.

WIP: resolve all // XXX left in the code
Also, if an encoder with a sync queue receives EOF it will terminate
after processing everything it currently has, even though the sync queue
might still be triggered by other threads.
---
 fftools/ffmpeg_enc.c | 384 ++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 340 insertions(+), 44 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index d8d7c3416d..ea542173c5 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -20,6 +20,7 @@ 
 #include <stdint.h>
 
 #include "ffmpeg.h"
+#include "thread_queue.h"
 
 #include "libavutil/avassert.h"
 #include "libavutil/avstring.h"
@@ -43,6 +44,7 @@  struct Encoder {
 
     // packet for receiving encoded output
     AVPacket *pkt;
+    AVFrame  *sub_frame;
 
     // combined size of all the packets received from the encoder
     uint64_t data_size;
@@ -51,8 +53,48 @@  struct Encoder {
     uint64_t packets_encoded;
 
     int opened;
+    int finished;
+
+    pthread_t       thread;
+    /**
+     * Queue for sending frames from the main thread to
+     * the encoder thread.
+     */
+    ThreadQueue    *queue_in;
+    /**
+     * Queue for sending encoded packets from the encoder thread
+     * to the main thread.
+     *
+     * An empty packet is sent to signal that a previously sent
+     * frame has been fully processed.
+     */
+    ThreadQueue    *queue_out;
 };
 
+// data that is local to the decoder thread and not visible outside of it
+typedef struct EncoderThread {
+    AVFrame *frame;
+    AVPacket  *pkt;
+} EncoderThread;
+
+static int enc_thread_stop(Encoder *e)
+{
+    void *ret;
+
+    if (!e->queue_in)
+        return 0;
+
+    tq_send_finish(e->queue_in, 0);
+    tq_receive_finish(e->queue_out, 0);
+
+    pthread_join(e->thread, &ret);
+
+    tq_free(&e->queue_in);
+    tq_free(&e->queue_out);
+
+    return (int)(intptr_t)ret;
+}
+
 void enc_free(Encoder **penc)
 {
     Encoder *enc = *penc;
@@ -60,7 +102,10 @@  void enc_free(Encoder **penc)
     if (!enc)
         return;
 
+    enc_thread_stop(enc);
+
     av_frame_free(&enc->sq_frame);
+    av_frame_free(&enc->sub_frame);
 
     av_packet_free(&enc->pkt);
 
@@ -77,6 +122,12 @@  int enc_alloc(Encoder **penc, const AVCodec *codec)
     if (!enc)
         return AVERROR(ENOMEM);
 
+    if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
+        enc->sub_frame = av_frame_alloc();
+        if (!enc->sub_frame)
+            goto fail;
+    }
+
     enc->pkt = av_packet_alloc();
     if (!enc->pkt)
         goto fail;
@@ -165,6 +216,52 @@  static int set_encoder_id(OutputFile *of, OutputStream *ost)
     return 0;
 }
 
+static void *encoder_thread(void *arg);
+
+static int enc_thread_start(OutputStream *ost)
+{
+    Encoder *e = ost->enc;
+    ObjPool *op;
+    int ret = 0;
+
+    op = objpool_alloc_frames();
+    if (!op)
+        return AVERROR(ENOMEM);
+
+    e->queue_in = tq_alloc(1, 1, op, frame_move);
+    if (!e->queue_in) {
+        objpool_free(&op);
+        return AVERROR(ENOMEM);
+    }
+
+    op = objpool_alloc_packets();
+    if (!op)
+        goto fail;
+
+    e->queue_out = tq_alloc(1, 4, op, pkt_move);
+    if (!e->queue_out) {
+        objpool_free(&op);
+        goto fail;
+    }
+
+    ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
+    if (ret) {
+        ret = AVERROR(ret);
+        av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
+               av_err2str(ret));
+        goto fail;
+    }
+
+    return 0;
+fail:
+    if (ret >= 0)
+        ret = AVERROR(ENOMEM);
+
+    tq_free(&e->queue_in);
+    tq_free(&e->queue_out);
+    return ret;
+}
+
 int enc_open(OutputStream *ost, const AVFrame *frame)
 {
     InputStream *ist = ost->ist;
@@ -387,6 +484,13 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
         ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
 
+    ret = enc_thread_start(ost);
+    if (ret < 0) {
+        av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
+               av_err2str(ret));
+        return ret;
+    }
+
     ret = of_stream_init(of, ost);
     if (ret < 0)
         return ret;
@@ -400,19 +504,18 @@  static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
 
     if (of->recording_time != INT64_MAX &&
         av_compare_ts(ts, tb, of->recording_time, AV_TIME_BASE_Q) >= 0) {
-        close_output_stream(ost);
         return 0;
     }
     return 1;
 }
 
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
+static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *sub,
+                           AVPacket *pkt)
 {
     Encoder *e = ost->enc;
     int subtitle_out_max_size = 1024 * 1024;
     int subtitle_out_size, nb, i, ret;
     AVCodecContext *enc;
-    AVPacket *pkt = e->pkt;
     int64_t pts;
 
     if (sub->pts == AV_NOPTS_VALUE) {
@@ -442,8 +545,9 @@  int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
     for (i = 0; i < nb; i++) {
         AVSubtitle local_sub = *sub;
 
+        // XXX
         if (!check_recording_time(ost, pts, AV_TIME_BASE_Q))
-            return 0;
+            return AVERROR_EOF;
 
         ret = av_new_packet(pkt, subtitle_out_max_size);
         if (ret < 0)
@@ -484,9 +588,11 @@  int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
         }
         pkt->dts = pkt->pts;
 
-        ret = of_output_packet(of, ost, pkt);
-        if (ret < 0)
+        ret = tq_send(e->queue_out, 0, pkt);
+        if (ret < 0) {
+            av_packet_unref(pkt);
             return ret;
+        }
     }
 
     return 0;
@@ -624,11 +730,11 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     return 0;
 }
 
-static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
+static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
+                        AVPacket *pkt)
 {
     Encoder            *e = ost->enc;
     AVCodecContext   *enc = ost->enc_ctx;
-    AVPacket         *pkt = e->pkt;
     const char *type_desc = av_get_media_type_string(enc->codec_type);
     const char    *action = frame ? "encode" : "flush";
     int ret;
@@ -678,11 +784,9 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
         if (ret == AVERROR(EAGAIN)) {
             av_assert0(frame); // should never happen during flushing
             return 0;
-        } else if (ret == AVERROR_EOF) {
-            ret = of_output_packet(of, ost, NULL);
-            return ret < 0 ? ret : AVERROR_EOF;
         } else if (ret < 0) {
-            av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc);
+            if (ret != AVERROR_EOF)
+                av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc);
             return ret;
         }
 
@@ -706,33 +810,36 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
                    av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
         }
 
-        if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Subtitle heartbeat logic failed in %s! (%s)\n",
-                   __func__, av_err2str(ret));
-            return ret;
-        }
+        // XXX
+        //if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
+        //    av_log(NULL, AV_LOG_ERROR,
+        //           "Subtitle heartbeat logic failed in %s! (%s)\n",
+        //           __func__, av_err2str(ret));
+        //    return ret;
+        //}
 
         e->data_size += pkt->size;
 
         e->packets_encoded++;
 
-        ret = of_output_packet(of, ost, pkt);
-        if (ret < 0)
+        ret = tq_send(e->queue_out, 0, pkt);
+        if (ret < 0) {
+            av_packet_unref(pkt);
             return ret;
+        }
     }
 
     av_assert0(0);
 }
 
 static int submit_encode_frame(OutputFile *of, OutputStream *ost,
-                               AVFrame *frame)
+                               AVFrame *frame, AVPacket *pkt)
 {
     Encoder *e = ost->enc;
     int ret;
 
     if (ost->sq_idx_encode < 0)
-        return encode_frame(of, ost, frame);
+        return encode_frame(of, ost, frame, pkt);
 
     if (frame) {
         ret = av_frame_ref(e->sq_frame, frame);
@@ -761,22 +868,22 @@  static int submit_encode_frame(OutputFile *of, OutputStream *ost,
             return (ret == AVERROR(EAGAIN)) ? 0 : ret;
         }
 
-        ret = encode_frame(of, ost, enc_frame);
+        ret = encode_frame(of, ost, enc_frame, pkt);
         if (enc_frame)
             av_frame_unref(enc_frame);
         if (ret < 0) {
-            if (ret == AVERROR_EOF)
-                close_output_stream(ost);
+            // XXX
+            //if (ret == AVERROR_EOF)
+            //    close_output_stream(ost);
             return ret;
         }
     }
 }
 
 static int do_audio_out(OutputFile *of, OutputStream *ost,
-                        AVFrame *frame)
+                        AVFrame *frame, AVPacket *pkt)
 {
     AVCodecContext *enc = ost->enc_ctx;
-    int ret;
 
     if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) &&
         enc->ch_layout.nb_channels != frame->ch_layout.nb_channels) {
@@ -785,11 +892,12 @@  static int do_audio_out(OutputFile *of, OutputStream *ost,
         return 0;
     }
 
+    // XXX
     if (!check_recording_time(ost, frame->pts, frame->time_base))
-        return 0;
+        return AVERROR_EOF;
 
-    ret = submit_encode_frame(of, ost, frame);
-    return (ret < 0 && ret != AVERROR_EOF) ? ret : 0;
+    // XXX check EOF handling
+    return submit_encode_frame(of, ost, frame, pkt);
 }
 
 static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -839,13 +947,14 @@  force_keyframe:
 }
 
 /* May modify/reset frame */
-static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture)
+static int do_video_out(OutputFile *of, OutputStream *ost,
+                        AVFrame *in_picture, AVPacket *pkt)
 {
-    int ret;
     AVCodecContext *enc = ost->enc_ctx;
 
+    // XXX
     if (!check_recording_time(ost, in_picture->pts, ost->enc_ctx->time_base))
-        return 0;
+        return AVERROR_EOF;
 
     in_picture->quality = enc->global_quality;
     in_picture->pict_type = forced_kf_apply(ost, &ost->kf, enc->time_base, in_picture);
@@ -857,26 +966,210 @@  static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture)
     }
 #endif
 
-    ret = submit_encode_frame(of, ost, in_picture);
-    return (ret == AVERROR_EOF) ? 0 : ret;
+    // XXX check EOF handling
+    return submit_encode_frame(of, ost, in_picture, pkt);
+}
+
+static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
+{
+    OutputFile *of = output_files[ost->file_index];
+    enum AVMediaType type = ost->type;
+    int ret;
+
+    if (type == AVMEDIA_TYPE_SUBTITLE) {
+        // no flushing for subtitles
+        return frame ?
+               do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+    }
+
+    // XXX
+    if (frame) {
+        ret = (type == AVMEDIA_TYPE_VIDEO) ? do_video_out(of, ost, frame, pkt) :
+                                             do_audio_out(of, ost, frame, pkt);
+        if (ret < 0)
+            return ret;
+    }
+
+    return frame ? 0 : submit_encode_frame(of, ost, NULL, pkt);
+}
+
+static void enc_thread_set_name(const OutputStream *ost)
+{
+    char name[16];
+    snprintf(name, sizeof(name), "enc%d:%d:%s", ost->file_index, ost->index,
+             ost->enc_ctx->codec->name);
+    ff_thread_setname(name);
+}
+
+static void enc_thread_uninit(EncoderThread *et)
+{
+    av_packet_free(&et->pkt);
+    av_frame_free(&et->frame);
+
+    memset(et, 0, sizeof(*et));
+}
+
+static int enc_thread_init(EncoderThread *et)
+{
+    memset(et, 0, sizeof(*et));
+
+    et->frame = av_frame_alloc();
+    if (!et->frame)
+        goto fail;
+
+    et->pkt = av_packet_alloc();
+    if (!et->pkt)
+        goto fail;
+
+    return 0;
+
+fail:
+    enc_thread_uninit(et);
+    return AVERROR(ENOMEM);
+}
+
+static void *encoder_thread(void *arg)
+{
+    OutputStream *ost = arg;
+    OutputFile    *of = output_files[ost->file_index];
+    Encoder        *e = ost->enc;
+    EncoderThread et;
+    int ret = 0, input_status = 0;
+
+    ret = enc_thread_init(&et);
+    if (ret < 0)
+        goto finish;
+
+    enc_thread_set_name(ost);
+
+    while (!input_status) {
+        int dummy;
+
+        input_status = tq_receive(e->queue_in, &dummy, et.frame);
+        if (input_status < 0)
+            av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
+
+        ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
+
+        av_packet_unref(et.pkt);
+        av_frame_unref(et.frame);
+
+        if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                av_log(ost, AV_LOG_VERBOSE, "Encoder returned EOF, finishing\n");
+            else
+                av_log(ost, AV_LOG_ERROR, "Error encoding a frame: %s\n",
+                       av_err2str(ret));
+            break;
+        }
+
+        // signal to the consumer thread that the frame was encoded
+        ret = tq_send(e->queue_out, 0, et.pkt);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(ost, AV_LOG_ERROR,
+                       "Error communicating with the main thread\n");
+            break;
+        }
+    }
+
+    // EOF is normal thread termination
+    if (ret == AVERROR_EOF)
+        ret = 0;
+
+finish:
+    if (ost->sq_idx_encode >= 0)
+        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
+
+    tq_receive_finish(e->queue_in,  0);
+    tq_send_finish   (e->queue_out, 0);
+
+    enc_thread_uninit(&et);
+
+    av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
+
+    return (void*)(intptr_t)ret;
 }
 
 int enc_frame(OutputStream *ost, AVFrame *frame)
 {
     OutputFile *of = output_files[ost->file_index];
-    int ret;
+    Encoder     *e = ost->enc;
+    int ret, thread_ret;
 
     ret = enc_open(ost, frame);
     if (ret < 0)
         return ret;
 
-    return ost->enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO ?
-           do_video_out(of, ost, frame) : do_audio_out(of, ost, frame);
+    // thread already joined
+    // XXX check EOF handling
+    if (!e->queue_in)
+        return AVERROR_EOF;
+
+    // send the frame/EOF to the encoder thread
+    if (frame) {
+        ret = tq_send(e->queue_in, 0, frame);
+        if (ret < 0)
+            goto finish;
+    } else
+        tq_send_finish(e->queue_in, 0);
+
+    // retrieve all encoded data for the frame
+    while (1) {
+        int dummy;
+
+        ret = tq_receive(e->queue_out, &dummy, e->pkt);
+        if (ret < 0)
+            break;
+
+        // frame fully encoded
+        if (!e->pkt->data && !e->pkt->side_data_elems)
+            return 0;
+
+        // process the encoded packet
+        ret = of_output_packet(of, ost, e->pkt);
+        if (ret < 0)
+            goto finish;
+    }
+
+finish:
+    thread_ret = enc_thread_stop(e);
+    if (thread_ret < 0) {
+        av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
+               av_err2str(thread_ret));
+        ret = err_merge(ret, thread_ret);
+    }
+
+    if (ret < 0 && ret != AVERROR_EOF)
+        return ret;
+
+    // signal EOF to the muxer
+    return of_output_packet(of, ost, NULL);
+}
+
+int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
+{
+    Encoder *e = ost->enc;
+    AVFrame *f = e->sub_frame;
+    int ret;
+
+    // XXX the queue for transferring data to the encoder thread runs
+    // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
+    // that inside the frame
+    // eventually, subtitles should be switched to use AVFrames natively
+    ret = subtitle_wrap_frame(f, sub, 1);
+    if (ret < 0)
+        return ret;
+
+    ret = enc_frame(ost, f);
+    av_frame_unref(f);
+
+    return ret;
 }
 
 int enc_flush(void)
 {
-    int ret;
+    int ret = 0;
 
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
         OutputFile      *of = output_files[ost->file_index];
@@ -887,16 +1180,19 @@  int enc_flush(void)
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
         Encoder          *e = ost->enc;
         AVCodecContext *enc = ost->enc_ctx;
-        OutputFile      *of = output_files[ost->file_index];
+        int err;
 
         if (!enc || !e->opened ||
             (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
             continue;
 
-        ret = submit_encode_frame(of, ost, NULL);
-        if (ret != AVERROR_EOF)
-            return ret;
+        err = enc_frame(ost, NULL);
+        // XXX check EOF handling
+        if (err != AVERROR_EOF && ret < 0)
+            ret = err_merge(ret, err);
+
+        av_assert0(!e->queue_in);
     }
 
-    return 0;
+    return ret;
 }