diff mbox series

[FFmpeg-devel,20/24] fftools/ffmpeg_dec: convert to the scheduler

Message ID 20231104092125.10213-21-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/24] lavf/mux: do not apply max_interleave_delta to subtitles | expand

Commit Message

Anton Khirnov Nov. 4, 2023, 7:56 a.m. UTC
---
 fftools/ffmpeg.c     |  22 ---
 fftools/ffmpeg.h     |  13 +-
 fftools/ffmpeg_dec.c | 315 ++++++++++---------------------------------
 3 files changed, 70 insertions(+), 280 deletions(-)

Comments

Michael Niedermayer Nov. 4, 2023, 6:30 p.m. UTC | #1
On Sat, Nov 04, 2023 at 08:56:29AM +0100, Anton Khirnov wrote:
> ---
>  fftools/ffmpeg.c     |  22 ---
>  fftools/ffmpeg.h     |  13 +-
>  fftools/ffmpeg_dec.c | 315 ++++++++++---------------------------------
>  3 files changed, 70 insertions(+), 280 deletions(-)

This or the previous commit (which doesnt build without this)
breaks:
ffmpeg -y -i tickets//1714/fake_mjpeg_stream.wmv -t 1 /tmp/file1714.avi

(infinite loop)
it can be quit with a "q" press here but with all patches applied it
seems "q" is not enough to kill it

seems on the samples server here: ffmpeg-bugs/trac/ticket1714/fake_mjpeg_stream.wmv

thx

[...]
Anton Khirnov Nov. 11, 2023, 3:24 p.m. UTC | #2
Quoting Michael Niedermayer (2023-11-04 19:30:39)
> On Sat, Nov 04, 2023 at 08:56:29AM +0100, Anton Khirnov wrote:
> > ---
> >  fftools/ffmpeg.c     |  22 ---
> >  fftools/ffmpeg.h     |  13 +-
> >  fftools/ffmpeg_dec.c | 315 ++++++++++---------------------------------
> >  3 files changed, 70 insertions(+), 280 deletions(-)
> 
> This or the previous commit (which doesnt build without this)
> breaks:
> ffmpeg -y -i tickets//1714/fake_mjpeg_stream.wmv -t 1 /tmp/file1714.avi
> 
> (infinite loop)
> it can be quit with a "q" press here but with all patches applied it
> seems "q" is not enough to kill it
> 
> seems on the samples server here: ffmpeg-bugs/trac/ticket1714/fake_mjpeg_stream.wmv

Should be fixed after v3 of 18/24; updated my branch as well.
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 611ac4621d..bd783fe674 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -778,11 +778,6 @@  static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo
     int ret = 0;
     int eof_reached = 0;
 
-    if (ist->decoding_needed) {
-        ret = dec_packet(ist, pkt, no_eof);
-        if (ret < 0 && ret != AVERROR_EOF)
-            return ret;
-    }
     if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
         eof_reached = 1;
 
@@ -994,18 +989,6 @@  static void reset_eagain(void)
         ost->unavailable = 0;
 }
 
