From patchwork Mon Apr 4 11:30:22 2022 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 35221 Delivered-To: andriy.gelman@gmail.com Received: by 2002:a02:aa92:0:0:0:0:0 with SMTP id u18csp3587286jai; Mon, 4 Apr 2022 04:38:56 -0700 (PDT) X-Google-Smtp-Source: ABdhPJzjOEH1ljKpPa8NQua34xiuB4nZXJUZeyNY2/d6qvGpxYVW5j38iM5i63mhd2t25iOzAaH7 X-Received: by 2002:a50:ce03:0:b0:41c:c36b:c75 with SMTP id y3-20020a50ce03000000b0041cc36b0c75mr5555172edi.195.1649072336471; Mon, 04 Apr 2022 04:38:56 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1649072336; cv=none; d=google.com; s=arc-20160816; b=f1TfD7UMxQietEWH4KR6W5rlfCu+cGTsQM758WKTUY9AwKZGuW9vXzzMKDMfn/oGx6 5L7DFXVmYKkwIeUnhDn/HooeMcgqZFU+UnNMqV3QzfaGCrFhawYC+p9QuY0iJ4GQrZdJ 6gOmg15jJchGYqYdYFATpwQMV3NaSwt8ftZ1fOg0maArKdjsmh1Znjc27ch05WYP9XPQ Y+Yf2IM5Gb+9sjb4KmH33YDqGunN5se0oc/bDl4mc8BXPn55iY+KL1neoBjT+lT4j/pr srzXQi/41EO6FN/uaj5DsS4JXULoDa6Nf/6FdZ+3Wg7F8rvIqYdPdFPmUSbpw3Czx1/F Vrsg== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=VuLHu65CmZ9LK8iuTXwoTAT/d3MYpgxY/Dk9jMgm2mU=; b=KwsryJ7KnQQxk3a4jPkxi0D806BfLfKKESs7bD0XQupQ5unC0qpJR3FM0sOxGYS5To 2CLgtjxTVb/XGP0+rDyrpaPvEdG21owyLthYAhBmmVIaUKzfHrESDBrpgFtP6E6TC4Zz 4Eh/ko2bxLzPraTTm/Ss3vDRmzjkF7VJ6g1dyFvQVzHeucJ2bbmQ8l2DhKWv//nRIKo0 eTs/2AaJb9QMtpuPOelhCUdMb4csBxautOLnJOh3OdnHaG4Jc++67ZqNqxHKCbfqayJY 1hz6mlNnTPjgHy7A+YyzuWPAolMUoKb+5/2gcgFbw+9LBTiQNsYsag0KXkk8FslMqbbE b1Nw== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id hk19-20020a170906c9d300b006dffb6427bdsi5967621ejb.269.2022.04.04.04.38.56; Mon, 04 Apr 2022 04:38:56 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 33BED68B32A; Mon, 4 Apr 2022 14:37:50 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail0.khirnov.net (red.khirnov.net [176.97.15.12]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id CC53268B2B7 for ; Mon, 4 Apr 2022 14:37:44 +0300 (EEST) Received: from localhost (localhost [IPv6:::1]) by mail0.khirnov.net (Postfix) with ESMTP id 87CD624017E for ; Mon, 4 Apr 2022 13:37:44 +0200 (CEST) Received: from mail0.khirnov.net ([IPv6:::1]) by localhost (mail0.khirnov.net [IPv6:::1]) (amavisd-new, port 10024) with ESMTP id ZLeZ_tclA4Ey for ; Mon, 4 Apr 2022 13:37:42 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail0.khirnov.net (Postfix) with ESMTPS id 3F5242404FE for ; Mon, 4 Apr 2022 13:37:41 +0200 (CEST) Received: by libav.khirnov.net (Postfix, from userid 1000) id 20B793A0D60; Mon, 4 Apr 2022 13:32:12 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Mon, 4 Apr 2022 13:30:22 +0200 Message-Id: <20220404113037.13070-35-anton@khirnov.net> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220404113037.13070-1-anton@khirnov.net> References: <20220404113037.13070-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 34/49] fftools/ffmpeg: rework -shortest implementation X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: hx6SwXN8bTEf Content-Length: 38616 The -shortest option (which finishes the output file at the time the shortest stream ends) is currently implemented by faking the -t option when an output stream ends. This approach is fragile, since it depends on the frames/packets being processed in a specific order. E.g. there are currently some situations in which the output file length will depend unpredictably on unrelated factors like encoder delay. More importantly, the present work aiming at splitting various ffmpeg components into different threads will make this approach completely unworkable, since the frames/packets will arrive in effectively random order. This commit introduces a "sync queue", which is essentially a collection of FIFOs, one per stream. Frames/packets are submitted to these FIFOs and are then released for further processing (encoding or muxing) when it is ensured that the frame in question will not make its stream get ahead of the other streams (the logic is similar to libavformat's interleaving queue). These sync queues are then used for encoding and/or muxing when the -shortest option is specifierd. This commit changes the results of the copy-shortest[12] tests, where the last audio frame is now gone. This is correct, since it actually outlasts the last video frame. --- fftools/Makefile | 1 + fftools/ffmpeg.c | 144 +++++++++++--- fftools/ffmpeg.h | 14 +- fftools/ffmpeg_mux.c | 88 ++++++--- fftools/ffmpeg_opt.c | 80 ++++++++ fftools/sync_queue.c | 346 ++++++++++++++++++++++++++++++++++ fftools/sync_queue.h | 93 +++++++++ tests/ref/fate/copy-shortest1 | 1 - tests/ref/fate/copy-shortest2 | 1 - 9 files changed, 712 insertions(+), 56 deletions(-) create mode 100644 fftools/sync_queue.c create mode 100644 fftools/sync_queue.h diff --git a/fftools/Makefile b/fftools/Makefile index 81ad6c4f4f..f015df6846 100644 --- a/fftools/Makefile +++ b/fftools/Makefile @@ -14,6 +14,7 @@ OBJS-ffmpeg += \ fftools/ffmpeg_hw.o \ fftools/ffmpeg_mux.o \ fftools/ffmpeg_opt.o \ + fftools/sync_queue.o \ define DOFFTOOL OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 07d1fc8a5d..8674f52047 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -104,6 +104,7 @@ #include "ffmpeg.h" #include "cmdutils.h" +#include "sync_queue.h" #include "libavutil/avassert.h" @@ -570,6 +571,7 @@ static void ffmpeg_cleanup(int ret) av_bsf_free(&ost->bsf_ctx); av_frame_free(&ost->filtered_frame); + av_frame_free(&ost->sq_frame); av_frame_free(&ost->last_frame); av_packet_free(&ost->pkt); av_dict_free(&ost->encoder_opts); @@ -690,13 +692,10 @@ static void update_benchmark(const char *fmt, ...) static void close_output_stream(OutputStream *ost) { OutputFile *of = output_files[ost->file_index]; - AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base; - ost->finished |= ENCODER_FINISHED; - if (of->shortest) { - int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q); - of->recording_time = FFMIN(of->recording_time, end); - } + + if (ost->sq_idx_encode >= 0) + sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); } /* @@ -715,17 +714,22 @@ static void output_packet(OutputFile *of, AVPacket *pkt, { int ret = 0; + if (!eof && pkt->dts != AV_NOPTS_VALUE) + ost->last_mux_dts = av_rescale_q(pkt->dts, ost->mux_timebase, AV_TIME_BASE_Q); + /* apply the output bitstream filters */ if (ost->bsf_ctx) { ret = av_bsf_send_packet(ost->bsf_ctx, eof ? NULL : pkt); if (ret < 0) goto finish; while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0) - of_submit_packet(of, pkt, ost); + of_submit_packet(of, pkt, ost, 0); + if (ret == AVERROR_EOF) + of_submit_packet(of, pkt, ost, 1); if (ret == AVERROR(EAGAIN)) ret = 0; - } else if (!eof) - of_submit_packet(of, pkt, ost); + } else + of_submit_packet(of, pkt, ost, eof); finish: if (ret < 0 && ret != AVERROR_EOF) { @@ -895,6 +899,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) av_assert0(0); } +static int submit_encode_frame(OutputFile *of, OutputStream *ost, + AVFrame *frame) +{ + int ret; + + if (ost->sq_idx_encode < 0) + return encode_frame(of, ost, frame); + + if (frame) { + ret = av_frame_ref(ost->sq_frame, frame); + if (ret < 0) + return ret; + frame = ost->sq_frame; + } + + ret = sq_send(of->sq_encode, ost->sq_idx_encode, + SQFRAME(frame)); + if (ret < 0) { + if (frame) + av_frame_unref(frame); + if (ret != AVERROR_EOF) + return ret; + } + + while (1) { + AVFrame *enc_frame = ost->sq_frame; + + ret = sq_receive(of->sq_encode, ost->sq_idx_encode, + SQFRAME(enc_frame)); + if (ret == AVERROR_EOF) { + enc_frame = NULL; + } else if (ret < 0) { + return (ret == AVERROR(EAGAIN)) ? 0 : ret; + } + + ret = encode_frame(of, ost, enc_frame); + if (enc_frame) + av_frame_unref(enc_frame); + if (ret < 0) { + if (ret == AVERROR_EOF) + close_output_stream(ost); + return ret; + } + } +} + static void do_audio_out(OutputFile *of, OutputStream *ost, AVFrame *frame) { @@ -910,8 +960,8 @@ static void do_audio_out(OutputFile *of, OutputStream *ost, ost->sync_opts = frame->pts + frame->nb_samples; ost->samples_encoded += frame->nb_samples; - ret = encode_frame(of, ost, frame); - if (ret < 0) + ret = submit_encode_frame(of, ost, frame); + if (ret < 0 && ret != AVERROR_EOF) exit_program(1); } @@ -1197,8 +1247,8 @@ static void do_video_out(OutputFile *of, av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time); } - ret = encode_frame(of, ost, in_picture); - if (ret < 0) + ret = submit_encode_frame(of, ost, in_picture); + if (ret < 0 && ret != AVERROR_EOF) exit_program(1); // Make sure Closed Captions will not be duplicated @@ -1216,14 +1266,12 @@ static void do_video_out(OutputFile *of, static void finish_output_stream(OutputStream *ost) { OutputFile *of = output_files[ost->file_index]; - AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base; - - ost->finished = ENCODER_FINISHED | MUXER_FINISHED; + ost->finished = ENCODER_FINISHED; - if (of->shortest) { - int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q); - of->recording_time = FFMIN(of->recording_time, end); - } + if (ost->sq_idx_mux >= 0) + sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); + else + ost->finished |= MUXER_FINISHED; } /** @@ -1281,6 +1329,12 @@ static int reap_filters(int flush) continue; } + if (filtered_frame->pts != AV_NOPTS_VALUE) { + AVRational tb = av_buffersink_get_time_base(filter); + ost->last_filter_pts = av_rescale_q(filtered_frame->pts, tb, + AV_TIME_BASE_Q); + } + switch (av_buffersink_get_type(filter)) { case AVMEDIA_TYPE_VIDEO: if (!ost->frame_aspect_ratio.num) @@ -1696,7 +1750,7 @@ static void flush_encoders(void) if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO) continue; - ret = encode_frame(of, ost, NULL); + ret = submit_encode_frame(of, ost, NULL); if (ret != AVERROR_EOF) exit_program(1); } @@ -3006,6 +3060,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame) break; } + if (ost->sq_idx_encode >= 0) + sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base); + ost->mux_timebase = enc_ctx->time_base; return 0; @@ -3014,6 +3071,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame) static int init_output_stream(OutputStream *ost, AVFrame *frame, char *error, int error_len) { + OutputFile *of = output_files[ost->file_index]; int ret = 0; if (ost->encoding_needed) { @@ -3146,6 +3204,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame, if (ret < 0) return ret; + if (ost->sq_idx_mux >= 0) + sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase); + ost->initialized = 1; ret = of_check_init(output_files[ost->file_index]); @@ -3377,13 +3438,19 @@ static OutputStream *choose_output(void) for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; - int64_t opts = ost->last_mux_dts == AV_NOPTS_VALUE ? INT64_MIN : - av_rescale_q(ost->last_mux_dts, ost->st->time_base, - AV_TIME_BASE_Q); - if (ost->last_mux_dts == AV_NOPTS_VALUE) - av_log(NULL, AV_LOG_DEBUG, - "cur_dts is invalid st:%d (%d) [init:%d i_done:%d finish:%d] (this is harmless if it occurs once at the start per stream)\n", - ost->st->index, ost->st->id, ost->initialized, ost->inputs_done, ost->finished); + int64_t opts; + + if (ost->filter) { + opts = ost->last_filter_pts == AV_NOPTS_VALUE ? + INT64_MIN : ost->last_filter_pts; + } else { + opts = ost->last_mux_dts == AV_NOPTS_VALUE ? + INT64_MIN : ost->last_mux_dts; + if (ost->last_mux_dts == AV_NOPTS_VALUE) + av_log(NULL, AV_LOG_DEBUG, + "cur_dts is invalid st:%d (%d) [init:%d i_done:%d finish:%d] (this is harmless if it occurs once at the start per stream)\n", + ost->st->index, ost->st->id, ost->initialized, ost->inputs_done, ost->finished); + } if (!ost->initialized && !ost->inputs_done) return ost->unavailable ? NULL : ost; @@ -4205,6 +4272,26 @@ static int transcode_step(void) return reap_filters(0); } +static void flush_sync_queues_mux(void) +{ + /* mark all queue inputs as done */ + for (int i = 0; i < nb_output_streams; i++) { + OutputStream *ost = output_streams[i]; + OutputFile *of = output_files[ost->file_index]; + if (ost->sq_idx_mux >= 0) + sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); + } + + /* encode all packets remaining in the sync queues */ + for (int i = 0; i < nb_output_streams; i++) { + OutputStream *ost = output_streams[i]; + OutputFile *of = output_files[ost->file_index]; + + if (!(ost->finished & MUXER_FINISHED)) + output_packet(of, ost->pkt, ost, 1); + } +} + /* * The following code is the main loop of the file converter */ @@ -4266,6 +4353,7 @@ static int transcode(void) } } flush_encoders(); + flush_sync_queues_mux(); term_exit(); diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 604f0145e3..dfda6ccbe9 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -26,6 +26,7 @@ #include #include "cmdutils.h" +#include "sync_queue.h" #include "libavformat/avformat.h" #include "libavformat/avio.h" @@ -464,8 +465,10 @@ typedef struct OutputStream { /* pts of the first frame encoded for this stream, used for limiting * recording time */ int64_t first_pts; - /* dts of the last packet sent to the muxer */ + /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */ int64_t last_mux_dts; + /* pts of the last frame received from the filters, in AV_TIME_BASE_Q */ + int64_t last_filter_pts; // the timebase of the packets sent to the muxer AVRational mux_timebase; AVRational enc_timebase; @@ -478,6 +481,7 @@ typedef struct OutputStream { int64_t max_frames; AVFrame *filtered_frame; AVFrame *last_frame; + AVFrame *sq_frame; AVPacket *pkt; int last_dropped; int last_nb0_frames[3]; @@ -566,6 +570,9 @@ typedef struct OutputStream { /* frame encode sum of squared error values */ int64_t error[4]; + + int sq_idx_encode; + int sq_idx_mux; } OutputStream; typedef struct Muxer Muxer; @@ -576,6 +583,9 @@ typedef struct OutputFile { Muxer *mux; const AVOutputFormat *format; + SyncQueue *sq_encode; + SyncQueue *sq_mux; + AVFormatContext *ctx; int ost_index; /* index of the first stream in output_streams */ int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units @@ -691,7 +701,7 @@ int of_check_init(OutputFile *of); int of_write_trailer(OutputFile *of); void of_close(OutputFile **pof); -void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost); +void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof); int of_finished(OutputFile *of); int64_t of_filesize(OutputFile *of); AVChapter * const * diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index ccfe31e09a..69f01448b8 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "sync_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" @@ -41,6 +42,10 @@ typedef struct MuxStream { * Updated when a packet is either pushed or pulled from the queue. */ size_t muxing_queue_data_size; + + /* dts of the last packet sent to the muxer, in the stream timebase + * used for making up missing dts values */ + int64_t last_mux_dts; } MuxStream; struct Muxer { @@ -161,6 +166,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { + MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->ctx; AVStream *st = ost->st; int ret; @@ -188,21 +194,21 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) pkt->dts, pkt->pts, ost->file_index, ost->st->index); pkt->pts = - pkt->dts = pkt->pts + pkt->dts + ost->last_mux_dts + 1 - - FFMIN3(pkt->pts, pkt->dts, ost->last_mux_dts + 1) - - FFMAX3(pkt->pts, pkt->dts, ost->last_mux_dts + 1); + pkt->dts = pkt->pts + pkt->dts + ms->last_mux_dts + 1 + - FFMIN3(pkt->pts, pkt->dts, ms->last_mux_dts + 1) + - FFMAX3(pkt->pts, pkt->dts, ms->last_mux_dts + 1); } if ((st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO || st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO || st->codecpar->codec_type == AVMEDIA_TYPE_SUBTITLE) && pkt->dts != AV_NOPTS_VALUE && - ost->last_mux_dts != AV_NOPTS_VALUE) { - int64_t max = ost->last_mux_dts + !(s->oformat->flags & AVFMT_TS_NONSTRICT); + ms->last_mux_dts != AV_NOPTS_VALUE) { + int64_t max = ms->last_mux_dts + !(s->oformat->flags & AVFMT_TS_NONSTRICT); if (pkt->dts < max) { int loglevel = max - pkt->dts > 2 || st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO ? AV_LOG_WARNING : AV_LOG_DEBUG; if (exit_on_error) loglevel = AV_LOG_ERROR; av_log(s, loglevel, "Non-monotonous DTS in output stream " "%d:%d; previous: %"PRId64", current: %"PRId64"; ", - ost->file_index, ost->st->index, ost->last_mux_dts, pkt->dts); + ost->file_index, ost->st->index, ms->last_mux_dts, pkt->dts); if (exit_on_error) { av_log(NULL, AV_LOG_FATAL, "aborting.\n"); exit_program(1); @@ -216,7 +222,7 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) } } } - ost->last_mux_dts = pkt->dts; + ms->last_mux_dts = pkt->dts; ost->data_size += pkt->size; ost->packets_written++; @@ -245,26 +251,10 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) } } -void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) +static void submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) { - AVStream *st = ost->st; int ret; - /* - * Audio encoders may split the packets -- #frames in != #packets out. - * But there is no reordering, so we can limit the number of output packets - * by simply dropping them here. - * Counting encoded video frames needs to be done separately because of - * reordering, see do_video_out(). - */ - if (!(st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->encoding_needed)) { - if (ost->frame_number >= ost->max_frames) { - av_packet_unref(pkt); - return; - } - ost->frame_number++; - } - if (of->mux->header_written) { write_packet(of, ost, pkt); } else { @@ -277,6 +267,52 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) } } +void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) +{ + AVStream *st = ost->st; + + if (!eof) { + /* + * Audio encoders may split the packets -- #frames in != #packets out. + * But there is no reordering, so we can limit the number of output packets + * by simply dropping them here. + * Counting encoded video frames needs to be done separately because of + * reordering, see do_video_out(). + */ + if (!(st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->encoding_needed)) { + if (ost->frame_number >= ost->max_frames) { + av_packet_unref(pkt); + return; + } + ost->frame_number++; + } + } + + if (ost->sq_idx_mux >= 0) { + int ret = sq_send(of->sq_mux, ost->sq_idx_mux, + SQPKT(eof ? NULL: pkt)); + if (ret < 0) { + av_packet_unref(pkt); + if (ret == AVERROR_EOF) { + ost->finished |= MUXER_FINISHED; + return; + } else + exit_program(1); + } + + while (1) { + ret = sq_receive(of->sq_mux, -1, SQPKT(pkt)); + if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) + return; + else if (ret < 0) + exit_program(1); + + submit_packet(of, pkt, output_streams[of->ost_index + ret]); + } + } else if (!eof) + submit_packet(of, pkt, ost); +} + static int print_sdp(void) { char sdp[16384]; @@ -447,6 +483,9 @@ void of_close(OutputFile **pof) if (!of) return; + sq_free(&of->sq_encode); + sq_free(&of->sq_mux); + s = of->ctx; mux_free(&of->mux, s ? s->nb_streams : 0); @@ -481,6 +520,7 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize) ret = AVERROR(ENOMEM); goto fail; } + ms->last_mux_dts = AV_NOPTS_VALUE; } mux->limit_filesize = limit_filesize; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index adad46de5f..8884a5f9ed 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -30,6 +30,7 @@ #include "ffmpeg.h" #include "cmdutils.h" #include "opt_common.h" +#include "sync_queue.h" #include "libavformat/avformat.h" @@ -1640,6 +1641,7 @@ static OutputStream *new_output_stream(OptionsContext *o, AVFormatContext *oc, e input_streams[source_index]->st->discard = input_streams[source_index]->user_set_discard; } ost->last_mux_dts = AV_NOPTS_VALUE; + ost->last_filter_pts = AV_NOPTS_VALUE; return ost; } @@ -2299,6 +2301,78 @@ static int init_complex_filters(void) return 0; } +static int setup_sync_queues(OutputFile *of, AVFormatContext *oc) +{ + int nb_av_enc = 0, nb_interleaved = 0; + +#define IS_AV_ENC(ost, type) \ + (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) +#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT) + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + ost->sq_idx_encode = -1; + ost->sq_idx_mux = -1; + + nb_interleaved += IS_INTERLEAVED(type); + nb_av_enc += IS_AV_ENC(ost, type); + } + + if (!(nb_interleaved > 1 && of->shortest)) + return 0; + + /* if we have more than one encoded audio/video streams, then we + * synchronize them before encoding */ + if (nb_av_enc > 1) { + of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES); + if (!of->sq_encode) + return AVERROR(ENOMEM); + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + if (!IS_AV_ENC(ost, type)) + continue; + + ost->sq_idx_encode = sq_add_stream(of->sq_encode); + if (ost->sq_idx_encode < 0) + return ost->sq_idx_encode; + + ost->sq_frame = av_frame_alloc(); + if (!ost->sq_frame) + return AVERROR(ENOMEM); + } + } + + /* if there any additional interleaved streams, then ALL the streams + * are also synchronized before sending them to the muxer */ + if (nb_interleaved > nb_av_enc) { + of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS); + if (!of->sq_mux) + return AVERROR(ENOMEM); + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + if (!IS_INTERLEAVED(type)) + continue; + + ost->sq_idx_mux = sq_add_stream(of->sq_mux); + if (ost->sq_idx_mux < 0) + return ost->sq_idx_mux; + } + } + +#undef IS_AV_ENC +#undef IS_INTERLEAVED + + return 0; +} + static int open_output_file(OptionsContext *o, const char *filename) { AVFormatContext *oc; @@ -2936,6 +3010,12 @@ loop_end: exit_program(1); } + err = setup_sync_queues(of, oc); + if (err < 0) { + av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n"); + exit_program(1); + } + err = of_muxer_init(of, format_opts, o->limit_filesize); if (err < 0) { av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n"); diff --git a/fftools/sync_queue.c b/fftools/sync_queue.c new file mode 100644 index 0000000000..cdf6a32cae --- /dev/null +++ b/fftools/sync_queue.c @@ -0,0 +1,346 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include + +#include "libavutil/avassert.h" +#include "libavutil/error.h" +#include "libavutil/fifo.h" +#include "libavutil/mathematics.h" +#include "libavutil/mem.h" + +#include "sync_queue.h" + +typedef struct SyncQueueStream { + AVFifo *fifo; + AVRational tb; + int64_t head_ts; + int finished; +} SyncQueueStream; + +struct SyncQueue { + enum SyncQueueType type; + + SyncQueueStream *streams; + unsigned int nb_streams; + + // pool of preallocated frames to avoid constant allocations + SyncQueueFrame free_frames[32]; + unsigned int nb_free_frames; + + /* sync head: the stream with the smallest last timestamp */ + int head_stream; + /* the finished stream with the smallest finish timestamp or -1 */ + int head_finished_stream; + int finished; +}; + +static void frame_free(const SyncQueue *sq, SyncQueueFrame frame) +{ + if (sq->type == SYNC_QUEUE_PACKETS) + av_packet_free(&frame.p); + else + av_frame_free(&frame.f); +} + +static void frame_clear(const SyncQueue *sq, SyncQueueFrame frame) +{ + if (sq->type == SYNC_QUEUE_PACKETS) + av_packet_unref(frame.p); + else + av_frame_unref(frame.f); +} + +static void frame_release(SyncQueue *sq, SyncQueueFrame frame) +{ + if (sq->nb_free_frames < FF_ARRAY_ELEMS(sq->free_frames)) { + frame_clear(sq, frame); + sq->free_frames[sq->nb_free_frames++] = frame; + } else + frame_free(sq, frame); +} + +static int frame_get(SyncQueue *sq, SyncQueueFrame *frame) +{ + if (sq->nb_free_frames) { + *frame = sq->free_frames[--sq->nb_free_frames]; + memset(sq->free_frames + sq->nb_free_frames, 0, sizeof(sq->free_frames[0])); + return 0; + } + if (sq->type == SYNC_QUEUE_PACKETS) { + frame->p = av_packet_alloc(); + return frame->p ? 0 : AVERROR(ENOMEM); + } + frame->f = av_frame_alloc(); + return frame->f ? 0 : AVERROR(ENOMEM); +} + +static void frame_move(const SyncQueue *sq, SyncQueueFrame dst, + SyncQueueFrame src) +{ + if (sq->type == SYNC_QUEUE_PACKETS) + av_packet_move_ref(dst.p, src.p); + else + av_frame_move_ref(dst.f, src.f); +} + +static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame) +{ + return (sq->type == SYNC_QUEUE_PACKETS) ? + frame.p->pts + frame.p->duration : + frame.f->pts + frame.f->pkt_duration; +} + +static int frame_null(const SyncQueue *sq, SyncQueueFrame frame) +{ + return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL); +} + +static void finish_stream(SyncQueue *sq, unsigned int stream_idx) +{ + SyncQueueStream *st = &sq->streams[stream_idx]; + + st->finished = 1; + + if (st->head_ts != AV_NOPTS_VALUE) { + /* check if this stream is the new finished head */ + if (sq->head_finished_stream < 0 || + av_compare_ts(st->head_ts, st->tb, + sq->streams[sq->head_finished_stream].head_ts, + sq->streams[sq->head_finished_stream].tb) < 0) { + sq->head_finished_stream = stream_idx; + } + + /* mark as finished all streams that should no longer receive new frames, + * due to them being ahead of some finished stream */ + st = &sq->streams[sq->head_finished_stream]; + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st1 = &sq->streams[i]; + if (st != st1 && st1->head_ts != AV_NOPTS_VALUE && + av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0) + st1->finished = 1; + } + } + + /* mark the whole queue as finished if all streams are finished */ + for (unsigned int i = 0; i < sq->nb_streams; i++) { + if (!sq->streams[i].finished) + return; + } + sq->finished = 1; +} + +int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame) +{ + SyncQueueStream *st; + SyncQueueFrame dst; + int64_t ts; + int ret; + + av_assert0(stream_idx < sq->nb_streams); + st = &sq->streams[stream_idx]; + + if (frame_null(sq, frame)) { + finish_stream(sq, stream_idx); + return 0; + } + if (st->finished) + return AVERROR_EOF; + + ret = frame_get(sq, &dst); + if (ret < 0) + return ret; + + frame_move(sq, dst, frame); + + ts = frame_ts(sq, dst); + + ret = av_fifo_write(st->fifo, &dst, 1); + if (ret < 0) { + frame_move(sq, frame, dst); + frame_release(sq, dst); + return ret; + } + + /* update this stream's head timestamp */ + if (ts != AV_NOPTS_VALUE && + (st->head_ts == AV_NOPTS_VALUE || st->head_ts < ts)) { + st->head_ts = ts; + + /* if this stream is now ahead of some finished stream, then + * this stream is also finished */ + if (sq->head_finished_stream >= 0 && + av_compare_ts(sq->streams[sq->head_finished_stream].head_ts, + sq->streams[sq->head_finished_stream].tb, + ts, st->tb) <= 0) + st->finished = 1; + + /* update the overall head timestamp if it could have changed */ + if (sq->head_stream < 0 || sq->head_stream == stream_idx) { + sq->head_stream = stream_idx; + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st_head = &sq->streams[sq->head_stream]; + SyncQueueStream *st_other = &sq->streams[i]; + if (st_other->head_ts != AV_NOPTS_VALUE && + av_compare_ts(st_other->head_ts, st_other->tb, + st_head->head_ts, st_head->tb) < 0) + sq->head_stream = i; + } + } + } + + + return 0; +} + +static int frame_receive(SyncQueue *sq, unsigned int stream_idx, + SyncQueueFrame frame) +{ + SyncQueueStream *st_head = sq->head_stream >= 0 ? + &sq->streams[sq->head_stream] : NULL; + SyncQueueStream *st; + + av_assert0(stream_idx < sq->nb_streams); + st = &sq->streams[stream_idx]; + + if (av_fifo_can_read(st->fifo)) { + SyncQueueFrame peek; + int64_t ts; + int cmp = 0; + int overflow = 0; + + av_fifo_peek(st->fifo, &peek, 1, 0); + ts = frame_ts(sq, peek); + + /* check if this stream's tail timestamp is before + * the overall queue head */ + if (ts != AV_NOPTS_VALUE && st_head) + cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb); + + /* also release the tail frame if this stream's head timestamp + * is over 10 seconds after the overall queue head */ + if (st->head_ts != AV_NOPTS_VALUE && st_head) { + int64_t head_st = av_rescale_q(st->head_ts, st->tb, AV_TIME_BASE_Q); + int64_t head_queue = av_rescale_q(st_head->head_ts, st_head->tb, + AV_TIME_BASE_Q); + overflow = head_st - head_queue > 10 * AV_TIME_BASE; + } + + /* return frames that are before the head; + * after all inputs are finished we can also return the head itself */ + if (ts == AV_NOPTS_VALUE || cmp < 0 || overflow || + (sq->finished && cmp == 0)) { + frame_move(sq, frame, peek); + frame_release(sq, peek); + av_fifo_drain2(st->fifo, 1); + return 0; + } + } + + return sq->finished ? AVERROR_EOF : AVERROR(EAGAIN); +} + +int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame) +{ + int nb_eof = 0; + int ret; + + /* read a frame for a specific stream */ + if (stream_idx >= 0) { + ret = frame_receive(sq, stream_idx, frame); + return (ret < 0) ? ret : stream_idx; + } + + /* read a frame for any stream with available output */ + for (unsigned int i = 0; i < sq->nb_streams; i++) { + ret = frame_receive(sq, i, frame); + if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) { + nb_eof += ret == AVERROR_EOF; + continue; + } + return (ret < 0) ? ret : i; + } + + return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN); +} + +int sq_add_stream(SyncQueue *sq) +{ + SyncQueueStream *tmp, *st; + + tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams)); + if (!tmp) + return AVERROR(ENOMEM); + sq->streams = tmp; + + st = &sq->streams[sq->nb_streams]; + memset(st, 0, sizeof(*st)); + + st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW); + if (!st->fifo) + return AVERROR(ENOMEM); + + st->head_ts = AV_NOPTS_VALUE; + + return sq->nb_streams++; +} + +void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb) +{ + av_assert0(stream_idx < sq->nb_streams); + sq->streams[stream_idx].tb = tb; +} + +SyncQueue *sq_alloc(enum SyncQueueType type) +{ + SyncQueue *sq = av_mallocz(sizeof(*sq)); + + if (!sq) + return NULL; + + sq->type = type; + + sq->head_stream = -1; + sq->head_finished_stream = -1; + + return sq; +} + +void sq_free(SyncQueue **psq) +{ + SyncQueue *sq = *psq; + + if (!sq) + return; + + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueFrame frame; + while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0) + frame_free(sq, frame); + + av_fifo_freep2(&sq->streams[i].fifo); + } + + av_freep(&sq->streams); + + for (unsigned int i = 0; i < sq->nb_free_frames; i++) + frame_free(sq, sq->free_frames[i]); + + av_freep(psq); +} diff --git a/fftools/sync_queue.h b/fftools/sync_queue.h new file mode 100644 index 0000000000..4a16ae3e62 --- /dev/null +++ b/fftools/sync_queue.h @@ -0,0 +1,93 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef FFTOOLS_SYNC_QUEUE_H +#define FFTOOLS_SYNC_QUEUE_H + +#include "libavcodec/packet.h" + +#include "libavutil/frame.h" + +enum SyncQueueType { + SYNC_QUEUE_PACKETS, + SYNC_QUEUE_FRAMES, +}; + +typedef union SyncQueueFrame { + AVFrame *f; + AVPacket *p; +} SyncQueueFrame; + +#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) }) +#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) }) + +typedef struct SyncQueue SyncQueue; + +SyncQueue *sq_alloc(enum SyncQueueType type); +void sq_free(SyncQueue **sq); + +/** + * Add a new stream to the sync queue. + * + * @return + * - a non-negative stream index on success + * - a negative error code on error + */ +int sq_add_stream(SyncQueue *sq); + +/** + * Set the timebase for the stream with index stream_idx. Should be called + * before sending any frames for this stream. + */ +void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb); + +/** + * Submit a frame for the stream with index stream_idx. + * + * On success, the sync queue takes ownership of the frame and will reset the + * contents of the supplied frame. On failure, the frame remains owned by the + * caller. + * + * Sending a frame with NULL contents marks the stream as finished. + * + * @return + * - 0 on success + * - AVERROR_EOF when no more frames should be submitted for this stream + * - another a negative error code on failure + */ +int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame); + +/** + * Read a frame from the queue. + * + * @param stream_idx index of the stream to read a frame for. May be -1, then + * try to read a frame from any stream that is ready for + * output. + * @param frame output frame will be written here on success. The frame is owned + * by the caller. + * + * @return + * - a non-negative index of the stream to which the returned frame belongs + * - AVERROR(EAGAIN) when more frames need to be submitted to the queue + * - AVERROR_EOF when no more frames will be available for this stream (for any + * stream if stream_idx is -1) + * - another negative error code on failure + */ +int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame); + +#endif // FFTOOLS_SYNC_QUEUE_H diff --git a/tests/ref/fate/copy-shortest1 b/tests/ref/fate/copy-shortest1 index 5038973e4e..87bee4c41f 100644 --- a/tests/ref/fate/copy-shortest1 +++ b/tests/ref/fate/copy-shortest1 @@ -120,4 +120,3 @@ 0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2 1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af 0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d -1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d diff --git a/tests/ref/fate/copy-shortest2 b/tests/ref/fate/copy-shortest2 index 5038973e4e..87bee4c41f 100644 --- a/tests/ref/fate/copy-shortest2 +++ b/tests/ref/fate/copy-shortest2 @@ -120,4 +120,3 @@ 0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2 1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af 0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d -1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d