diff mbox series

[FFmpeg-devel,22/24] fftools/ffmpeg_enc: convert to the scheduler

Message ID 20231104092125.10213-23-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/24] lavf/mux: do not apply max_interleave_delta to subtitles | expand

Commit Message

Anton Khirnov Nov. 4, 2023, 7:56 a.m. UTC
---
 fftools/ffmpeg.c          |   3 +-
 fftools/ffmpeg.h          |   7 +-
 fftools/ffmpeg_enc.c      | 361 ++++++--------------------------------
 fftools/ffmpeg_mux_init.c |  43 +----
 4 files changed, 66 insertions(+), 348 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 1f21008588..122424a0e1 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -507,7 +507,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
     av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
     av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
+        const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
 
         if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
             av_bprintf(&buf, "q=%2.1f ", q);
@@ -1127,7 +1127,6 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
         } else if (err_rate)
             av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
     }
-    ret = err_merge(ret, enc_flush());
 
     term_exit();
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index c1b61c83e7..20abd5e772 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -592,7 +592,7 @@  typedef struct OutputStream {
     uint64_t samples_encoded;
 
     /* packet quality factor */
-    int quality;
+    atomic_int quality;
 
     int sq_idx_encode;
     int sq_idx_mux;
@@ -776,10 +776,7 @@  int enc_alloc(Encoder **penc, const AVCodec *codec,
               Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
 
-int enc_open(OutputStream *ost, const AVFrame *frame);
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub);
-int enc_frame(OutputStream *ost, AVFrame *frame);
-int enc_flush(void);
+int enc_open(void *opaque, const AVFrame *frame);
 
 /*
  * Initialize muxing state for the given stream, should be called
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index fbfe592f20..9383b167f7 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -41,12 +41,6 @@ 
 #include "libavformat/avformat.h"
 
 struct Encoder {
-    AVFrame *sq_frame;
-
-    // packet for receiving encoded output
-    AVPacket *pkt;
-    AVFrame  *sub_frame;
-
     // combined size of all the packets received from the encoder
     uint64_t data_size;
 
@@ -54,25 +48,9 @@  struct Encoder {
     uint64_t packets_encoded;
 
     int opened;
-    int finished;
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending frames from the main thread to
-     * the encoder thread.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending encoded packets from the encoder thread
-     * to the main thread.
-     *
-     * An empty packet is sent to signal that a previously sent
-     * frame has been fully processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -81,24 +59,6 @@  typedef struct EncoderThread {
     AVPacket  *pkt;
 } EncoderThread;
 
-static int enc_thread_stop(Encoder *e)
-{
-    void *ret;
-
-    if (!e->queue_in)
-        return 0;
-
-    tq_send_finish(e->queue_in, 0);
-    tq_receive_finish(e->queue_out, 0);
-
-    pthread_join(e->thread, &ret);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void enc_free(Encoder **penc)
 {
     Encoder *enc = *penc;
@@ -106,13 +66,6 @@  void enc_free(Encoder **penc)
     if (!enc)
         return;
 
-    enc_thread_stop(enc);
-
-    av_frame_free(&enc->sq_frame);
-    av_frame_free(&enc->sub_frame);
-
-    av_packet_free(&enc->pkt);
-
     av_freep(penc);
 }
 
@@ -127,25 +80,12 @@  int enc_alloc(Encoder **penc, const AVCodec *codec,
     if (!enc)
         return AVERROR(ENOMEM);
 
-    if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
-        enc->sub_frame = av_frame_alloc();
-        if (!enc->sub_frame)
-            goto fail;
-    }
-
-    enc->pkt = av_packet_alloc();
-    if (!enc->pkt)
-        goto fail;
-
     enc->sch     = sch;
     enc->sch_idx = sch_idx;
 
     *penc = enc;
 
     return 0;
-fail:
-    enc_free(&enc);
-    return AVERROR(ENOMEM);
 }
 
 static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref)
@@ -224,52 +164,9 @@  static int set_encoder_id(OutputFile *of, OutputStream *ost)
     return 0;
 }
 
-static int enc_thread_start(OutputStream *ost)
-{
-    Encoder *e = ost->enc;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    e->queue_in = tq_alloc(1, 1, op, frame_move);
-    if (!e->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_packets();
-    if (!op)
-        goto fail;
-
-    e->queue_out = tq_alloc(1, 4, op, pkt_move);
-    if (!e->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-    return ret;
-}
-
-int enc_open(OutputStream *ost, const AVFrame *frame)
+int enc_open(void *opaque, const AVFrame *frame)
 {
+    OutputStream *ost = opaque;
     InputStream *ist = ost->ist;
     Encoder              *e = ost->enc;
     AVCodecContext *enc_ctx = ost->enc_ctx;
@@ -277,6 +174,7 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     const AVCodec      *enc = enc_ctx->codec;
     OutputFile      *of = output_files[ost->file_index];
     FrameData *fd;
+    int frame_samples = 0;
     int ret;
 
     if (e->opened)
@@ -420,17 +318,8 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
 
     e->opened = 1;
 
-    if (ost->sq_idx_encode >= 0) {
-        e->sq_frame = av_frame_alloc();
-        if (!e->sq_frame)
-            return AVERROR(ENOMEM);
-    }
-
-    if (ost->enc_ctx->frame_size) {
-        av_assert0(ost->sq_idx_encode >= 0);
-        sq_frame_samples(output_files[ost->file_index]->sq_encode,
-                         ost->sq_idx_encode, ost->enc_ctx->frame_size);
-    }
+    if (ost->enc_ctx->frame_size)
+        frame_samples = ost->enc_ctx->frame_size;
 
     ret = check_avoptions(ost->encoder_opts);
     if (ret < 0)
@@ -476,18 +365,11 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
         ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
 
-    ret = enc_thread_start(ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     ret = of_stream_init(of, ost);
     if (ret < 0)
         return ret;
 
-    return 0;
+    return frame_samples;
 }
 
 static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
@@ -514,8 +396,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
         return exit_on_error ? AVERROR(EINVAL) : 0;
     }
-    if (ost->finished ||
-        (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
+    if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
         return 0;
 
     enc = ost->enc_ctx;
@@ -579,7 +460,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         }
         pkt->dts = pkt->pts;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -671,10 +552,13 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     int64_t frame_number;
     double ti1, bitrate, avg_bitrate;
     double psnr_val = -1;
+    int quality;
 
-    ost->quality   = sd ? AV_RL32(sd) : -1;
+    quality        = sd ? AV_RL32(sd) : -1;
     pict_type      = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
 
+    atomic_store(&ost->quality, quality);
+
     if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
         // FIXME the scaling assumes 8bit
         double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0);
@@ -697,10 +581,10 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     frame_number = e->packets_encoded;
     if (vstats_version <= 1) {
         fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     } else  {
         fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     }
 
     if (psnr_val >= 0)
@@ -805,7 +689,7 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
 
         e->packets_encoded++;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -815,50 +699,6 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
     av_assert0(0);
 }
 
-static int submit_encode_frame(OutputFile *of, OutputStream *ost,
-                               AVFrame *frame, AVPacket *pkt)
-{
-    Encoder *e = ost->enc;
-    int ret;
-
-    if (ost->sq_idx_encode < 0)
-        return encode_frame(of, ost, frame, pkt);
-
-    if (frame) {
-        ret = av_frame_ref(e->sq_frame, frame);
-        if (ret < 0)
-            return ret;
-        frame = e->sq_frame;
-    }
-
-    ret = sq_send(of->sq_encode, ost->sq_idx_encode,
-                  SQFRAME(frame));
-    if (ret < 0) {
-        if (frame)
-            av_frame_unref(frame);
-        if (ret != AVERROR_EOF)
-            return ret;
-    }
-
-    while (1) {
-        AVFrame *enc_frame = e->sq_frame;
-
-        ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
-                               SQFRAME(enc_frame));
-        if (ret == AVERROR_EOF) {
-            enc_frame = NULL;
-        } else if (ret < 0) {
-            return (ret == AVERROR(EAGAIN)) ? 0 : ret;
-        }
-
-        ret = encode_frame(of, ost, enc_frame, pkt);
-        if (enc_frame)
-            av_frame_unref(enc_frame);
-        if (ret < 0)
-            return ret;
-    }
-}
-
 static int do_audio_out(OutputFile *of, OutputStream *ost,
                         AVFrame *frame, AVPacket *pkt)
 {
@@ -874,7 +714,7 @@  static int do_audio_out(OutputFile *of, OutputStream *ost,
     if (!check_recording_time(ost, frame->pts, frame->time_base))
         return AVERROR_EOF;
 
-    return submit_encode_frame(of, ost, frame, pkt);
+    return encode_frame(of, ost, frame, pkt);
 }
 
 static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -942,7 +782,7 @@  static int do_video_out(OutputFile *of, OutputStream *ost,
     }
 #endif
 
-    return submit_encode_frame(of, ost, in_picture, pkt);
+    return encode_frame(of, ost, in_picture, pkt);
 }
 
 static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
@@ -951,9 +791,12 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
     enum AVMediaType type = ost->type;
 
     if (type == AVMEDIA_TYPE_SUBTITLE) {
+        const AVSubtitle *subtitle = frame && frame->buf[0] ?
+                                     (AVSubtitle*)frame->buf[0]->data : NULL;
+
         // no flushing for subtitles
-        return frame ?
-               do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+        return subtitle && subtitle->num_rects ?
+               do_subtitle_out(of, ost, subtitle, pkt) : 0;
     }
 
     if (frame) {
@@ -961,7 +804,7 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
                                               do_audio_out(of, ost, frame, pkt);
     }
 
-    return submit_encode_frame(of, ost, NULL, pkt);
+    return  encode_frame(of, ost, NULL, pkt);
 }
 
 static void enc_thread_set_name(const OutputStream *ost)
@@ -1002,24 +845,50 @@  fail:
 void *encoder_thread(void *arg)
 {
     OutputStream *ost = arg;
-    OutputFile    *of = output_files[ost->file_index];
     Encoder        *e = ost->enc;
     EncoderThread et;
     int ret = 0, input_status = 0;
+    int name_set = 0;
 
     ret = enc_thread_init(&et);
     if (ret < 0)
         goto finish;
 
-    enc_thread_set_name(ost);
+    /* Open the subtitle encoders immediately. AVFrame-based encoders
+     * are opened through a callback from the scheduler once they get
+     * their first frame
+     *
+     * N.B.: because the callback is called from a different thread,
+     * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
+     * for the first time for audio/video. */
+    if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
+        ret = enc_open(ost, NULL);
+        if (ret < 0)
+            goto finish;
+    }
 
     while (!input_status) {
-        int dummy;
-
-        input_status = tq_receive(e->queue_in, &dummy, et.frame);
-        if (input_status < 0)
+        input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
 
+            if (!e->opened) {
+                av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
+                ret = AVERROR(EINVAL);
+                goto finish;
+            }
+        } else if (input_status < 0) {
+            ret = input_status;
+            av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
+                   av_err2str(ret));
+            goto finish;
+        }
+
+        if (!name_set) {
+            enc_thread_set_name(ost);
+            name_set = 1;
+        }
+
         ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
 
         av_packet_unref(et.pkt);
