diff mbox series

[FFmpeg-devel,23/27] WIP fftools/ffmpeg_dec: convert to the scheduler

Message ID 20230919191044.18873-24-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate | expand

Commit Message

Anton Khirnov Sept. 19, 2023, 7:10 p.m. UTC
---
 fftools/ffmpeg.c     |   9 +--
 fftools/ffmpeg.h     |  11 ---
 fftools/ffmpeg_dec.c | 189 +++++++++----------------------------------
 3 files changed, 42 insertions(+), 167 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 00e57c4382..a09a9e1200 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -810,11 +810,6 @@  static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo
     int ret = 0;
     int eof_reached = 0;
 
-    if (ist->decoding_needed) {
-        ret = dec_packet(ist, pkt, no_eof);
-        if (ret < 0 && ret != AVERROR_EOF)
-            return ret;
-    }
     if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
         eof_reached = 1;
 
@@ -1036,6 +1031,7 @@  static void reset_eagain(void)
         ost->unavailable = 0;
 }
 
+#if 0
 static void decode_flush(InputFile *ifile)
 {
     for (int i = 0; i < ifile->nb_streams; i++) {
@@ -1047,6 +1043,7 @@  static void decode_flush(InputFile *ifile)
         dec_packet(ist, NULL, 1);
     }
 }
+#endif
 
 /*
  * Return
@@ -1063,11 +1060,13 @@  static int process_input(int file_index, AVPacket *pkt)
 
     ret = 0;
 
+#if 0
     if (ret == 1) {
         /* the input file is looped: flush the decoders */
         decode_flush(ifile);
         return AVERROR(EAGAIN);
     }
+#endif
     if (ret < 0) {
         if (ret != AVERROR_EOF) {
             av_log(ifile, AV_LOG_ERROR,
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 4646c05bea..841f8d0d68 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -815,17 +815,6 @@  int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
 int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
 void dec_free(Decoder **pdec);
 
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
 int enc_alloc(Encoder **penc, const AVCodec *codec,
               Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index dc8d0374a3..400fa666b9 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -53,24 +53,6 @@  struct Decoder {
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending coded packets from the main thread to
-     * the decoder thread.
-     *
-     * An empty packet is sent to flush the decoder without terminating
-     * decoding.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending decoded frames from the decoder thread
-     * to the main thread.
-     *
-     * An empty frame is sent to signal that a single packet has been fully
-     * processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -79,24 +61,6 @@  typedef struct DecThreadContext {
     AVPacket        *pkt;
 } DecThreadContext;
 
-static int dec_thread_stop(Decoder *d)
-{
-    void *ret;
-
-    if (!d->queue_in)
-        return 0;
-
-    tq_send_finish(d->queue_in, 0);
-    tq_receive_finish(d->queue_out, 0);
-
-    pthread_join(d->thread, &ret);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-
-    return (intptr_t)ret;
-}
-
 void dec_free(Decoder **pdec)
 {
     Decoder *dec = *pdec;
@@ -104,8 +68,6 @@  void dec_free(Decoder **pdec)
     if (!dec)
         return;
 
-    dec_thread_stop(dec);
-
     av_frame_free(&dec->frame);
     av_packet_free(&dec->pkt);
 
@@ -147,25 +109,6 @@  fail:
     return AVERROR(ENOMEM);
 }
 
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
-    int i, ret = 0;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
-                                 i < ist->nb_filters - 1 ||
-                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
-        }
-    }
-    return ret;
-}
-
 static AVRational audio_samplerate_update(void *logctx, Decoder *d,
                                           const AVFrame *frame)
 {
@@ -420,7 +363,8 @@  static int process_subtitle(InputStream *ist, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    ret = send_frame_to_filters(ist, frame);
+    // XXX
+    //ret = send_frame_to_filters(ist, frame);
     if (ret < 0)
         return ret;
 
@@ -495,7 +439,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
 
     ist->frames_decoded++;
 
-    // XXX the queue for transferring data back to the main thread runs
+    // XXX the queue for transferring data to consumers runs
     // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
     // inside the frame
     // eventually, subtitles should be switched to use AVFrames natively
@@ -508,26 +452,11 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
     frame->width  = ist->dec_ctx->width;
     frame->height = ist->dec_ctx->height;
 
-    ret = tq_send(d->queue_out, 0, frame);
+    ret = sch_dec_send(d->sch, d->sch_idx, frame);
     if (ret < 0)
         av_frame_unref(frame);
 
-    return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    int i, ret;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
-                          d->last_frame_pts + d->last_frame_duration_est;
-        ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
-        if (ret < 0)
-            return ret;
-    }
-    return 0;
+    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
 }
 
 static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
@@ -629,9 +558,9 @@  static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
 
         ist->frames_decoded++;
 
-        ret = tq_send(d->queue_out, 0, frame);
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
         if (ret < 0)
-            return ret;
+            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
     }
 }
 
