From patchwork Mon Apr 4 11:30:37 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 35200 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:c05:b0:7a:e998:b410 with SMTP id bw5csp1916166pzb; Mon, 4 Apr 2022 04:41:33 -0700 (PDT) X-Google-Smtp-Source: ABdhPJxCUC4+1/5NfkP44lsU40V1tgatzenvVn5q3FN9MgXnQcgP2VylEzK7Y6j4e48wUTLGy6yn X-Received: by 2002:a50:ed91:0:b0:419:979a:ac57 with SMTP id h17-20020a50ed91000000b00419979aac57mr32843845edr.206.1649072493245; Mon, 04 Apr 2022 04:41:33 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1649072493; cv=none; d=google.com; s=arc-20160816; b=cFigxTF5V1rks3q5BR8rsFvsrChMCFGucQAGlFEvjOV+FFp/hUIE4+FYY3XCSa4x2V 9YmdTLlV1SverNG64W5d83By4otEfb5JFNJNG7KBjWGyuK24AE8+LtYeeOvFzf0Kp1sQ Bq/dy/8pRg4iIszP+d2nYVYxxHNVEleieNsekNktn0I79lYhewSFAZjwRBC4XDx2EZ2v GtrHx+B1m8WSkU9NUngbXwxFLs9HRc9J5syyA3XtpoSLlhVHX7NlKgHUkl3leiXZVapX YZRjg7YubSRmQG0nqUoteoEKMdNYBHa73oh/v4O86nPZFKJ+4I5E4IwaAhy5QFQLgv3y WKQw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=qXOzASeDLjhxa7aDUgRwwjR/UkpSWq1IH79oYNM80mA=; b=ovGErGC+q+Yp/X44ZsItzmYrcj8P2fuLpbYMkyeu6yh8iGSwUrvDWBBLjQ88T1CZQM uo4l9GtfvWylEQqHOHihhUayfUJiYD21fGKsBXvZNRoXhxweFvFzmrWLvrvY78mVWSkI vfz6vQaNmEQKKSCr7QVtbXHaSWEayweDAF6QDL0YYhTB2g1dstHQrciIxZqAGcEMysPu QebP2PiifxHdnvOeiCfiTB5CEmCU/KrsLrbTbotjDumFZbWR2tdBPOwqJe2M+UMqwH2V 29RUbOhgZmFtLNOJho18igSkIwUg1jGTJlbRd0gXvRaOtL6UgmZdvsvSb+apvJpjr1KJ epRw== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id y19-20020a056402441300b00419dbec993fsi9221465eda.374.2022.04.04.04.41.32; Mon, 04 Apr 2022 04:41:33 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 1675D68B3A5; Mon, 4 Apr 2022 14:38:04 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail0.khirnov.net (red.khirnov.net [176.97.15.12]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 8587B68B360 for ; Mon, 4 Apr 2022 14:37:47 +0300 (EEST) Received: from localhost (localhost [IPv6:::1]) by mail0.khirnov.net (Postfix) with ESMTP id 3FFBB240512 for ; Mon, 4 Apr 2022 13:37:47 +0200 (CEST) Received: from mail0.khirnov.net ([IPv6:::1]) by localhost (mail0.khirnov.net [IPv6:::1]) (amavisd-new, port 10024) with ESMTP id wo0K9uLryBqp for ; Mon, 4 Apr 2022 13:37:46 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail0.khirnov.net (Postfix) with ESMTPS id E72C4240179 for ; Mon, 4 Apr 2022 13:37:41 +0200 (CEST) Received: by libav.khirnov.net (Postfix, from userid 1000) id 56C5C3A0E93; Mon, 4 Apr 2022 13:32:12 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Mon, 4 Apr 2022 13:30:37 +0200 Message-Id: <20220404113037.13070-50-anton@khirnov.net> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220404113037.13070-1-anton@khirnov.net> References: <20220404113037.13070-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 49/49] fftools/ffmpeg: move each muxer to a separate thread X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: nZc5iy7c10Ih --- fftools/ffmpeg.c | 38 +++------ fftools/ffmpeg.h | 7 +- fftools/ffmpeg_mux.c | 197 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 178 insertions(+), 64 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 9dfbc4216a..8ea27d3422 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1286,10 +1286,7 @@ static void finish_output_stream(OutputStream *ost) OutputFile *of = output_files[ost->file_index]; ost->finished = ENCODER_FINISHED; - if (ost->sq_idx_mux >= 0) - sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); - else - ost->finished |= MUXER_FINISHED; + output_packet(of, ost->pkt, ost, 1); } /** @@ -3421,9 +3418,8 @@ static int need_output(void) for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->finished || of_finished(of)) + if (ost->finished) continue; return 1; @@ -4269,26 +4265,6 @@ static int transcode_step(void) return reap_filters(0); } -static void flush_sync_queues_mux(void) -{ - /* mark all queue inputs as done */ - for (int i = 0; i < nb_output_streams; i++) { - OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->sq_idx_mux >= 0) - sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); - } - - /* encode all packets remaining in the sync queues */ - for (int i = 0; i < nb_output_streams; i++) { - OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - - if (!(ost->finished & MUXER_FINISHED)) - output_packet(of, ost->pkt, ost, 1); - } -} - /* * The following code is the main loop of the file converter */ @@ -4310,6 +4286,12 @@ static int transcode(void) timer_start = av_gettime_relative(); + for (i = 0; i < nb_output_files; i++) { + ret = of_thread_start(output_files[i]); + if (ret < 0) + goto fail; + } + if ((ret = init_input_threads()) < 0) goto fail; @@ -4346,7 +4328,9 @@ static int transcode(void) } } flush_encoders(); - flush_sync_queues_mux(); + + for (i = 0; i < nb_output_files; i++) + of_thread_stop(output_files[i]); term_exit(); diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 407342462f..c4a5c2a0a2 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -583,6 +583,8 @@ typedef struct OutputFile { const AVOutputFormat *format; const char *url; + AVThreadMessageQueue *mux_queue; + SyncQueue *sq_encode; SyncQueue *sq_mux; @@ -697,11 +699,14 @@ int hwaccel_decode_init(AVCodecContext *avctx); int of_muxer_init(OutputFile *of, AVFormatContext *fc, AVDictionary *opts, int64_t limit_filesize); + +int of_thread_start(OutputFile *of); +void of_thread_stop(OutputFile *of); + int of_write_trailer(OutputFile *of); void of_close(OutputFile **pof); int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof); -int of_finished(OutputFile *of); int64_t of_filesize(OutputFile *of); AVChapter * const * of_get_chapters(OutputFile *of, unsigned int *nb_chapters); diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 6ca9a51dd6..f99dd5ec3e 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -16,17 +16,20 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include #include #include #include "ffmpeg.h" #include "sync_queue.h" +#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" +#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -46,18 +49,24 @@ typedef struct MuxStream { /* dts of the last packet sent to the muxer, in the stream timebase * used for making up missing dts values */ int64_t last_mux_dts; + + /* data (a real or a flush packet) was received for this stream */ + int got_data; } MuxStream; struct Muxer { AVFormatContext *fc; + pthread_t thread; + ThreadQueue *tq; + MuxStream *streams; AVDictionary *opts; /* filesize limit expressed in bytes */ int64_t limit_filesize; - int64_t final_filesize; + atomic_int_least64_t last_filesize; int header_written; }; @@ -221,13 +230,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return 0; } +static int64_t filesize(AVIOContext *pb) +{ + int64_t ret = -1; + + if (pb) { + ret = avio_size(pb); + if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too + ret = avio_tell(pb); + } + + return ret; +} + static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->mux->fc; AVStream *st = ost->st; + int64_t fs; int ret; + fs = filesize(s->pb); + atomic_store(&of->mux->last_filesize, fs); + if (fs >= of->mux->limit_filesize) + return AVERROR_EOF; + if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && video_sync_method == VSYNC_DROP) || (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) pkt->pts = pkt->dts = AV_NOPTS_VALUE; @@ -333,8 +361,8 @@ static int check_write_header(OutputFile *of) int ret, i; for (i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = output_streams[of->ost_index + i]; - if (!ost->initialized) + MuxStream *ms = &of->mux->streams[i]; + if (!ms->got_data) return 0; } @@ -378,12 +406,15 @@ static int check_write_header(OutputFile *of) return 0; } -int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) +static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) { + Muxer *mux = of->mux; + MuxStream *ms = &mux->streams[ost->index]; int ret; - if (!of->mux->header_written) { - ret = check_write_header(of); + ms->got_data = 1; + if (!mux->header_written) { + ret = check_write_header(of); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -391,34 +422,102 @@ int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) } if (ost->sq_idx_mux >= 0) { - ret = sq_send(of->sq_mux, ost->sq_idx_mux, - SQPKT(eof ? NULL: pkt)); + int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); if (ret < 0) { - av_packet_unref(pkt); - if (ret == AVERROR_EOF) { - ost->finished |= MUXER_FINISHED; - return 0; - } else - return ret; + if (pkt) + av_packet_unref(pkt); + return ret; } while (1) { + pkt = av_packet_alloc(); + if (!pkt) + // XXX + abort(); + ret = sq_receive(of->sq_mux, -1, SQPKT(pkt)); - if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) - return 0; - else if (ret < 0) - return ret; + if (ret < 0) { + av_packet_free(&pkt); + return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; + } ret = submit_packet(of, pkt, output_streams[of->ost_index + ret]); + av_packet_free(&pkt); if (ret < 0) return ret; } - } else if (!eof) + } else if (pkt) return submit_packet(of, pkt, ost); return 0; } +static void *muxer_thread(void *arg) +{ + OutputFile *of = arg; + Muxer *mux = of->mux; + + while (1) { + OutputStream *ost; + AVPacket *pkt = NULL; + int stream_idx, ret; + + ret = tq_receive(mux->tq, &stream_idx, &pkt); + if (stream_idx < 0) { + av_log(NULL, AV_LOG_DEBUG, + "All streams finished for output file #%d\n", of->index); + break; + } + + ost = output_streams[of->ost_index + stream_idx]; + ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt); + av_packet_free(&pkt); + if (ret == AVERROR_EOF) + tq_receive_finish(mux->tq, stream_idx); + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Error muxing a packet for output file #%d\n", of->index); + break; + } + } + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) { + sync_queue_process(of, output_streams[of->ost_index], NULL); + tq_receive_finish(mux->tq, i); + } + + av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index); + + return NULL; +} + +int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) +{ + AVPacket *pkt1; + int ret = 0; + + if (eof) { + tq_send_finish(of->mux->tq, ost->index); + return 0; + } + + pkt1 = av_packet_alloc(); + if (!pkt1) { + av_packet_unref(pkt); + return AVERROR(ENOMEM); + } + + av_packet_move_ref(pkt1, pkt); + + ret = tq_send(of->mux->tq, ost->index, &pkt1); + if (ret < 0) { + av_packet_free(&pkt1); + ost->finished |= MUXER_FINISHED; + } + + return ret == AVERROR_EOF ? 0 : ret; +} + int of_write_trailer(OutputFile *of) { AVFormatContext *fc = of->mux->fc; @@ -438,7 +537,7 @@ int of_write_trailer(OutputFile *of) return ret; } - of->mux->final_filesize = of_filesize(of); + of->mux->last_filesize = filesize(fc->pb); if (!(of->format->flags & AVFMT_NOFILE)) { ret = avio_closep(&fc->pb); @@ -487,6 +586,9 @@ static void mux_free(Muxer **pmux) av_freep(&mux->streams); av_dict_free(&mux->opts); + if (mux->tq) { + } + fc_close(&mux->fc); av_freep(pmux); @@ -558,30 +660,53 @@ fail: return ret; } -int of_finished(OutputFile *of) +int64_t of_filesize(OutputFile *of) { - return of_filesize(of) >= of->mux->limit_filesize; + return atomic_load(&of->mux->last_filesize); } -int64_t of_filesize(OutputFile *of) +AVChapter * const * +of_get_chapters(OutputFile *of, unsigned int *nb_chapters) { - AVIOContext *pb = of->mux->fc->pb; - int64_t ret = -1; + *nb_chapters = of->mux->fc->nb_chapters; + return of->mux->fc->chapters; +} - if (of->mux->final_filesize) - ret = of->mux->final_filesize; - else if (pb) { - ret = avio_size(pb); - if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too - ret = avio_tell(pb); +static void pkt_free(void *pkt) +{ + av_packet_free((AVPacket**)&pkt); +} + +int of_thread_start(OutputFile *of) +{ + Muxer *mux = of->mux; + int ret; + + mux->tq = tq_alloc(mux->fc->nb_streams, 8, sizeof(AVPacket*), + pkt_free); + if (!mux->tq) + return AVERROR(ENOMEM); + + ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); + if (ret) { + tq_free(&mux->tq); + return AVERROR(ret); } - return ret; + return 0; } -AVChapter * const * -of_get_chapters(OutputFile *of, unsigned int *nb_chapters) +void of_thread_stop(OutputFile *of) { - *nb_chapters = of->mux->fc->nb_chapters; - return of->mux->fc->chapters; + Muxer *mux = of->mux; + + if (!mux || !mux->tq) + return; + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_send_finish(mux->tq, i); + + pthread_join(mux->thread, NULL); + + tq_free(&mux->tq); }