From patchwork Sat Nov 4 07:56:29 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44513 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:3aa6:b0:181:818d:5e7f with SMTP id d38csp336555pzh; Sat, 4 Nov 2023 02:23:43 -0700 (PDT) X-Google-Smtp-Source: AGHT+IFtv8CPJTIuFJu7/G+CZHo3Khd6zqa7c+laEOH/HGdNbBMldWk1MbFEy8WZA2WkLPxr3WZk X-Received: by 2002:a17:907:807:b0:9c4:54c6:8030 with SMTP id wv7-20020a170907080700b009c454c68030mr8551176ejb.6.1699089823264; Sat, 04 Nov 2023 02:23:43 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1699089823; cv=none; d=google.com; s=arc-20160816; b=HvHiMnR23YcBu/06eC6wcnod7bBhxlp5ynWvJn6WSlAUO8fEVwDJlyGQn024UsQfr+ Z/SO3tX+jGJiwn5bmfQlTSBBFf1NYA6RoKxonOM9x4hjpW57iPIty10fuVWoW8A/fC14 Ct3f6mYYS6Nllnq10A/g+7m2tJCwBlxmQFJTaSe+FaVPChRtgg8PiKrHpAe/CpGNUE9M +QOsbvbao8REWvCusheXhCFyqbkYV19fPlekCe/RG4cW5ZyY8/nZBnKBnkkCtV94TFsY GlQn57iC1SVm2r0Gc+bViDERwqFsGDfD7v9MWbFvKLv2tBPy+cm4CArjwpB2KX1rTpuI T08w== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=UC1btWvG6YWFfjNzxgyrGIR7IDVFx4tKmawcd0/CbhQ=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=ik/Xx9U1MKGtQKW7c+yuTkgpYwfpIdHItB49oumSLdyvJwX6d4iH9NqL7R6C2nrTzu 1pAELOBmBcg5ytV5tvTk8zzU3DQQCchyhd8v2aLRRnpX1+GCcLm3L4Fw3/l+iIEBVunC NNNQGjG1ytE3aPTVKr0qdAcavC2Scwty5gKDcQuVeQ6wX7M5A2zFAE9HqNJEnHdXG1V4 40Zcho1fN+dbdXzIOliWL9bd6eNat8df3zxTyLQsmk/+AMJhcKrXRGjvp9uuI/U6cT8l Pn/fvn8uQkV0+mdf9MiRRz06vjGN2YYSuqRuC3ON0ZcjHzbxE3PnbbzayNpr6xdaZlHr biRQ== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id dl6-20020a170907944600b009bfc605fc84si2064464ejc.715.2023.11.04.02.23.42; Sat, 04 Nov 2023 02:23:43 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 7D7F268CE94; Sat, 4 Nov 2023 11:22:07 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 40E0968CE29 for ; Sat, 4 Nov 2023 11:21:55 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 5678B1102 for ; Sat, 4 Nov 2023 10:21:51 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id c1fomCX8tUUt for ; Sat, 4 Nov 2023 10:21:50 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id BCC5F12DE for ; Sat, 4 Nov 2023 10:21:47 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 0D55A3A15C1 for ; Sat, 4 Nov 2023 10:21:41 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Sat, 4 Nov 2023 08:56:29 +0100 Message-ID: <20231104092125.10213-21-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231104092125.10213-1-anton@khirnov.net> References: <20231104092125.10213-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 20/24] fftools/ffmpeg_dec: convert to the scheduler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: bVzzmBYoRgiB --- fftools/ffmpeg.c | 22 --- fftools/ffmpeg.h | 13 +- fftools/ffmpeg_dec.c | 315 ++++++++++--------------------------------- 3 files changed, 70 insertions(+), 280 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 611ac4621d..bd783fe674 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -778,11 +778,6 @@ static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo int ret = 0; int eof_reached = 0; - if (ist->decoding_needed) { - ret = dec_packet(ist, pkt, no_eof); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) eof_reached = 1; @@ -994,18 +989,6 @@ static void reset_eagain(void) ost->unavailable = 0; } -static void decode_flush(InputFile *ifile) -{ - for (int i = 0; i < ifile->nb_streams; i++) { - InputStream *ist = ifile->streams[i]; - - if (ist->discard || !ist->decoding_needed) - continue; - - dec_packet(ist, NULL, 1); - } -} - /* * Return * - 0 -- one packet was read and processed @@ -1021,11 +1004,6 @@ static int process_input(int file_index, AVPacket *pkt) ret = 0; - if (ret == 1) { - /* the input file is looped: flush the decoders */ - decode_flush(ifile); - return AVERROR(EAGAIN); - } if (ret < 0) { if (ret != AVERROR_EOF) { av_log(ifile, AV_LOG_ERROR, diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 73b3e54fb0..975d8b737e 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -61,6 +61,8 @@ #define FFMPEG_OPT_TOP 1 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1 +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D') + enum VideoSyncMethod { VSYNC_AUTO = -1, VSYNC_PASSTHROUGH, @@ -796,17 +798,6 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); -/** - * Submit a packet for decoding - * - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and - * mark all downstreams as finished. - * - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush - * decoders and await further input. - */ -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); - int enc_alloc(Encoder **penc, const AVCodec *codec, Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 53e14f061e..a81f83fc92 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -54,24 +54,6 @@ struct Decoder { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending coded packets from the main thread to - * the decoder thread. - * - * An empty packet is sent to flush the decoder without terminating - * decoding. - */ - ThreadQueue *queue_in; - /** - * Queue for sending decoded frames from the decoder thread - * to the main thread. - * - * An empty frame is sent to signal that a single packet has been fully - * processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -80,24 +62,6 @@ typedef struct DecThreadContext { AVPacket *pkt; } DecThreadContext; -static int dec_thread_stop(Decoder *d) -{ - void *ret; - - if (!d->queue_in) - return 0; - - tq_send_finish(d->queue_in, 0); - tq_receive_finish(d->queue_out, 0); - - pthread_join(d->thread, &ret); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - - return (intptr_t)ret; -} - void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec) if (!dec) return; - dec_thread_stop(dec); - av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -148,25 +110,6 @@ fail: return AVERROR(ENOMEM); } -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) -{ - int i, ret = 0; - - for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, - i < ist->nb_filters - 1 || - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; - } - } - return ret; -} - static AVRational audio_samplerate_update(void *logctx, Decoder *d, const AVFrame *frame) { @@ -421,36 +364,31 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - ret = send_frame_to_filters(ist, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) - return ret; + av_frame_unref(frame); - subtitle = (AVSubtitle*)frame->buf[0]->data; - if (!subtitle->num_rects) - return 0; - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE) - continue; - - ret = enc_subtitle(output_files[ost->file_index], ost, subtitle); - if (ret < 0) - return ret; - } - - return 0; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, AVFrame *frame) { - Decoder *d = ist->decoder; + Decoder *d = ist->decoder; AVPacket *flush_pkt = NULL; AVSubtitle subtitle; int got_output; int ret; + if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) { + frame->pts = pkt->pts; + frame->time_base = pkt->time_base; + frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT; + + ret = sch_dec_send(d->sch, d->sch_idx, frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } + if (!pkt) { flush_pkt = av_packet_alloc(); if (!flush_pkt) @@ -473,7 +411,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, ist->frames_decoded++; - // XXX the queue for transferring data back to the main thread runs + // XXX the queue for transferring data to consumers runs // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that // inside the frame // eventually, subtitles should be switched to use AVFrames natively @@ -486,26 +424,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, frame->width = ist->dec_ctx->width; frame->height = ist->dec_ctx->height; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - av_frame_unref(frame); - - return ret; -} - -static int send_filter_eof(InputStream *ist) -{ - Decoder *d = ist->decoder; - int i, ret; - - for (i = 0; i < ist->nb_filters; i++) { - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : - d->last_frame_pts + d->last_frame_duration_est; - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb); - if (ret < 0) - return ret; - } - return 0; + return process_subtitle(ist, frame); } static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) @@ -612,9 +531,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) ist->frames_decoded++; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - return ret; + ret = sch_dec_send(d->sch, d->sch_idx, frame); + if (ret < 0) { + av_frame_unref(frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } } } @@ -656,7 +577,6 @@ fail: void *decoder_thread(void *arg) { InputStream *ist = arg; - InputFile *ifile = input_files[ist->file_index]; Decoder *d = ist->decoder; DecThreadContext dt; int ret = 0, input_status = 0; @@ -668,19 +588,30 @@ void *decoder_thread(void *arg) dec_thread_set_name(ist); while (!input_status) { - int dummy, flush_buffers; + int flush_buffers, have_data; - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); - flush_buffers = input_status >= 0 && !dt.pkt->buf; - if (!dt.pkt->buf) + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); + have_data = input_status >= 0 && + (dt.pkt->buf || dt.pkt->side_data_elems || + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT); + flush_buffers = input_status >= 0 && !have_data; + if (!have_data) av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", flush_buffers ? "flush" : "EOF"); - ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame); + ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame); av_packet_unref(dt.pkt); av_frame_unref(dt.frame); + // AVERROR_EOF - EOF from the decoder + // AVERROR_EXIT - EOF from the scheduler + // we treat them differently when flushing + if (ret == AVERROR_EXIT) { + ret = AVERROR_EOF; + flush_buffers = 0; + } + if (ret == AVERROR_EOF) { av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", flush_buffers ? "resetting" : "finishing"); @@ -688,11 +619,10 @@ void *decoder_thread(void *arg) if (!flush_buffers) break; - /* report last frame duration to the demuxer thread */ + /* report last frame duration to the scheduler */ if (ist->dec->type == AVMEDIA_TYPE_AUDIO) { - Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est, - .tb = d->last_frame_tb }; - av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0); + dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est; + dt.pkt->time_base = d->last_frame_tb; } avcodec_flush_buffers(ist->dec_ctx); @@ -701,149 +631,47 @@ void *decoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the entire packet was processed - ret = tq_send(d->queue_out, 0, dt.frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination if (ret == AVERROR_EOF) ret = 0; + // on success send EOF timestamp to our downstreams + if (ret >= 0) { + float err_rate; + + av_frame_unref(dt.frame); + + dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF; + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : + d->last_frame_pts + d->last_frame_duration_est; + dt.frame->time_base = d->last_frame_tb; + + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_FATAL, + "Error signalling EOF timestamp: %s\n", av_err2str(ret)); + goto finish; + } + ret = 0; + + err_rate = (ist->frames_decoded || ist->decode_errors) ? + ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f; + if (err_rate > max_error_rate) { + av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n", + err_rate, max_error_rate); + ret = FFMPEG_ERROR_RATE_EXCEEDED; + } else if (err_rate) + av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate); + } + finish: - tq_receive_finish(d->queue_in, 0); - tq_send_finish (d->queue_out, 0); - - // make sure the demuxer does not get stuck waiting for audio durations - // that will never arrive - if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO) - av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF); - dec_thread_uninit(&dt); - av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n"); - return (void*)(intptr_t)ret; } -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) -{ - Decoder *d = ist->decoder; - int ret = 0, thread_ret; - - // thread already joined - if (!d->queue_in) - return AVERROR_EOF; - - // send the packet/flush request/EOF to the decoder thread - if (pkt || no_eof) { - av_packet_unref(d->pkt); - - if (pkt) { - ret = av_packet_ref(d->pkt, pkt); - if (ret < 0) - goto finish; - } - - ret = tq_send(d->queue_in, 0, d->pkt); - if (ret < 0) - goto finish; - } else - tq_send_finish(d->queue_in, 0); - - // retrieve all decoded data for the packet - while (1) { - int dummy; - - ret = tq_receive(d->queue_out, &dummy, d->frame); - if (ret < 0) - goto finish; - - // packet fully processed - if (!d->frame->buf[0]) - return 0; - - // process the decoded frame - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) { - ret = process_subtitle(ist, d->frame); - } else { - ret = send_frame_to_filters(ist, d->frame); - } - av_frame_unref(d->frame); - if (ret < 0) - goto finish; - } - -finish: - thread_ret = dec_thread_stop(d); - if (thread_ret < 0) { - av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n", - av_err2str(thread_ret)); - ret = err_merge(ret, thread_ret); - } - // non-EOF errors here are all fatal - if (ret < 0 && ret != AVERROR_EOF) - return ret; - - // signal EOF to our downstreams - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } - - return AVERROR_EOF; -} - -static int dec_thread_start(InputStream *ist) -{ - Decoder *d = ist->decoder; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->queue_in = tq_alloc(1, 1, op, pkt_move); - if (!d->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_frames(); - if (!op) - goto fail; - - d->queue_out = tq_alloc(1, 4, op, frame_move); - if (!d->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); - if (ret) { - ret = AVERROR(ret); - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - return ret; -} - static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) { InputStream *ist = s->opaque; @@ -1095,12 +923,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) if (ret < 0) return ret; - ret = dec_thread_start(ist); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", - av_err2str(ret)); - return ret; - } - return 0; }