-static void decode_flush(InputFile *ifile)
-{
-    for (int i = 0; i < ifile->nb_streams; i++) {
-        InputStream *ist = ifile->streams[i];
-
-        if (ist->discard || !ist->decoding_needed)
-            continue;
-
-        dec_packet(ist, NULL, 1);
-    }
-}
-
 /*
  * Return
  * - 0 -- one packet was read and processed
@@ -1021,11 +1004,6 @@  static int process_input(int file_index, AVPacket *pkt)
 
     ret = 0;
 
-    if (ret == 1) {
-        /* the input file is looped: flush the decoders */
-        decode_flush(ifile);
-        return AVERROR(EAGAIN);
-    }
     if (ret < 0) {
         if (ret != AVERROR_EOF) {
             av_log(ifile, AV_LOG_ERROR,
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 73b3e54fb0..975d8b737e 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -61,6 +61,8 @@ 
 #define FFMPEG_OPT_TOP 1
 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
 
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
 enum VideoSyncMethod {
     VSYNC_AUTO = -1,
     VSYNC_PASSTHROUGH,
@@ -796,17 +798,6 @@  int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
 int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
 void dec_free(Decoder **pdec);
 
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
 int enc_alloc(Encoder **penc, const AVCodec *codec,
               Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 53e14f061e..a81f83fc92 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -54,24 +54,6 @@  struct Decoder {
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending coded packets from the main thread to
-     * the decoder thread.
-     *
-     * An empty packet is sent to flush the decoder without terminating
-     * decoding.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending decoded frames from the decoder thread
-     * to the main thread.
-     *
-     * An empty frame is sent to signal that a single packet has been fully
-     * processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@  typedef struct DecThreadContext {
     AVPacket        *pkt;
 } DecThreadContext;
 
-static int dec_thread_stop(Decoder *d)
-{
-    void *ret;
-
-    if (!d->queue_in)
-        return 0;
-
-    tq_send_finish(d->queue_in, 0);
-    tq_receive_finish(d->queue_out, 0);
-
-    pthread_join(d->thread, &ret);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-
-    return (intptr_t)ret;
-}
-
 void dec_free(Decoder **pdec)
 {
     Decoder *dec = *pdec;
@@ -105,8 +69,6 @@  void dec_free(Decoder **pdec)
     if (!dec)
         return;
 
-    dec_thread_stop(dec);
-
     av_frame_free(&dec->frame);
     av_packet_free(&dec->pkt);
 
@@ -148,25 +110,6 @@  fail:
     return AVERROR(ENOMEM);
 }
 
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
-    int i, ret = 0;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
-                                 i < ist->nb_filters - 1 ||
-                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
-        }
-    }
-    return ret;
-}
-
 static AVRational audio_samplerate_update(void *logctx, Decoder *d,
                                           const AVFrame *frame)
 {
@@ -421,36 +364,31 @@  static int process_subtitle(InputStream *ist, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    ret = send_frame_to_filters(ist, frame);
+    ret = sch_dec_send(d->sch, d->sch_idx, frame);
     if (ret < 0)
-        return ret;
+        av_frame_unref(frame);
 
-    subtitle = (AVSubtitle*)frame->buf[0]->data;
-    if (!subtitle->num_rects)
-        return 0;
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
-        if (ret < 0)
-            return ret;
-    }
-
-    return 0;
+    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
 }
 
 static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
                                AVFrame *frame)
 {
-    Decoder          *d = ist->decoder;
+    Decoder *d = ist->decoder;
     AVPacket *flush_pkt = NULL;
     AVSubtitle subtitle;
     int got_output;
     int ret;
 
+    if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+        frame->pts       = pkt->pts;
+        frame->time_base = pkt->time_base;
+        frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+    }
+
     if (!pkt) {
         flush_pkt = av_packet_alloc();
         if (!flush_pkt)
@@ -473,7 +411,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
 
     ist->frames_decoded++;
 
-    // XXX the queue for transferring data back to the main thread runs
+    // XXX the queue for transferring data to consumers runs
     // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
     // inside the frame
     // eventually, subtitles should be switched to use AVFrames natively
@@ -486,26 +424,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
     frame->width  = ist->dec_ctx->width;
     frame->height = ist->dec_ctx->height;
 
-    ret = tq_send(d->queue_out, 0, frame);
-    if (ret < 0)
-        av_frame_unref(frame);
-
-    return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    int i, ret;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
-                          d->last_frame_pts + d->last_frame_duration_est;
-        ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
-        if (ret < 0)
-            return ret;
-    }
-    return 0;
+    return process_subtitle(ist, frame);
 }
 
 static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -612,9 +531,11 @@  static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
 
         ist->frames_decoded++;
 
-        ret = tq_send(d->queue_out, 0, frame);
-        if (ret < 0)
-            return ret;
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+        }
     }
 }
 
