From patchwork Sat Nov 4 07:56:24 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44512 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:3aa6:b0:181:818d:5e7f with SMTP id d38csp336498pzh; Sat, 4 Nov 2023 02:23:34 -0700 (PDT) X-Google-Smtp-Source: AGHT+IEqRgypr29qctDhdTSqpMLPhKf5ceKCPoJHdhoa5H6syLwirJ8EbssoNnZpBnZVkVNkI1eG X-Received: by 2002:a05:6402:792:b0:53f:ba1a:800d with SMTP id d18-20020a056402079200b0053fba1a800dmr19350665edy.14.1699089814455; Sat, 04 Nov 2023 02:23:34 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1699089814; cv=none; d=google.com; s=arc-20160816; b=k0F6Njtjnh4a2YnuMUX0WaN7eIjtYVbJXbCY96CB21izzhmEPqo9g9mQT5ZARvmru4 rhCn+vy+unUpd7CUDCx1XtrblCgdD4Il1hmGunBQuFxhyxlHKETicizz2uvtA5sCRQaj QXYKvtiv6kQcAI2D+y8DZEciYe3Wh4OXRB0wXTz/NAN5PvwnhC1r/b4KGnaCjwLTAPl+ 1BCihWhkA0tgtXbgHTYNdGZmfMaF3lnP2Z5NlHQyyUBX4291Yr4F59UOP0yyNGFrJQJl AvN8jfyIEsr+RyAMnySG8VMMIkPDH/RUSQfkVpiWJ/BxDK+Tdb3C4rd4lHJdwu2IGg92 TOgA== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=0WXEJRRhfeY/s3ubAInykGe1baUEFPtrx0KiW5OyNA8=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=oHoBJTZiNC5JXHZFa4A1r0dDH9rc7sEoNCNvXO9hX1fyFQ9vVPz18WjLMg4hU5IT4Y zP44TreOJ9QoAIb/1ZkIXNrUYgkfZusEd2HWbSf2YZUwU555lQo5RUMtiOqPzlQ8Gpid JmizpzFxitr7dfldv7pVyqQI5htYH2aH5wyWoz+qerpBXJegy4cwyNHRCPZCeJpSGF6i MOrqJR1wKoJydI1jCkbAUJSk3xI42jVwLEhgOlcpq+bR94ZBweFepRok8NIRJAx3L41+ v1AxX/VJZC5XBhsSAHJJ+MGeNvlvK4LlU2OvJtFFpPoeShJFjxRk/MOvz6tE7sqSKwZj i2AQ== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id f8-20020a056402354800b0053ebd5c9ffbsi2050626edd.215.2023.11.04.02.23.34; Sat, 04 Nov 2023 02:23:34 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 769EE68CE8F; Sat, 4 Nov 2023 11:22:06 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 3EC1068CE42 for ; Sat, 4 Nov 2023 11:21:55 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 3828E1049 for ; Sat, 4 Nov 2023 10:21:51 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id utnQZKc_4X3z for ; Sat, 4 Nov 2023 10:21:50 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id B063C11D7 for ; Sat, 4 Nov 2023 10:21:47 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id BE4A53A13E3 for ; Sat, 4 Nov 2023 10:21:40 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Sat, 4 Nov 2023 08:56:24 +0100 Message-ID: <20231104092125.10213-16-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231104092125.10213-1-anton@khirnov.net> References: <20231104092125.10213-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 15/24] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: T1s9j7lh4TLV * the code is made shorter and simpler * avoids constantly allocating and freeing AVPackets, thanks to ThreadQueue integration with ObjPool * is consistent with decoding/filtering/muxing * reduces the diff in the future switch to thread-aware scheduling This makes ifile_get_packet() always block. Any potential issues caused by this will be resolved by the switch to thread-aware scheduling in future commits. --- fftools/ffmpeg.c | 32 ++++++------ fftools/ffmpeg.h | 3 +- fftools/ffmpeg_demux.c | 108 ++++++++++++++-------------------------- fftools/ffmpeg_filter.c | 5 +- 4 files changed, 58 insertions(+), 90 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index cdb16ef90b..038649d9b5 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1030,9 +1030,6 @@ static int check_keyboard_interaction(int64_t cur_time) static void reset_eagain(void) { - int i; - for (i = 0; i < nb_input_files; i++) - input_files[i]->eagain = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) ost->unavailable = 0; } @@ -1056,19 +1053,14 @@ static void decode_flush(InputFile *ifile) * this function should be called again * - AVERROR_EOF -- this function should not be called again */ -static int process_input(int file_index) +static int process_input(int file_index, AVPacket *pkt) { InputFile *ifile = input_files[file_index]; InputStream *ist; - AVPacket *pkt; int ret, i; - ret = ifile_get_packet(ifile, &pkt); + ret = ifile_get_packet(ifile, pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); @@ -1115,7 +1107,7 @@ static int process_input(int file_index) ret = process_input_packet(ist, pkt, 0); - av_packet_free(&pkt); + av_packet_unref(pkt); return ret < 0 ? ret : 0; } @@ -1125,7 +1117,7 @@ static int process_input(int file_index) * * @return 0 for success, <0 for error */ -static int transcode_step(OutputStream *ost) +static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) { InputStream *ist = NULL; int ret; @@ -1140,10 +1132,8 @@ static int transcode_step(OutputStream *ost) av_assert0(ist); } - ret = process_input(ist->file_index); + ret = process_input(ist->file_index, demux_pkt); if (ret == AVERROR(EAGAIN)) { - if (input_files[ist->file_index]->eagain) - ost->unavailable = 1; return 0; } @@ -1169,12 +1159,19 @@ static int transcode(int *err_rate_exceeded) int ret = 0, i; InputStream *ist; int64_t timer_start; + AVPacket *demux_pkt = NULL; print_stream_maps(); *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); + demux_pkt = av_packet_alloc(); + if (!demux_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); } @@ -1202,7 +1199,7 @@ static int transcode(int *err_rate_exceeded) break; } - ret = transcode_step(ost); + ret = transcode_step(ost, demux_pkt); if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); break; @@ -1243,6 +1240,9 @@ static int transcode(int *err_rate_exceeded) /* dump report by using the first video and audio streams */ print_report(1, timer_start, av_gettime_relative()); +fail: + av_packet_free(&demux_pkt); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 9852df8320..8de91ab85a 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -407,7 +407,6 @@ typedef struct InputFile { AVFormatContext *ctx; int eof_reached; /* true if eof reached */ - int eagain; /* true if last read attempt returned EAGAIN */ int64_t input_ts_offset; int input_sync_ref; /** @@ -857,7 +856,7 @@ void ifile_close(InputFile **f); * caller should flush decoders and read from this demuxer again * - a negative error code on failure */ -int ifile_get_packet(InputFile *f, AVPacket **pkt); +int ifile_get_packet(InputFile *f, AVPacket *pkt); int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 791952f120..65a5e08ca5 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,6 +21,8 @@ #include "ffmpeg.h" #include "ffmpeg_utils.h" +#include "objpool.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -33,7 +35,6 @@ #include "libavutil/time.h" #include "libavutil/timestamp.h" #include "libavutil/thread.h" -#include "libavutil/threadmessage.h" #include "libavcodec/packet.h" @@ -107,19 +108,13 @@ typedef struct Demuxer { double readrate_initial_burst; - AVThreadMessageQueue *in_thread_queue; + ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; - int non_blocking; int read_started; } Demuxer; -typedef struct DemuxMsg { - AVPacket *pkt; - int looping; -} DemuxMsg; - static DemuxStream *ds_from_ist(InputStream *ist) { return (DemuxStream*)ist; @@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -// process an input packet into a message to send to the consumer thread -// src is always cleared by this function -static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) +static int input_packet_process(Demuxer *d, AVPacket *pkt) { InputFile *f = &d->f; - InputStream *ist = f->streams[src->stream_index]; + InputStream *ist = f->streams[pkt->stream_index]; DemuxStream *ds = ds_from_ist(ist); - AVPacket *pkt; int ret = 0; - pkt = av_packet_alloc(); - if (!pkt) { - av_packet_unref(src); - return AVERROR(ENOMEM); - } - av_packet_move_ref(pkt, src); - ret = ts_fixup(d, pkt); if (ret < 0) - goto fail; + return ret; ds->data_size += pkt->size; ds->nb_packets++; @@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } - msg->pkt = pkt; - pkt = NULL; - -fail: - av_packet_free(&pkt); - - return ret; + return 0; } static void readrate_sleep(Demuxer *d) @@ -531,7 +510,6 @@ static void *input_thread(void *arg) Demuxer *d = arg; InputFile *f = &d->f; AVPacket *pkt; - unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; pkt = av_packet_alloc(); @@ -547,8 +525,6 @@ static void *input_thread(void *arg) d->wallclock_start = av_gettime_relative(); while (1) { - DemuxMsg msg = { NULL }; - ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -558,8 +534,8 @@ static void *input_thread(void *arg) if (ret < 0) { if (d->loop) { /* signal looping to the consumer thread */ - msg.looping = 1; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0); + pkt->stream_index = -1; + ret = tq_send(d->thread_queue, 0, pkt); if (ret >= 0) ret = seek_to_start(d); if (ret >= 0) @@ -602,35 +578,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, &msg, pkt); + ret = input_packet_process(d, pkt); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - if (flags && ret == AVERROR(EAGAIN)) { - flags = 0; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - av_log(f, AV_LOG_WARNING, - "Thread message queue blocking; consider raising the " - "thread_queue_size option (current value: %d)\n", - d->thread_queue_size); - } + ret = tq_send(d->thread_queue, 0, pkt); if (ret < 0) { if (ret != AVERROR_EOF) av_log(f, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&msg.pkt); break; } } finish: av_assert0(ret < 0); - av_thread_message_queue_set_err_recv(d->in_thread_queue, ret); + tq_send_finish(d->thread_queue, 0); av_packet_free(&pkt); @@ -642,16 +609,16 @@ finish: static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - DemuxMsg msg; - if (!d->in_thread_queue) + if (!d->thread_queue) return; - av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0) - av_packet_free(&msg.pkt); + + tq_receive_finish(d->thread_queue, 0); pthread_join(d->thread, NULL); - av_thread_message_queue_free(&d->in_thread_queue); + + tq_free(&d->thread_queue); + av_thread_message_queue_free(&f->audio_ts_queue); } @@ -659,18 +626,20 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; + ObjPool *op; if (d->thread_queue_size <= 0) d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - if (nb_input_files > 1 && - (f->ctx->pb ? !f->ctx->pb->seekable : - strcmp(f->ctx->iformat->name, "lavfi"))) - d->non_blocking = 1; - ret = av_thread_message_queue_alloc(&d->in_thread_queue, - d->thread_queue_size, sizeof(DemuxMsg)); - if (ret < 0) - return ret; + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); + if (!d->thread_queue) { + objpool_free(&op); + return AVERROR(ENOMEM); + } if (d->loop) { int nb_audio_dec = 0; @@ -700,31 +669,30 @@ static int thread_start(Demuxer *d) return 0; fail: - av_thread_message_queue_free(&d->in_thread_queue); + tq_free(&d->thread_queue); return ret; } -int ifile_get_packet(InputFile *f, AVPacket **pkt) +int ifile_get_packet(InputFile *f, AVPacket *pkt) { Demuxer *d = demuxer_from_ifile(f); - DemuxMsg msg; - int ret; + int ret, dummy; - if (!d->in_thread_queue) { + if (!d->thread_queue) { ret = thread_start(d); if (ret < 0) return ret; } - ret = av_thread_message_queue_recv(d->in_thread_queue, &msg, - d->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; - if (msg.looping) - return 1; - *pkt = msg.pkt; + if (pkt->stream_index == -1) { + av_assert0(!pkt->data && !pkt->side_data_elems); + return 1; + } + return 0; } diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index e288ea4b80..a98a02e643 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -2002,9 +2002,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) for (int i = 0; i < fg->nb_inputs; i++) { InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - InputStream *ist = ifp->ist; - if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + if (fgt->eof_in[i]) continue; nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); @@ -2014,6 +2013,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) } } + av_assert0(best_input >= 0); + return best_input; }