From patchwork Thu Nov 23 19:15:05 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44774 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:bca6:b0:181:818d:5e7f with SMTP id fx38csp802551pzb; Thu, 23 Nov 2023 11:20:25 -0800 (PST) X-Google-Smtp-Source: AGHT+IE8bf74g8mOUci0OuvmMlhxtGg0IymJItWkfVw2x3t7A+WKon6Clq1XKTFm6hZMGb5SXqsN X-Received: by 2002:a17:906:2f03:b0:9ef:e6fd:fe5a with SMTP id v3-20020a1709062f0300b009efe6fdfe5amr162187eji.74.1700767225208; Thu, 23 Nov 2023 11:20:25 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1700767225; cv=none; d=google.com; s=arc-20160816; b=vyyK+PbH/iYdVAMrSLDlC8Sy9uOtf1+iagXHRFrSyyDdpKNDK0GLoMkEiDO5PBDAhF IQjKkyrL2SLWPlUpTnZ3toHVF26FKVNOlptJlXLRc4XOrVHMAAbGu9r54+UEAACGsm9E mKBwGLrJbRHz3Y+cCWHucUCy+wXAw5N1fyMOcVefI64UFG1YMJUfYhAA6vypX/fO4I6Y 6pECsl6RZkdFtG+VVuaiyZMzePBT1lW6tYCoBsZALaJgoyBg0BpO4EB/2Xgnjx2hVR11 Pn1xDnqhtwO6OOJvuxKU500V9mmPR7q0z/UdyLV0zyRCkjxgAqGLQ4yKesgUuxOSKWGS bUlQ== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=O1QzBQFn6R9nQFV5nUY6yphImtvw2m/yAIMTs81ffM8=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=mUlfUVPPz1A60TAJNMyWhKRyjM149+ofbAMRtGaARoAlnObCzaK9GypU9ilT6bOGtV aWd/NwN2OJ5F8cc48aZIxTGSgOpXyLdY96voF0XM/mQSSsJi6cAbcQ/ALVttdi19gYzc L+nGvWn2OXzK/4NZHM3CszSfU+2JB7avdQvWSMKcnJATrgYzH846S5S64saFQ59OAfUH tNLjdAjn3EeTxPxh2gLRb//eOIYxGnHpzp9JG+QINvnXtM/b40H1xTNTRLr+KMywWnfJ 3jCSkkWMklC8sA2gLM1euKC2+jVRzTAxHp+mU9kprxudAdf+x+jzWk+s66afwbmbKhOl eYnw== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id i26-20020a170906091a00b009ddd0c8c5c1si954361ejd.517.2023.11.23.11.20.24; Thu, 23 Nov 2023 11:20:25 -0800 (PST) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id A3E9F68CF39; Thu, 23 Nov 2023 21:19:04 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 9E3AF68CF01 for ; Thu, 23 Nov 2023 21:18:54 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 2D21D1546 for ; Thu, 23 Nov 2023 20:18:19 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id KRjyfsqNZlfT for ; Thu, 23 Nov 2023 20:18:18 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 7AA3F1BAB for ; Thu, 23 Nov 2023 20:18:15 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 6D0E43A05B7 for ; Thu, 23 Nov 2023 20:18:15 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Thu, 23 Nov 2023 20:15:05 +0100 Message-ID: <20231123191524.11296-12-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231123191524.11296-2-anton@khirnov.net> References: <20231123191524.11296-2-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 10/13] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: eta2vpP7lHuQ * the code is made shorter and simpler * avoids constantly allocating and freeing AVPackets, thanks to ThreadQueue integration with ObjPool * is consistent with decoding/filtering/muxing * reduces the diff in the future switch to thread-aware scheduling This makes ifile_get_packet() always block. Any potential issues caused by this will be resolved by the switch to thread-aware scheduling in future commits. --- fftools/ffmpeg.c | 32 ++++++------ fftools/ffmpeg.h | 3 +- fftools/ffmpeg_demux.c | 108 ++++++++++++++-------------------------- fftools/ffmpeg_filter.c | 5 +- 4 files changed, 58 insertions(+), 90 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 61fcda2526..cf8a50bffc 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1043,9 +1043,6 @@ static int check_keyboard_interaction(int64_t cur_time) static void reset_eagain(void) { - int i; - for (i = 0; i < nb_input_files; i++) - input_files[i]->eagain = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) ost->unavailable = 0; } @@ -1069,19 +1066,14 @@ static void decode_flush(InputFile *ifile) * this function should be called again * - AVERROR_EOF -- this function should not be called again */ -static int process_input(int file_index) +static int process_input(int file_index, AVPacket *pkt) { InputFile *ifile = input_files[file_index]; InputStream *ist; - AVPacket *pkt; int ret, i; - ret = ifile_get_packet(ifile, &pkt); + ret = ifile_get_packet(ifile, pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); @@ -1128,7 +1120,7 @@ static int process_input(int file_index) ret = process_input_packet(ist, pkt, 0); - av_packet_free(&pkt); + av_packet_unref(pkt); return ret < 0 ? ret : 0; } @@ -1138,7 +1130,7 @@ static int process_input(int file_index) * * @return 0 for success, <0 for error */ -static int transcode_step(OutputStream *ost) +static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) { InputStream *ist = NULL; int ret; @@ -1153,10 +1145,8 @@ static int transcode_step(OutputStream *ost) av_assert0(ist); } - ret = process_input(ist->file_index); + ret = process_input(ist->file_index, demux_pkt); if (ret == AVERROR(EAGAIN)) { - if (input_files[ist->file_index]->eagain) - ost->unavailable = 1; return 0; } @@ -1182,12 +1172,19 @@ static int transcode(int *err_rate_exceeded) int ret = 0, i; InputStream *ist; int64_t timer_start; + AVPacket *demux_pkt = NULL; print_stream_maps(); *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); + demux_pkt = av_packet_alloc(); + if (!demux_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); } @@ -1215,7 +1212,7 @@ static int transcode(int *err_rate_exceeded) break; } - ret = transcode_step(ost); + ret = transcode_step(ost, demux_pkt); if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); break; @@ -1256,6 +1253,9 @@ static int transcode(int *err_rate_exceeded) /* dump report by using the first video and audio streams */ print_report(1, timer_start, av_gettime_relative()); +fail: + av_packet_free(&demux_pkt); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index f50222472c..3c153021f8 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -407,7 +407,6 @@ typedef struct InputFile { AVFormatContext *ctx; int eof_reached; /* true if eof reached */ - int eagain; /* true if last read attempt returned EAGAIN */ int64_t input_ts_offset; int input_sync_ref; /** @@ -859,7 +858,7 @@ void ifile_close(InputFile **f); * caller should flush decoders and read from this demuxer again * - a negative error code on failure */ -int ifile_get_packet(InputFile *f, AVPacket **pkt); +int ifile_get_packet(InputFile *f, AVPacket *pkt); int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 791952f120..65a5e08ca5 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,6 +21,8 @@ #include "ffmpeg.h" #include "ffmpeg_utils.h" +#include "objpool.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -33,7 +35,6 @@ #include "libavutil/time.h" #include "libavutil/timestamp.h" #include "libavutil/thread.h" -#include "libavutil/threadmessage.h" #include "libavcodec/packet.h" @@ -107,19 +108,13 @@ typedef struct Demuxer { double readrate_initial_burst; - AVThreadMessageQueue *in_thread_queue; + ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; - int non_blocking; int read_started; } Demuxer; -typedef struct DemuxMsg { - AVPacket *pkt; - int looping; -} DemuxMsg; - static DemuxStream *ds_from_ist(InputStream *ist) { return (DemuxStream*)ist; @@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -// process an input packet into a message to send to the consumer thread -// src is always cleared by this function -static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) +static int input_packet_process(Demuxer *d, AVPacket *pkt) { InputFile *f = &d->f; - InputStream *ist = f->streams[src->stream_index]; + InputStream *ist = f->streams[pkt->stream_index]; DemuxStream *ds = ds_from_ist(ist); - AVPacket *pkt; int ret = 0; - pkt = av_packet_alloc(); - if (!pkt) { - av_packet_unref(src); - return AVERROR(ENOMEM); - } - av_packet_move_ref(pkt, src); - ret = ts_fixup(d, pkt); if (ret < 0) - goto fail; + return ret; ds->data_size += pkt->size; ds->nb_packets++; @@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } - msg->pkt = pkt; - pkt = NULL; - -fail: - av_packet_free(&pkt); - - return ret; + return 0; } static void readrate_sleep(Demuxer *d) @@ -531,7 +510,6 @@ static void *input_thread(void *arg) Demuxer *d = arg; InputFile *f = &d->f; AVPacket *pkt; - unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; pkt = av_packet_alloc(); @@ -547,8 +525,6 @@ static void *input_thread(void *arg) d->wallclock_start = av_gettime_relative(); while (1) { - DemuxMsg msg = { NULL }; - ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -558,8 +534,8 @@ static void *input_thread(void *arg) if (ret < 0) { if (d->loop) { /* signal looping to the consumer thread */ - msg.looping = 1; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0); + pkt->stream_index = -1; + ret = tq_send(d->thread_queue, 0, pkt); if (ret >= 0) ret = seek_to_start(d); if (ret >= 0) @@ -602,35 +578,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, &msg, pkt); + ret = input_packet_process(d, pkt); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - if (flags && ret == AVERROR(EAGAIN)) { - flags = 0; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - av_log(f, AV_LOG_WARNING, - "Thread message queue blocking; consider raising the " - "thread_queue_size option (current value: %d)\n", - d->thread_queue_size); - } + ret = tq_send(d->thread_queue, 0, pkt); if (ret < 0) { if (ret != AVERROR_EOF) av_log(f, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&msg.pkt); break; } } finish: av_assert0(ret < 0); - av_thread_message_queue_set_err_recv(d->in_thread_queue, ret); + tq_send_finish(d->thread_queue, 0); av_packet_free(&pkt); @@ -642,16 +609,16 @@ finish: static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - DemuxMsg msg; - if (!d->in_thread_queue) + if (!d->thread_queue) return; - av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0) - av_packet_free(&msg.pkt); + + tq_receive_finish(d->thread_queue, 0); pthread_join(d->thread, NULL); - av_thread_message_queue_free(&d->in_thread_queue); + + tq_free(&d->thread_queue); + av_thread_message_queue_free(&f->audio_ts_queue); } @@ -659,18 +626,20 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; + ObjPool *op; if (d->thread_queue_size <= 0) d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - if (nb_input_files > 1 && - (f->ctx->pb ? !f->ctx->pb->seekable : - strcmp(f->ctx->iformat->name, "lavfi"))) - d->non_blocking = 1; - ret = av_thread_message_queue_alloc(&d->in_thread_queue, - d->thread_queue_size, sizeof(DemuxMsg)); - if (ret < 0) - return ret; + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); + if (!d->thread_queue) { + objpool_free(&op); + return AVERROR(ENOMEM); + } if (d->loop) { int nb_audio_dec = 0; @@ -700,31 +669,30 @@ static int thread_start(Demuxer *d) return 0; fail: - av_thread_message_queue_free(&d->in_thread_queue); + tq_free(&d->thread_queue); return ret; } -int ifile_get_packet(InputFile *f, AVPacket **pkt) +int ifile_get_packet(InputFile *f, AVPacket *pkt) { Demuxer *d = demuxer_from_ifile(f); - DemuxMsg msg; - int ret; + int ret, dummy; - if (!d->in_thread_queue) { + if (!d->thread_queue) { ret = thread_start(d); if (ret < 0) return ret; } - ret = av_thread_message_queue_recv(d->in_thread_queue, &msg, - d->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; - if (msg.looping) - return 1; - *pkt = msg.pkt; + if (pkt->stream_index == -1) { + av_assert0(!pkt->data && !pkt->side_data_elems); + return 1; + } + return 0; } diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index cd4b55dd40..d8320b7526 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -2010,9 +2010,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) for (int i = 0; i < fg->nb_inputs; i++) { InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - InputStream *ist = ifp->ist; - if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + if (fgt->eof_in[i]) continue; nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); @@ -2022,6 +2021,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) } } + av_assert0(best_input >= 0); + return best_input; }