From patchwork Wed Dec 6 10:27:13 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44963 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:9153:b0:181:818d:5e7f with SMTP id x19csp161516pzc; Wed, 6 Dec 2023 02:31:44 -0800 (PST) X-Google-Smtp-Source: AGHT+IFZy33JGkk7RpOVp97/lgSz95uWmuiDVf7lkqQeTiVrY5FYOoYJFoLqGCmTI42f2pvu4eBX X-Received: by 2002:a05:6402:3593:b0:54d:8ec0:17d4 with SMTP id y19-20020a056402359300b0054d8ec017d4mr654239edc.32.1701858704652; Wed, 06 Dec 2023 02:31:44 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1701858704; cv=none; d=google.com; s=arc-20160816; b=Oo2cfMeDVSI3HHvKrXBjamNs2AUgVNPDYO8XJy80rZGTa2ozcIzmMfZk9nM770yT5A 4gGhvSNxZCgITWvaY625o4u4cHHh6XWEutbKmAsV/ZuLhWu1aG6jKTC6a8BaHIjHzrUz JJblVMkVRmhqpcI/vNU68uxD7djDxM0QFoZpEzVavqkPtDNzAhKFYrsuzaUg5e4Jvyfb voXSxHKtcR/5A/Kyn5W6SxuLeLuQ+dcQVcD7WBCrCxnLgPFqC/OY/+rlwnX9yBmI3H/g uWSim6sLqfxpNpgaTnTa8d7XLxHB0kDTLByM9nkBILdJRg93qNSihlZ6Dwp5r23i2GKm ScXQ== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=RQWIUfGXgGCoyyCVOrKndl6h8/uZyWv85+CvxvddqNI=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=f97Fxy1/Jf8Ee5jZZymPEdaAhCKdwBcdaPwQ6fdF832fhcQyr4E/GyrFoGGkcUrwFn kB5rk2LZ1jAoZg+ZT2Qg3irDyQvPXWDhiiQyPGQpScZA9bsDGXjpbnl1DybbkgZ/usR0 JkK18PUvkuxaYeZ1rv2KaaLa64Qy/2MRHuw9SphM6XczPr6jFck+eJbXRPxAk1jLzbGX ELortBdnkfVhM+e9S6kte/fa9jLo4QeLXBMxtq/5cTePqwphwoGGcae+EQ+DsrQS70l7 qRQZqKoGxA87brgbFSShO0A3zG2oxl4KHMHryyn/7vyLgEY3gaVfA4gBSYo1tmVjK8QY YV8A== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id f26-20020a50a6da000000b0054cb1dcaf86si1990324edc.294.2023.12.06.02.31.42; Wed, 06 Dec 2023 02:31:44 -0800 (PST) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 5978A68CF81; Wed, 6 Dec 2023 12:30:26 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 5830268CF5E for ; Wed, 6 Dec 2023 12:30:17 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 903871D38 for ; Wed, 6 Dec 2023 11:30:12 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id quy6udWbzFfw for ; Wed, 6 Dec 2023 11:30:12 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 9DA671DDF for ; Wed, 6 Dec 2023 11:30:10 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 90FAB3A06A6 for ; Wed, 6 Dec 2023 11:30:10 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Wed, 6 Dec 2023 11:27:13 +0100 Message-ID: <20231206103002.30084-8-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231206103002.30084-1-anton@khirnov.net> References: <20231206103002.30084-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 07/10] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: Hi0fa9RP0Sfr * the code is made shorter and simpler * avoids constantly allocating and freeing AVPackets, thanks to ThreadQueue integration with ObjPool * is consistent with decoding/filtering/muxing * reduces the diff in the future switch to thread-aware scheduling This makes ifile_get_packet() always block. Any potential issues caused by this will be resolved by the switch to thread-aware scheduling in future commits. --- fftools/ffmpeg.c | 32 ++++++------ fftools/ffmpeg.h | 3 +- fftools/ffmpeg_demux.c | 108 ++++++++++++++-------------------------- fftools/ffmpeg_filter.c | 5 +- 4 files changed, 58 insertions(+), 90 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 61fcda2526..cf8a50bffc 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1043,9 +1043,6 @@ static int check_keyboard_interaction(int64_t cur_time) static void reset_eagain(void) { - int i; - for (i = 0; i < nb_input_files; i++) - input_files[i]->eagain = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) ost->unavailable = 0; } @@ -1069,19 +1066,14 @@ static void decode_flush(InputFile *ifile) * this function should be called again * - AVERROR_EOF -- this function should not be called again */ -static int process_input(int file_index) +static int process_input(int file_index, AVPacket *pkt) { InputFile *ifile = input_files[file_index]; InputStream *ist; - AVPacket *pkt; int ret, i; - ret = ifile_get_packet(ifile, &pkt); + ret = ifile_get_packet(ifile, pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); @@ -1128,7 +1120,7 @@ static int process_input(int file_index) ret = process_input_packet(ist, pkt, 0); - av_packet_free(&pkt); + av_packet_unref(pkt); return ret < 0 ? ret : 0; } @@ -1138,7 +1130,7 @@ static int process_input(int file_index) * * @return 0 for success, <0 for error */ -static int transcode_step(OutputStream *ost) +static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) { InputStream *ist = NULL; int ret; @@ -1153,10 +1145,8 @@ static int transcode_step(OutputStream *ost) av_assert0(ist); } - ret = process_input(ist->file_index); + ret = process_input(ist->file_index, demux_pkt); if (ret == AVERROR(EAGAIN)) { - if (input_files[ist->file_index]->eagain) - ost->unavailable = 1; return 0; } @@ -1182,12 +1172,19 @@ static int transcode(int *err_rate_exceeded) int ret = 0, i; InputStream *ist; int64_t timer_start; + AVPacket *demux_pkt = NULL; print_stream_maps(); *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); + demux_pkt = av_packet_alloc(); + if (!demux_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); } @@ -1215,7 +1212,7 @@ static int transcode(int *err_rate_exceeded) break; } - ret = transcode_step(ost); + ret = transcode_step(ost, demux_pkt); if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); break; @@ -1256,6 +1253,9 @@ static int transcode(int *err_rate_exceeded) /* dump report by using the first video and audio streams */ print_report(1, timer_start, av_gettime_relative()); +fail: + av_packet_free(&demux_pkt); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index f50222472c..3c153021f8 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -407,7 +407,6 @@ typedef struct InputFile { AVFormatContext *ctx; int eof_reached; /* true if eof reached */ - int eagain; /* true if last read attempt returned EAGAIN */ int64_t input_ts_offset; int input_sync_ref; /** @@ -859,7 +858,7 @@ void ifile_close(InputFile **f); * caller should flush decoders and read from this demuxer again * - a negative error code on failure */ -int ifile_get_packet(InputFile *f, AVPacket **pkt); +int ifile_get_packet(InputFile *f, AVPacket *pkt); int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 791952f120..65a5e08ca5 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,6 +21,8 @@ #include "ffmpeg.h" #include "ffmpeg_utils.h" +#include "objpool.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -33,7 +35,6 @@ #include "libavutil/time.h" #include "libavutil/timestamp.h" #include "libavutil/thread.h" -#include "libavutil/threadmessage.h" #include "libavcodec/packet.h" @@ -107,19 +108,13 @@ typedef struct Demuxer { double readrate_initial_burst; - AVThreadMessageQueue *in_thread_queue; + ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; - int non_blocking; int read_started; } Demuxer; -typedef struct DemuxMsg { - AVPacket *pkt; - int looping; -} DemuxMsg; - static DemuxStream *ds_from_ist(InputStream *ist) { return (DemuxStream*)ist; @@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -// process an input packet into a message to send to the consumer thread -// src is always cleared by this function -static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) +static int input_packet_process(Demuxer *d, AVPacket *pkt) { InputFile *f = &d->f; - InputStream *ist = f->streams[src->stream_index]; + InputStream *ist = f->streams[pkt->stream_index]; DemuxStream *ds = ds_from_ist(ist); - AVPacket *pkt; int ret = 0; - pkt = av_packet_alloc(); - if (!pkt) { - av_packet_unref(src); - return AVERROR(ENOMEM); - } - av_packet_move_ref(pkt, src); - ret = ts_fixup(d, pkt); if (ret < 0) - goto fail; + return ret; ds->data_size += pkt->size; ds->nb_packets++; @@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } - msg->pkt = pkt; - pkt = NULL; - -fail: - av_packet_free(&pkt); - - return ret; + return 0; } static void readrate_sleep(Demuxer *d) @@ -531,7 +510,6 @@ static void *input_thread(void *arg) Demuxer *d = arg; InputFile *f = &d->f; AVPacket *pkt; - unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; pkt = av_packet_alloc(); @@ -547,8 +525,6 @@ static void *input_thread(void *arg) d->wallclock_start = av_gettime_relative(); while (1) { - DemuxMsg msg = { NULL }; - ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -558,8 +534,8 @@ static void *input_thread(void *arg) if (ret < 0) { if (d->loop) { /* signal looping to the consumer thread */ - msg.looping = 1; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0); + pkt->stream_index = -1; + ret = tq_send(d->thread_queue, 0, pkt); if (ret >= 0) ret = seek_to_start(d); if (ret >= 0) @@ -602,35 +578,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, &msg, pkt); + ret = input_packet_process(d, pkt); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - if (flags && ret == AVERROR(EAGAIN)) { - flags = 0; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - av_log(f, AV_LOG_WARNING, - "Thread message queue blocking; consider raising the " - "thread_queue_size option (current value: %d)\n", - d->thread_queue_size); - } + ret = tq_send(d->thread_queue, 0, pkt); if (ret < 0) { if (ret != AVERROR_EOF) av_log(f, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&msg.pkt); break; } } finish: av_assert0(ret < 0); - av_thread_message_queue_set_err_recv(d->in_thread_queue, ret); + tq_send_finish(d->thread_queue, 0); av_packet_free(&pkt); @@ -642,16 +609,16 @@ finish: static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - DemuxMsg msg; - if (!d->in_thread_queue) + if (!d->thread_queue) return; - av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0) - av_packet_free(&msg.pkt); + + tq_receive_finish(d->thread_queue, 0); pthread_join(d->thread, NULL); - av_thread_message_queue_free(&d->in_thread_queue); + + tq_free(&d->thread_queue); + av_thread_message_queue_free(&f->audio_ts_queue); } @@ -659,18 +626,20 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; + ObjPool *op; if (d->thread_queue_size <= 0) d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - if (nb_input_files > 1 && - (f->ctx->pb ? !f->ctx->pb->seekable : - strcmp(f->ctx->iformat->name, "lavfi"))) - d->non_blocking = 1; - ret = av_thread_message_queue_alloc(&d->in_thread_queue, - d->thread_queue_size, sizeof(DemuxMsg)); - if (ret < 0) - return ret; + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); + if (!d->thread_queue) { + objpool_free(&op); + return AVERROR(ENOMEM); + } if (d->loop) { int nb_audio_dec = 0; @@ -700,31 +669,30 @@ static int thread_start(Demuxer *d) return 0; fail: - av_thread_message_queue_free(&d->in_thread_queue); + tq_free(&d->thread_queue); return ret; } -int ifile_get_packet(InputFile *f, AVPacket **pkt) +int ifile_get_packet(InputFile *f, AVPacket *pkt) { Demuxer *d = demuxer_from_ifile(f); - DemuxMsg msg; - int ret; + int ret, dummy; - if (!d->in_thread_queue) { + if (!d->thread_queue) { ret = thread_start(d); if (ret < 0) return ret; } - ret = av_thread_message_queue_recv(d->in_thread_queue, &msg, - d->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; - if (msg.looping) - return 1; - *pkt = msg.pkt; + if (pkt->stream_index == -1) { + av_assert0(!pkt->data && !pkt->side_data_elems); + return 1; + } + return 0; } diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 1df1212442..d93fc74e07 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -2010,9 +2010,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) for (int i = 0; i < fg->nb_inputs; i++) { InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - InputStream *ist = ifp->ist; - if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + if (fgt->eof_in[i]) continue; nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); @@ -2022,6 +2021,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) } } + av_assert0(best_input >= 0); + return best_input; }