@@ -1033,15 +902,6 @@  void *encoder_thread(void *arg)
                        av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the frame was encoded
-        ret = tq_send(e->queue_out, 0, et.pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ost, AV_LOG_ERROR,
-                       "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
@@ -1049,118 +909,7 @@  void *encoder_thread(void *arg)
         ret = 0;
 
 finish:
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-
-    tq_receive_finish(e->queue_in,  0);
-    tq_send_finish   (e->queue_out, 0);
-
     enc_thread_uninit(&et);
 
-    av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
-
-int enc_frame(OutputStream *ost, AVFrame *frame)
-{
-    OutputFile *of = output_files[ost->file_index];
-    Encoder     *e = ost->enc;
-    int ret, thread_ret;
-
-    ret = enc_open(ost, frame);
-    if (ret < 0)
-        return ret;
-
-    if (!e->queue_in)
-        return AVERROR_EOF;
-
-    // send the frame/EOF to the encoder thread
-    if (frame) {
-        ret = tq_send(e->queue_in, 0, frame);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(e->queue_in, 0);
-
-    // retrieve all encoded data for the frame
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(e->queue_out, &dummy, e->pkt);
-        if (ret < 0)
-            break;
-
-        // frame fully encoded
-        if (!e->pkt->data && !e->pkt->side_data_elems)
-            return 0;
-
-        // process the encoded packet
-        ret = of_output_packet(of, ost, e->pkt);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = enc_thread_stop(e);
-    if (thread_ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to the muxer
-    return of_output_packet(of, ost, NULL);
-}
-
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
-{
-    Encoder *e = ost->enc;
-    AVFrame *f = e->sub_frame;
-    int ret;
-
-    // XXX the queue for transferring data to the encoder thread runs
-    // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
-    // that inside the frame
-    // eventually, subtitles should be switched to use AVFrames natively
-    ret = subtitle_wrap_frame(f, sub, 1);
-    if (ret < 0)
-        return ret;
-
-    ret = enc_frame(ost, f);
-    av_frame_unref(f);
-
-    return ret;
-}
-
-int enc_flush(void)
-{
-    int ret = 0;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        OutputFile      *of = output_files[ost->file_index];
-        if (ost->sq_idx_encode >= 0)
-            sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-    }
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        Encoder          *e = ost->enc;
-        AVCodecContext *enc = ost->enc_ctx;
-        int err;
-
-        if (!enc || !e->opened ||
-            (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
-            continue;
-
-        err = enc_frame(ost, NULL);
-        if (err != AVERROR_EOF && ret < 0)
-            ret = err_merge(ret, err);
-
-        av_assert0(!e->queue_in);
-    }
-
-    return ret;
-}
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 21ab6445a1..bc3d3b9902 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -1189,7 +1189,8 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         if (!ost->enc_ctx)
             return AVERROR(ENOMEM);
 
-        ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+        ret = sch_add_enc(mux->sch, encoder_thread, ost,
+                          ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open);
         if (ret < 0)
             return ret;
         ms->sch_idx_enc = ret;
@@ -2624,23 +2625,6 @@  static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt)
     return 0;
 }
 
-static int init_output_stream_nofilter(OutputStream *ost)
-{
-    int ret = 0;
-
-    if (ost->enc_ctx) {
-        ret = enc_open(ost, NULL);
-        if (ret < 0)
-            return ret;
-    } else {
-        ret = of_stream_init(output_files[ost->file_index], ost);
-        if (ret < 0)
-            return ret;
-    }
-
-    return ret;
-}
-
 static const char *output_file_item_name(void *obj)
 {
     const Muxer *mux = obj;
@@ -2826,26 +2810,15 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
 
     of->url        = filename;
 
-    /* initialize stream copy and subtitle/data streams.
-     * Encoded AVFrame based streams will get initialized when the first AVFrame
-     * is received in do_video_out
-     */
+    /* initialize streamcopy streams. */
     for (int i = 0; i < of->nb_streams; i++) {
         OutputStream *ost = of->streams[i];
 
-        if (ost->filter)
-            continue;
-
-        err = init_output_stream_nofilter(ost);
-        if (err < 0)
-            return err;
-    }
-
-    /* write the header for files with no streams */
-    if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
-        int ret = mux_check_init(mux);
-        if (ret < 0)
-            return ret;
+        if (!ost->enc) {
+            err = of_stream_init(of, ost);
+            if (err < 0)
+                return err;
+        }
     }
 
     return 0;