@@ -656,7 +577,6 @@  fail:
 void *decoder_thread(void *arg)
 {
     InputStream *ist = arg;
-    InputFile *ifile = input_files[ist->file_index];
     Decoder       *d = ist->decoder;
     DecThreadContext dt;
     int ret = 0, input_status = 0;
@@ -668,19 +588,30 @@  void *decoder_thread(void *arg)
     dec_thread_set_name(ist);
 
     while (!input_status) {
-        int dummy, flush_buffers;
+        int flush_buffers, have_data;
 
-        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
-        flush_buffers = input_status >= 0 && !dt.pkt->buf;
-        if (!dt.pkt->buf)
+        input_status  = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+        have_data     = input_status >= 0 &&
+            (dt.pkt->buf || dt.pkt->side_data_elems ||
+             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT);
+        flush_buffers = input_status >= 0 && !have_data;
+        if (!have_data)
             av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
                    flush_buffers ? "flush" : "EOF");
 
-        ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+        ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
 
         av_packet_unref(dt.pkt);
         av_frame_unref(dt.frame);
 
+        // AVERROR_EOF  - EOF from the decoder
+        // AVERROR_EXIT - EOF from the scheduler
+        // we treat them differently when flushing
+        if (ret == AVERROR_EXIT) {
+            ret = AVERROR_EOF;
+            flush_buffers = 0;
+        }
+
         if (ret == AVERROR_EOF) {
             av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
                    flush_buffers ? "resetting" : "finishing");
@@ -688,11 +619,10 @@  void *decoder_thread(void *arg)
             if (!flush_buffers)
                 break;
 
-            /* report last frame duration to the demuxer thread */
+            /* report last frame duration to the scheduler */
             if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
-                Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
-                                 .tb = d->last_frame_tb };
-                av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+                dt.pkt->pts       = d->last_frame_pts + d->last_frame_duration_est;
+                dt.pkt->time_base = d->last_frame_tb;
             }
 
             avcodec_flush_buffers(ist->dec_ctx);
@@ -701,149 +631,47 @@  void *decoder_thread(void *arg)
                    av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the entire packet was processed
-        ret = tq_send(d->queue_out, 0, dt.frame);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
+    // on success send EOF timestamp to our downstreams
+    if (ret >= 0) {
+        float err_rate;
+
+        av_frame_unref(dt.frame);
+
+        dt.frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+        dt.frame->pts       = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+                              d->last_frame_pts + d->last_frame_duration_est;
+        dt.frame->time_base = d->last_frame_tb;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+        if (ret < 0 && ret != AVERROR_EOF) {
+            av_log(NULL, AV_LOG_FATAL,
+                   "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+            goto finish;
+        }
+        ret = 0;
+
+        err_rate = (ist->frames_decoded || ist->decode_errors) ?
+                   ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+        if (err_rate > max_error_rate) {
+            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+                   err_rate, max_error_rate);
+            ret = FFMPEG_ERROR_RATE_EXCEEDED;
+        } else if (err_rate)
+            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+    }
+
 finish:
-    tq_receive_finish(d->queue_in,  0);
-    tq_send_finish   (d->queue_out, 0);
-
-    // make sure the demuxer does not get stuck waiting for audio durations
-    // that will never arrive
-    if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
-        av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
     dec_thread_uninit(&dt);
 
-    av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    Decoder *d = ist->decoder;
-    int ret = 0, thread_ret;
-
-    // thread already joined
-    if (!d->queue_in)
-        return AVERROR_EOF;
-
-    // send the packet/flush request/EOF to the decoder thread
-    if (pkt || no_eof) {
-        av_packet_unref(d->pkt);
-
-        if (pkt) {
-            ret = av_packet_ref(d->pkt, pkt);
-            if (ret < 0)
-                goto finish;
-        }
-
-        ret = tq_send(d->queue_in, 0, d->pkt);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(d->queue_in, 0);
-
-    // retrieve all decoded data for the packet
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(d->queue_out, &dummy, d->frame);
-        if (ret < 0)
-            goto finish;
-
-        // packet fully processed
-        if (!d->frame->buf[0])
-            return 0;
-
-        // process the decoded frame
-        if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
-            ret = process_subtitle(ist, d->frame);
-        } else {
-            ret = send_frame_to_filters(ist, d->frame);
-        }
-        av_frame_unref(d->frame);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = dec_thread_stop(d);
-    if (thread_ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-    // non-EOF errors here are all fatal
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to our downstreams
-    ret = send_filter_eof(ist);
-    if (ret < 0) {
-        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
-        return ret;
-    }
-
-    return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->queue_in = tq_alloc(1, 1, op, pkt_move);
-    if (!d->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    d->queue_out = tq_alloc(1, 4, op, frame_move);
-    if (!d->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-    return ret;
-}
-
 static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
 {
     InputStream *ist = s->opaque;
@@ -1095,12 +923,5 @@  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
     if (ret < 0)
         return ret;
 
-    ret = dec_thread_start(ist);
-    if (ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     return 0;
 }