@@ -685,9 +614,9 @@  void *decoder_thread(void *arg)
     dec_thread_set_name(ist);
 
     while (!input_status) {
-        int dummy, flush_buffers;
+        int flush_buffers;
 
-        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
+        input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
         flush_buffers = input_status >= 0 && !dt.pkt->buf;
         if (!dt.pkt->buf)
             av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
@@ -698,6 +627,14 @@  void *decoder_thread(void *arg)
         av_packet_unref(dt.pkt);
         av_frame_unref(dt.frame);
 
+        // AVERROR_EOF  - EOF from the decoder
+        // AVERROR_EXIT - EOF from the scheduler
+        // we treat them differently when flushing
+        if (ret == AVERROR_EXIT) {
+            ret = AVERROR_EOF;
+            flush_buffers = 0;
+        }
+
         if (ret == AVERROR_EOF) {
             av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
                    flush_buffers ? "resetting" : "finishing");
@@ -725,23 +662,32 @@  void *decoder_thread(void *arg)
                    av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the entire packet was processed
-        ret = tq_send(d->queue_out, 0, dt.frame);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
+    // on success send EOF timestamp to our downstreams
+    if (ret >= 0) {
+        av_frame_unref(dt.frame);
+
+        dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+                        d->last_frame_pts + d->last_frame_duration_est;
+        dt.frame->time_base = d->last_frame_tb;
+
+        // XXX check EOF/EXIT handling
+        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+        if (ret < 0 && ret != AVERROR_EOF) {
+            av_log(NULL, AV_LOG_FATAL,
+                   "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+            goto finish;
+        }
+        ret = 0;
+    }
+
 finish:
-    tq_receive_finish(d->queue_in,  0);
-    tq_send_finish   (d->queue_out, 0);
+    sch_dec_send(d->sch, d->sch_idx, NULL);
 
 #if 0
     // make sure the demuxer does not get stuck waiting for audio durations
@@ -757,15 +703,12 @@  finish:
     return (void*)(intptr_t)ret;
 }
 
+#if 0
 int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
 {
     Decoder *d = ist->decoder;
     int ret = 0, thread_ret;
 
-    // thread already joined
-    if (!d->queue_in)
-        return AVERROR_EOF;
-
     // send the packet/flush request/EOF to the decoder thread
     if (pkt || no_eof) {
         av_packet_unref(d->pkt);
@@ -816,59 +759,10 @@  finish:
     if (ret < 0 && ret != AVERROR_EOF)
         return ret;
 
-    // signal EOF to our downstreams
-    ret = send_filter_eof(ist);
-    if (ret < 0) {
-        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
-        return ret;
-    }
 
     return AVERROR_EOF;
 }
-
-static int dec_thread_start(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->queue_in = tq_alloc(1, 1, op, pkt_move);
-    if (!d->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    d->queue_out = tq_alloc(1, 4, op, frame_move);
-    if (!d->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-    return ret;
-}
+#endif
 
 static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
 {
@@ -1121,12 +1015,5 @@  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
     if (ret < 0)
         return ret;
 
-    ret = dec_thread_start(ist);
-    if (ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     return 0;
 }