From patchwork Thu Jun 16 19:55:34 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 36285 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:1a22:b0:84:42e0:ad30 with SMTP id cj34csp1129517pzb; Thu, 16 Jun 2022 13:08:09 -0700 (PDT) X-Google-Smtp-Source: AGRyM1ssUzG09M3vJyxk0hwrH/I82A37yxs1/6vdpf8mNP//BWK+0gt2s35YHqHr/2Ji+h/GfZfu X-Received: by 2002:a17:907:3f0a:b0:711:f0e2:ad67 with SMTP id hq10-20020a1709073f0a00b00711f0e2ad67mr5945738ejc.277.1655410088806; Thu, 16 Jun 2022 13:08:08 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1655410088; cv=none; d=google.com; s=arc-20160816; b=y9l8M0IJloziJ6I80eVZvlFWTsAeQNo1aAWhbgewFo0x2ADHOCamaoJIqjtqTyNcQw 7dxCYtQeBs/5k0I82UUacXUKbY2B6NPuR8UrE2KRxJCiT5DLFwOHWm5BsvH5U/zqwH1i +iPYybSiH2aJvtyr7GzngORpEoE8dGfpez5OS0b1wd+swWbjhqFtGPusFMUSj6KPW2H4 0FRPbwxWw8010mBrSOgPVtqzi5ByBS3th/L5DPrrC6mSutA3Vu6QaPPHCjX/5jM5Mk6p HdVnJAfGW8o5dek2p8sFORr/ld7TqTpqCo/0uHKp4r7Gln2RqiTIoUkxhn/HLJgPQ1F4 rAlg== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=YtRks+bJu+X7LrRn4/VFHL+bSAlB2Rc7sbmM5M1JwNE=; b=avUAPJjrRNLsrnCPDWK4y3BDoA7QBBsXP8S83aKowQQubrDp3sLvFT/epnwTppSjh/ e9gK1A4o4wbaq7iUdMhyCV1KKv0/azusM/AsTVP47effQAcb8jGFDLPIEMQFxpMKwYr8 4o7s2rqrheWbaJm+EyVJZ/A5pxr5PhUQImr48HSEWTkEwQQBHDLzisg0BwfB7FaTO/oY jvseCs31ffsLNHK874YS1tBa29Jb0ejvQBaLFBm2PEzxu5rtczOdmIVPUa4rdZ8/5gbO jKtQ4YCrrcrUNPR0DwBbhjPZmc1DFSe/RmWQrWyhFRXNjRBtFBWjcs+Hxr/tj2palGzt mUOQ== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id hb44-20020a170907162c00b006feb0e0856dsi3049946ejc.653.2022.06.16.13.08.08; Thu, 16 Jun 2022 13:08:08 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 0822E68B8E0; Thu, 16 Jun 2022 23:04:08 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail0.khirnov.net (red.khirnov.net [176.97.15.12]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 6FF2A68B884 for ; Thu, 16 Jun 2022 23:03:49 +0300 (EEST) Received: from localhost (localhost [IPv6:::1]) by mail0.khirnov.net (Postfix) with ESMTP id 19373240175 for ; Thu, 16 Jun 2022 22:03:49 +0200 (CEST) Received: from mail0.khirnov.net ([IPv6:::1]) by localhost (mail0.khirnov.net [IPv6:::1]) (amavisd-new, port 10024) with ESMTP id JU8ovR3avZnU for ; Thu, 16 Jun 2022 22:03:44 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail0.khirnov.net (Postfix) with ESMTPS id 77FAE240690 for ; Thu, 16 Jun 2022 22:03:34 +0200 (CEST) Received: by libav.khirnov.net (Postfix, from userid 1000) id 228903A2237; Thu, 16 Jun 2022 22:03:31 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Thu, 16 Jun 2022 21:55:34 +0200 Message-Id: <20220616195534.5278-35-anton@khirnov.net> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220616195534.5278-1-anton@khirnov.net> References: <20220616195534.5278-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 35/35] fftools/ffmpeg: move each muxer to a separate thread X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: BMwxAgA+EubG --- doc/ffmpeg.texi | 11 +- fftools/ffmpeg.c | 18 ++- fftools/ffmpeg.h | 7 +- fftools/ffmpeg_mux.c | 299 +++++++++++++++++++++++++++++++------------ fftools/ffmpeg_opt.c | 4 +- 5 files changed, 236 insertions(+), 103 deletions(-) diff --git a/doc/ffmpeg.texi b/doc/ffmpeg.texi index 7542832eb3..e4e2d6ddac 100644 --- a/doc/ffmpeg.texi +++ b/doc/ffmpeg.texi @@ -1900,13 +1900,16 @@ to the @option{-ss} option is considered an actual timestamp, and is not offset by the start time of the file. This matters only for files which do not start from timestamp 0, such as transport streams. -@item -thread_queue_size @var{size} (@emph{input}) -This option sets the maximum number of queued packets when reading from the -file or device. With low latency / high rate live streams, packets may be -discarded if they are not read in a timely manner; setting this value can +@item -thread_queue_size @var{size} (@emph{input/output}) +For input, this option sets the maximum number of queued packets when reading +from the file or device. With low latency / high rate live streams, packets may +be discarded if they are not read in a timely manner; setting this value can force ffmpeg to use a separate input thread and read packets as soon as they arrive. By default ffmpeg only does this if multiple inputs are specified. +For output, this option specified the maximum number of packets that may be +queued to each muxing thread. + @item -sdp_file @var{file} (@emph{global}) Print sdp information for an output stream to @var{file}. This allows dumping sdp information when at least one output isn't an diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index aea7335c8f..2a7ff16b74 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -746,9 +746,6 @@ static void output_packet(OutputFile *of, AVPacket *pkt, goto mux_fail; } - if (eof) - ost->finished |= MUXER_FINISHED; - return; mux_fail: @@ -1521,7 +1518,7 @@ static void print_final_stats(int64_t total_size) enum AVMediaType type = ost->enc_ctx->codec_type; total_size += ost->data_size; - total_packets += ost->packets_written; + total_packets += atomic_load(&ost->packets_written); av_log(NULL, AV_LOG_VERBOSE, " Output stream #%d:%d (%s): ", i, j, av_get_media_type_string(type)); @@ -1534,7 +1531,7 @@ static void print_final_stats(int64_t total_size) } av_log(NULL, AV_LOG_VERBOSE, "%"PRIu64" packets muxed (%"PRIu64" bytes); ", - ost->packets_written, ost->data_size); + atomic_load(&ost->packets_written), ost->data_size); av_log(NULL, AV_LOG_VERBOSE, "\n"); } @@ -1603,7 +1600,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti if (!vid && enc->codec_type == AVMEDIA_TYPE_VIDEO) { float fps; - frame_number = ost->packets_written; + frame_number = atomic_load(&ost->packets_written); fps = t > 1 ? frame_number / t : 0; av_bprintf(&buf, "frame=%5d fps=%3.*f q=%3.1f ", frame_number, fps < 9.95, fps, q); @@ -3480,9 +3477,8 @@ static int need_output(void) for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->finished || of_finished(of)) + if (ost->finished) continue; return 1; @@ -4402,9 +4398,11 @@ static int transcode(void) /* close each encoder */ for (i = 0; i < nb_output_streams; i++) { + uint64_t packets_written; ost = output_streams[i]; - total_packets_written += ost->packets_written; - if (!ost->packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) { + packets_written = atomic_load(&ost->packets_written); + total_packets_written += packets_written; + if (!packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) { av_log(NULL, AV_LOG_FATAL, "Empty output on stream %d.\n", i); exit_program(1); } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 9baa701c67..8940f719b0 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -21,6 +21,7 @@ #include "config.h" +#include #include #include #include @@ -555,7 +556,7 @@ typedef struct OutputStream { // combined size of all the packets written uint64_t data_size; // number of packets send to the muxer - uint64_t packets_written; + atomic_uint_least64_t packets_written; // number of frames/samples sent to the encoder uint64_t frames_encoded; uint64_t samples_encoded; @@ -697,14 +698,14 @@ int hw_device_setup_for_filter(FilterGraph *fg); int hwaccel_decode_init(AVCodecContext *avctx); int of_muxer_init(OutputFile *of, AVFormatContext *fc, - AVDictionary *opts, int64_t limit_filesize); + AVDictionary *opts, int64_t limit_filesize, + int thread_queue_size); /* open the muxer when all the streams are initialized */ int of_check_init(OutputFile *of); int of_write_trailer(OutputFile *of); void of_close(OutputFile **pof); int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost); -int of_finished(OutputFile *of); int64_t of_filesize(OutputFile *of); AVChapter * const * of_get_chapters(OutputFile *of, unsigned int *nb_chapters); diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 2abadd3f9b..67b875d41d 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -16,17 +16,21 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include #include #include #include "ffmpeg.h" +#include "objpool.h" #include "sync_queue.h" +#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" +#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -51,13 +55,18 @@ typedef struct MuxStream { struct Muxer { AVFormatContext *fc; + pthread_t thread; + ThreadQueue *tq; + MuxStream *streams; AVDictionary *opts; + int thread_queue_size; + /* filesize limit expressed in bytes */ int64_t limit_filesize; - int64_t final_filesize; + atomic_int_least64_t last_filesize; int header_written; AVPacket *sq_pkt; @@ -65,15 +74,6 @@ struct Muxer { static int want_sdp = 1; -static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others) -{ - int i; - for (i = 0; i < nb_output_streams; i++) { - OutputStream *ost2 = output_streams[i]; - ost2->finished |= ost == ost2 ? this_stream : others; - } -} - static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; @@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return 0; } +static int64_t filesize(AVIOContext *pb) +{ + int64_t ret = -1; + + if (pb) { + ret = avio_size(pb); + if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too + ret = avio_tell(pb); + } + + return ret; +} + static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->mux->fc; AVStream *st = ost->st; + int64_t fs; int ret; + fs = filesize(s->pb); + atomic_store(&of->mux->last_filesize, fs); + if (fs >= of->mux->limit_filesize) + return AVERROR_EOF; + if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) || (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) pkt->pts = pkt->dts = AV_NOPTS_VALUE; @@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ms->last_mux_dts = pkt->dts; ost->data_size += pkt->size; - ost->packets_written++; + atomic_fetch_add(&ost->packets_written, 1); pkt->stream_index = ost->index; @@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) ret = av_interleaved_write_frame(s, pkt); if (ret < 0) { print_error("av_interleaved_write_frame()", ret); - main_return_code = 1; - close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED); return ret; } return 0; } -static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) { if (ost->sq_idx_mux >= 0) { int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); - if (ret < 0) { - if (pkt) - av_packet_unref(pkt); - if (ret == AVERROR_EOF) { - ost->finished |= MUXER_FINISHED; - return 0; - } else - return ret; - } + if (ret < 0) + return ret; while (1) { ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt)); - if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) - return 0; - else if (ret < 0) - return ret; + if (ret < 0) + return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; ret = write_packet(of, output_streams[of->ost_index + ret], of->mux->sq_pkt); if (ret < 0) return ret; } - } else { - if (pkt) - return write_packet(of, ost, pkt); - - ost->finished |= MUXER_FINISHED; - } + } else if (pkt) + return write_packet(of, ost, pkt); return 0; } -int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +static void *muxer_thread(void *arg) { - int ret; + OutputFile *of = arg; + Muxer *mux = of->mux; + AVPacket *pkt = NULL; + int ret = 0; + + pkt = av_packet_alloc(); + if (!pkt) { + ret = AVERROR(ENOMEM); + goto finish; + } - if (of->mux->header_written) { - return submit_packet(of, ost, pkt); - } else { - /* the muxer is not initialized yet, buffer the packet */ - ret = queue_packet(of, ost, pkt); - if (ret < 0) { - av_packet_unref(pkt); - return ret; + while (1) { + OutputStream *ost; + int stream_idx; + + ret = tq_receive(mux->tq, &stream_idx, pkt); + if (stream_idx < 0) { + av_log(NULL, AV_LOG_DEBUG, + "All streams finished for output file #%d\n", of->index); + ret = 0; + break; + } + + ost = output_streams[of->ost_index + stream_idx]; + ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt); + av_packet_unref(pkt); + if (ret == AVERROR_EOF) + tq_receive_finish(mux->tq, stream_idx); + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Error muxing a packet for output file #%d\n", of->index); + break; } } - return 0; +finish: + av_packet_free(&pkt); + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_receive_finish(mux->tq, i); + + av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index); + + return (void*)(intptr_t)ret; } static int print_sdp(void) @@ -303,11 +337,125 @@ static int print_sdp(void) av_freep(&sdp_filename); } + // SDP successfully written, allow muxer threads to start + ret = 1; + fail: av_freep(&avc); return ret; } +static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +{ + Muxer *mux = of->mux; + int ret = 0; + + if (!pkt || ost->finished & MUXER_FINISHED) + goto finish; + + ret = tq_send(mux->tq, ost->index, pkt); + if (ret < 0) + goto finish; + + return 0; + +finish: + if (pkt) + av_packet_unref(pkt); + + ost->finished |= MUXER_FINISHED; + tq_send_finish(mux->tq, ost->index); + return ret == AVERROR_EOF ? 0 : ret; +} + +int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +{ + int ret; + + if (of->mux->tq) { + return submit_packet(of, ost, pkt); + } else { + /* the muxer is not initialized yet, buffer the packet */ + ret = queue_packet(of, ost, pkt); + if (ret < 0) { + av_packet_unref(pkt); + return ret; + } + } + + return 0; +} + +static int thread_stop(OutputFile *of) +{ + Muxer *mux = of->mux; + void *ret; + + if (!mux || !mux->tq) + return 0; + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_send_finish(mux->tq, i); + + pthread_join(mux->thread, &ret); + + tq_free(&mux->tq); + + return (int)(intptr_t)ret; +} + +static void pkt_move(void *dst, void *src) +{ + av_packet_move_ref(dst, src); +} + +static int thread_start(OutputFile *of) +{ + Muxer *mux = of->mux; + AVFormatContext *fc = mux->fc; + ObjPool *op; + int ret; + + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); + if (!mux->tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); + if (ret) { + tq_free(&mux->tq); + return AVERROR(ret); + } + + /* flush the muxing queues */ + for (int i = 0; i < fc->nb_streams; i++) { + MuxStream *ms = &of->mux->streams[i]; + OutputStream *ost = output_streams[of->ost_index + i]; + AVPacket *pkt; + + /* try to improve muxing time_base (only possible if nothing has been written yet) */ + if (!av_fifo_can_read(ms->muxing_queue)) + ost->mux_timebase = ost->st->time_base; + + while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { + ret = submit_packet(of, ost, pkt); + if (pkt) { + ms->muxing_queue_data_size -= pkt->size; + av_packet_free(&pkt); + } + if (ret < 0) + return ret; + } + } + + return 0; +} + /* open the muxer when all the streams are initialized */ int of_check_init(OutputFile *of) { @@ -339,28 +487,19 @@ int of_check_init(OutputFile *of) if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; - } - } - - /* flush the muxing queues */ - for (i = 0; i < fc->nb_streams; i++) { - MuxStream *ms = &of->mux->streams[i]; - OutputStream *ost = output_streams[of->ost_index + i]; - AVPacket *pkt; - - /* try to improve muxing time_base (only possible if nothing has been written yet) */ - if (!av_fifo_can_read(ms->muxing_queue)) - ost->mux_timebase = ost->st->time_base; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = submit_packet(of, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); + } else if (ret == 1) { + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (i = 0; i < nb_output_files; i++) { + ret = thread_start(output_files[i]); + if (ret < 0) + return ret; } - if (ret < 0) - return ret; } + } else { + ret = thread_start(of); + if (ret < 0) + return ret; } return 0; @@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = of->mux->fc; int ret; - if (!of->mux->header_written) { + if (!of->mux->tq) { av_log(NULL, AV_LOG_ERROR, "Nothing was written into output file %d (%s), because " "at least one of its streams received no packets.\n", @@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of) return AVERROR(EINVAL); } + ret = thread_stop(of); + if (ret < 0) + main_return_code = ret; + ret = av_write_trailer(fc); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret)); return ret; } - of->mux->final_filesize = of_filesize(of); + of->mux->last_filesize = filesize(fc->pb); if (!(of->format->flags & AVFMT_NOFILE)) { ret = avio_closep(&fc->pb); @@ -448,6 +591,8 @@ void of_close(OutputFile **pof) if (!of) return; + thread_stop(of); + sq_free(&of->sq_encode); sq_free(&of->sq_mux); @@ -457,7 +602,8 @@ void of_close(OutputFile **pof) } int of_muxer_init(OutputFile *of, AVFormatContext *fc, - AVDictionary *opts, int64_t limit_filesize) + AVDictionary *opts, int64_t limit_filesize, + int thread_queue_size) { Muxer *mux = av_mallocz(sizeof(*mux)); int ret = 0; @@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc, ms->last_mux_dts = AV_NOPTS_VALUE; } + mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8; mux->limit_filesize = limit_filesize; mux->opts = opts; @@ -515,25 +662,9 @@ fail: return ret; } -int of_finished(OutputFile *of) -{ - return of_filesize(of) >= of->mux->limit_filesize; -} - int64_t of_filesize(OutputFile *of) { - AVIOContext *pb = of->mux->fc->pb; - int64_t ret = -1; - - if (of->mux->final_filesize) - ret = of->mux->final_filesize; - else if (pb) { - ret = avio_size(pb); - if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too - ret = avio_tell(pb); - } - - return ret; + return atomic_load(&of->mux->last_filesize); } AVChapter * const * diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 7f46238534..807c0263b9 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -3055,7 +3055,7 @@ loop_end: of->nb_streams = oc->nb_streams; of->url = filename; - err = of_muxer_init(of, oc, format_opts, o->limit_filesize); + err = of_muxer_init(of, oc, format_opts, o->limit_filesize, o->thread_queue_size); if (err < 0) { av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n"); exit_program(1); @@ -3841,7 +3841,7 @@ const OptionDef options[] = { { "disposition", OPT_STRING | HAS_ARG | OPT_SPEC | OPT_OUTPUT, { .off = OFFSET(disposition) }, "disposition", "" }, - { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT, + { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT | OPT_OUTPUT, { .off = OFFSET(thread_queue_size) }, "set the maximum number of queued packets from the demuxer" }, { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info },