From patchwork Fri Dec 1 11:15:46 2023
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Anton Khirnov
X-Patchwork-Id: 44852
From: Anton Khirnov
To: ffmpeg-devel@ffmpeg.org
Date: Fri, 1 Dec 2023 12:15:46 +0100
Message-ID: <20231201111621.10989-1-anton@khirnov.net>
X-Mailer: git-send-email 2.42.0
In-Reply-To: <20231130204812.GQ3543730@pb2>
References: <20231130204812.GQ3543730@pb2>
Subject: [FFmpeg-devel] [PATCH 13/13 v3] fftools/ffmpeg: convert to a threaded architecture
List-Id: FFmpeg development discussions and patches

Change the main loop and every component (demuxers, decoders, filters,
encoders, muxers) to use the previously added transcode scheduler. Every
instance of every such component was already running in a separate
thread, but now they can actually run in parallel.

Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
JEEB to be more correct and deterministic.
---
Fixed the hang.
Also updated the public branch.
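
For readers skimming the series, the shape of the change described above
(every component advancing on its own thread while the main thread only
waits and reports) can be sketched in a few lines of C. This is an
illustrative sketch under stated assumptions, not code from the patch:
Worker, worker_main, run_workers, nb_finished and nb_processed are
hypothetical names, and the yield loop merely stands in for the scheduler
wait in transcode(); it does not reproduce the scheduler API. The atomics
mirror how the patch reads shared counters such as nb_output_dumped with
atomic_load().

```c
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>

#define MAX_WORKERS 16

/* Hypothetical stand-in for one component (demuxer, decoder, ...). */
typedef struct Worker {
    pthread_t thread;
    unsigned  nb_items;   /* amount of work this component performs */
} Worker;

/* Shared state, written by workers and only read by the main thread. */
static atomic_uint           nb_finished;
static atomic_uint_least64_t nb_processed;

static void *worker_main(void *arg)
{
    const Worker *w = arg;

    /* Each worker progresses independently of the others. */
    for (unsigned i = 0; i < w->nb_items; i++)
        atomic_fetch_add(&nb_processed, 1);

    /* Signal completion to the observing thread. */
    atomic_fetch_add(&nb_finished, 1);
    return NULL;
}

/* Start the workers, observe them until all have finished, then join.
 * Returns the total number of processed items, or -1 on failure. */
int64_t run_workers(unsigned nb_workers, unsigned items_per_worker)
{
    Worker   workers[MAX_WORKERS];
    unsigned started = 0;

    if (nb_workers > MAX_WORKERS)
        return -1;

    atomic_store(&nb_finished, 0);
    atomic_store(&nb_processed, 0);

    for (; started < nb_workers; started++) {
        workers[started].nb_items = items_per_worker;
        if (pthread_create(&workers[started].thread, NULL,
                           worker_main, &workers[started]))
            break;
    }

    /* The main thread does no processing itself; a real loop would
     * sleep with a timeout and print a progress report here. */
    while (atomic_load(&nb_finished) < started)
        sched_yield();

    for (unsigned i = 0; i < started; i++)
        pthread_join(workers[i].thread, NULL);

    return started == nb_workers ? (int64_t)atomic_load(&nb_processed) : -1;
}
```

Calling run_workers(4, 1000) starts four such workers and returns the
combined item count once all of them are done. The point of the sketch is
only that the shared counters must be atomic once the main thread no
longer drives each step itself.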
--- fftools/ffmpeg.c | 374 +-------- fftools/ffmpeg.h | 97 +-- fftools/ffmpeg_dec.c | 321 ++------ fftools/ffmpeg_demux.c | 268 ++++--- fftools/ffmpeg_enc.c | 368 ++------- fftools/ffmpeg_filter.c | 722 +++++------------- fftools/ffmpeg_mux.c | 324 ++------ fftools/ffmpeg_mux.h | 24 +- fftools/ffmpeg_mux_init.c | 88 +-- fftools/ffmpeg_opt.c | 6 +- .../fate/ffmpeg-fix_sub_duration_heartbeat | 36 +- 11 files changed, 598 insertions(+), 2030 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index b8a97258a0..30b594fd97 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps { static BenchmarkTimeStamps get_benchmark_time_stamps(void); static int64_t getmaxrss(void); -unsigned nb_output_dumped = 0; +atomic_uint nb_output_dumped = 0; static BenchmarkTimeStamps current_time; AVIOContext *progress_avio = NULL; @@ -138,30 +138,6 @@ static struct termios oldtty; static int restore_tty; #endif -/* sub2video hack: - Convert subtitles to video with alpha to insert them in filter graphs. - This is a temporary solution until libavfilter gets real subtitles support. - */ - -static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb) -{ - /* When a frame is read from a file, examine all sub2video streams in - the same file and send the sub2video frame again. Otherwise, decoded - video frames could be accumulating in the filter graph while a filter - (possibly overlay) is desperately waiting for a subtitle frame. */ - for (int i = 0; i < infile->nb_streams; i++) { - InputStream *ist = infile->streams[i]; - - if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) - continue; - - for (int j = 0; j < ist->nb_filters; j++) - ifilter_sub2video_heartbeat(ist->filters[j], pts, tb); - } -} - -/* end of sub2video hack */ - static void term_exit_sigsafe(void) { #if HAVE_TERMIOS_H @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...) 
} } -void close_output_stream(OutputStream *ost) -{ - OutputFile *of = output_files[ost->file_index]; - ost->finished |= ENCODER_FINISHED; - - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); -} - -static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time) +static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts) { AVBPrint buf, buf_script; int64_t total_size = of_filesize(output_files[0]); int vid; double bitrate; double speed; - int64_t pts = AV_NOPTS_VALUE; static int64_t last_time = -1; static int first_report = 1; uint64_t nb_frames_dup = 0, nb_frames_drop = 0; @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti last_time = cur_time; } if (((cur_time - last_time) < stats_period && !first_report) || - (first_report && nb_output_dumped < nb_output_files)) + (first_report && atomic_load(&nb_output_dumped) < nb_output_files)) return; last_time = cur_time; } @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC); av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC); for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1; + const float q = ost->enc ? 
atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1; if (vid && ost->type == AVMEDIA_TYPE_VIDEO) { av_bprintf(&buf, "q=%2.1f ", q); @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti if (is_last_report) av_bprintf(&buf, "L"); - nb_frames_dup = ost->filter->nb_frames_dup; - nb_frames_drop = ost->filter->nb_frames_drop; + nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup); + nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop); vid = 1; } - /* compute min output value */ - if (ost->last_mux_dts != AV_NOPTS_VALUE) { - if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts) - pts = ost->last_mux_dts; - if (copy_ts) { - if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) - copy_ts_first_pts = pts; - if (copy_ts_first_pts != AV_NOPTS_VALUE) - pts -= copy_ts_first_pts; - } - } + } + + if (copy_ts) { + if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) + copy_ts_first_pts = pts; + if (copy_ts_first_pts != AV_NOPTS_VALUE) + pts -= copy_ts_first_pts; } us = FFABS64U(pts) % AV_TIME_BASE; @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy) return 0; } -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt) -{ - OutputFile *of = output_files[ost->file_index]; - int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base, - AV_TIME_BASE_Q); - - if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY)) - // we are only interested in heartbeats on streams configured, and - // only on random access points. 
- return 0; - - for (int i = 0; i < of->nb_streams; i++) { - OutputStream *iter_ost = of->streams[i]; - InputStream *ist = iter_ost->ist; - int ret = AVERROR_BUG; - - if (iter_ost == ost || !ist || !ist->decoding_needed || - ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) - // We wish to skip the stream that causes the heartbeat, - // output streams without an input stream, streams not decoded - // (as fix_sub_duration is only done for decoded subtitles) as - // well as non-subtitle streams. - continue; - - if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0) - return ret; - } - - return 0; -} - -/* pkt = NULL means EOF (needed to flush decoder buffers) */ -static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof) -{ - InputFile *f = input_files[ist->file_index]; - int64_t dts_est = AV_NOPTS_VALUE; - int ret = 0; - int eof_reached = 0; - - if (ist->decoding_needed) { - ret = dec_packet(ist, pkt, no_eof); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } - if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) - eof_reached = 1; - - if (pkt && pkt->opaque_ref) { - DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data; - dts_est = pd->dts_est; - } - - if (f->recording_time != INT64_MAX) { - int64_t start_time = 0; - if (copy_ts) { - start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; - start_time += start_at_zero ? 0 : f->start_time_effective; - } - if (dts_est >= f->recording_time + start_time) - pkt = NULL; - } - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (ost->enc || (!pkt && no_eof)) - continue; - - ret = of_streamcopy(ost, pkt, dts_est); - if (ret < 0) - return ret; - } - - return !eof_reached; -} - static void print_stream_maps(void) { av_log(NULL, AV_LOG_INFO, "Stream mapping:\n"); @@ -934,43 +821,6 @@ static void print_stream_maps(void) } } -/** - * Select the output stream to process. 
- * - * @retval 0 an output stream was selected - * @retval AVERROR(EAGAIN) need to wait until more input is available - * @retval AVERROR_EOF no more streams need output - */ -static int choose_output(OutputStream **post) -{ - int64_t opts_min = INT64_MAX; - OutputStream *ost_min = NULL; - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - int64_t opts; - - if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) { - opts = ost->filter->last_pts; - } else { - opts = ost->last_mux_dts == AV_NOPTS_VALUE ? - INT64_MIN : ost->last_mux_dts; - } - - if (!ost->initialized && !ost->finished) { - ost_min = ost; - break; - } - if (!ost->finished && opts < opts_min) { - opts_min = opts; - ost_min = ost; - } - } - if (!ost_min) - return AVERROR_EOF; - *post = ost_min; - return ost_min->unavailable ? AVERROR(EAGAIN) : 0; -} - static void set_tty_echo(int on) { #if HAVE_TERMIOS_H @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t cur_time) return 0; } -static void reset_eagain(void) -{ - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) - ost->unavailable = 0; -} - -static void decode_flush(InputFile *ifile) -{ - for (int i = 0; i < ifile->nb_streams; i++) { - InputStream *ist = ifile->streams[i]; - - if (ist->discard || !ist->decoding_needed) - continue; - - dec_packet(ist, NULL, 1); - } -} - -/* - * Return - * - 0 -- one packet was read and processed - * - AVERROR(EAGAIN) -- no packets were available for selected file, - * this function should be called again - * - AVERROR_EOF -- this function should not be called again - */ -static int process_input(int file_index, AVPacket *pkt) -{ - InputFile *ifile = input_files[file_index]; - InputStream *ist; - int ret, i; - - ret = ifile_get_packet(ifile, pkt); - - if (ret == 1) { - /* the input file is looped: flush the decoders */ - decode_flush(ifile); - return AVERROR(EAGAIN); - } - if (ret < 0) { - if (ret != AVERROR_EOF) { - av_log(ifile, AV_LOG_ERROR, - "Error 
retrieving a packet from demuxer: %s\n", av_err2str(ret)); - if (exit_on_error) - return ret; - } - - for (i = 0; i < ifile->nb_streams; i++) { - ist = ifile->streams[i]; - if (!ist->discard) { - ret = process_input_packet(ist, NULL, 0); - if (ret>0) - return 0; - else if (ret < 0) - return ret; - } - - /* mark all outputs that don't go through lavfi as finished */ - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - OutputFile *of = output_files[ost->file_index]; - - ret = of_output_packet(of, ost, NULL); - if (ret < 0) - return ret; - } - } - - ifile->eof_reached = 1; - return AVERROR(EAGAIN); - } - - reset_eagain(); - - ist = ifile->streams[pkt->stream_index]; - - sub2video_heartbeat(ifile, pkt->pts, pkt->time_base); - - ret = process_input_packet(ist, pkt, 0); - - av_packet_unref(pkt); - - return ret < 0 ? ret : 0; -} - -/** - * Run a single step of transcoding. - * - * @return 0 for success, <0 for error - */ -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) -{ - InputStream *ist = NULL; - int ret; - - if (ost->filter) { - if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0) - return ret; - if (!ist) - return 0; - } else { - ist = ost->ist; - av_assert0(ist); - } - - ret = process_input(ist->file_index, demux_pkt); - if (ret == AVERROR(EAGAIN)) { - return 0; - } - - if (ret < 0) - return ret == AVERROR_EOF ? 
0 : ret; - - // process_input() above might have caused output to become available - // in multiple filtergraphs, so we process all of them - for (int i = 0; i < nb_filtergraphs; i++) { - ret = reap_filters(filtergraphs[i], 0); - if (ret < 0) - return ret; - } - - return 0; -} - /* * The following code is the main loop of the file converter */ -static int transcode(Scheduler *sch, int *err_rate_exceeded) +static int transcode(Scheduler *sch) { int ret = 0, i; - InputStream *ist; - int64_t timer_start; - AVPacket *demux_pkt = NULL; + int64_t timer_start, transcode_ts = 0; print_stream_maps(); - *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); - demux_pkt = av_packet_alloc(); - if (!demux_pkt) { - ret = AVERROR(ENOMEM); - goto fail; - } + ret = sch_start(sch); + if (ret < 0) + return ret; if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) timer_start = av_gettime_relative(); - while (!received_sigterm) { - OutputStream *ost; + while (!sch_wait(sch, stats_period, &transcode_ts)) { int64_t cur_time= av_gettime_relative(); /* if 'q' pressed, exits */ @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) if (check_keyboard_interaction(cur_time) < 0) break; - ret = choose_output(&ost); - if (ret == AVERROR(EAGAIN)) { - reset_eagain(); - av_usleep(10000); - ret = 0; - continue; - } else if (ret < 0) { - av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n"); - ret = 0; - break; - } - - ret = transcode_step(ost, demux_pkt); - if (ret < 0 && ret != AVERROR_EOF) { - av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); - break; - } - /* dump report by using the output first video and audio streams */ - print_report(0, timer_start, cur_time); + print_report(0, timer_start, cur_time, transcode_ts); } - /* at the end of stream, we must flush the decoder buffers */ - for 
(ist = ist_iter(NULL); ist; ist = ist_iter(ist)) { - float err_rate; - - if (!input_files[ist->file_index]->eof_reached) { - int err = process_input_packet(ist, NULL, 0); - ret = err_merge(ret, err); - } - - err_rate = (ist->frames_decoded || ist->decode_errors) ? - ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f; - if (err_rate > max_error_rate) { - av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n", - err_rate, max_error_rate); - *err_rate_exceeded = 1; - } else if (err_rate) - av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate); - } - ret = err_merge(ret, enc_flush()); - - term_exit(); + ret = sch_stop(sch); /* write the trailer if needed */ for (i = 0; i < nb_output_files; i++) { @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) ret = err_merge(ret, err); } - /* dump report by using the first video and audio streams */ - print_report(1, timer_start, av_gettime_relative()); + term_exit(); -fail: - av_packet_free(&demux_pkt); + /* dump report by using the first video and audio streams */ + print_report(1, timer_start, av_gettime_relative(), transcode_ts); return ret; } @@ -1308,7 +990,7 @@ int main(int argc, char **argv) { Scheduler *sch = NULL; - int ret, err_rate_exceeded; + int ret; BenchmarkTimeStamps ti; init_dynload(); @@ -1350,7 +1032,7 @@ int main(int argc, char **argv) } current_time = ti = get_benchmark_time_stamps(); - ret = transcode(sch, &err_rate_exceeded); + ret = transcode(sch); if (ret >= 0 && do_benchmark) { int64_t utime, stime, rtime; current_time = get_benchmark_time_stamps(); @@ -1362,8 +1044,8 @@ int main(int argc, char **argv) utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0); } - ret = received_nb_signals ? 255 : - err_rate_exceeded ? 69 : ret; + ret = received_nb_signals ? 255 : + (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 
69 : ret; finish: if (ret == AVERROR_EXIT) diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index a89038b765..ba82b7490d 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -61,6 +61,8 @@ #define FFMPEG_OPT_TOP 1 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1 +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D') + enum VideoSyncMethod { VSYNC_AUTO = -1, VSYNC_PASSTHROUGH, @@ -82,13 +84,16 @@ enum HWAccelID { }; enum FrameOpaque { - FRAME_OPAQUE_REAP_FILTERS = 1, - FRAME_OPAQUE_CHOOSE_INPUT, - FRAME_OPAQUE_SUB_HEARTBEAT, + FRAME_OPAQUE_SUB_HEARTBEAT = 1, FRAME_OPAQUE_EOF, FRAME_OPAQUE_SEND_COMMAND, }; +enum PacketOpaque { + PKT_OPAQUE_SUB_HEARTBEAT = 1, + PKT_OPAQUE_FIX_SUB_DURATION, +}; + typedef struct HWDevice { const char *name; enum AVHWDeviceType type; @@ -309,11 +314,8 @@ typedef struct OutputFilter { enum AVMediaType type; - /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */ - int64_t last_pts; - - uint64_t nb_frames_dup; - uint64_t nb_frames_drop; + atomic_uint_least64_t nb_frames_dup; + atomic_uint_least64_t nb_frames_drop; } OutputFilter; typedef struct FilterGraph { @@ -426,11 +428,6 @@ typedef struct InputFile { float readrate; int accurate_seek; - - /* when looping the input file, this queue is used by decoders to report - * the last frame timestamp back to the demuxer thread */ - AVThreadMessageQueue *audio_ts_queue; - int audio_ts_queue_size; } InputFile; enum forced_keyframes_const { @@ -532,8 +529,6 @@ typedef struct OutputStream { InputStream *ist; AVStream *st; /* stream in the output file */ - /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */ - int64_t last_mux_dts; AVRational enc_timebase; @@ -578,13 +573,6 @@ typedef struct OutputStream { AVDictionary *sws_dict; AVDictionary *swr_opts; char *apad; - OSTFinished finished; /* no more packets should be written for this stream */ - int unavailable; /* true if the steram is unavailable (possibly temporarily) */ - - // init_output_stream() 
has been called for this stream - // The encoder and the bitstream filters have been initialized and the stream - // parameters are set in the AVStream. - int initialized; const char *attachment_filename; @@ -598,9 +586,8 @@ typedef struct OutputStream { uint64_t samples_encoded; /* packet quality factor */ - int quality; + atomic_int quality; - int sq_idx_encode; int sq_idx_mux; EncStats enc_stats_pre; @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs; extern int nb_filtergraphs; extern char *vstats_filename; -extern char *sdp_filename; extern float dts_delta_threshold; extern float dts_error_threshold; @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb; extern const OptionDef options[]; extern HWDevice *filter_hw_device; -extern unsigned nb_output_dumped; +extern atomic_uint nb_output_dumped; extern int ignore_unknown_streams; extern int copy_unknown_streams; @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame); const FrameData *frame_data_c(AVFrame *frame); -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference); -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb); - /** * Set up fallback filtering parameters from a decoder context. They will only * be used if no frames are ever sent on this input, otherwise the actual @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); -/** - * Perform a step of transcoding for the specified filter graph. 
- * - * @param[in] graph filter graph to consider - * @param[out] best_ist input stream where a frame would allow to continue - * @return 0 for success, <0 for error - */ -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist); - void fg_send_command(FilterGraph *fg, double time, const char *target, const char *command, const char *arg, int all_filters); -/** - * Get and encode new output from specified filtergraph, without causing - * activity. - * - * @return 0 for success, <0 for severe errors - */ -int reap_filters(FilterGraph *fg, int flush); - int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); -/** - * Submit a packet for decoding - * - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and - * mark all downstreams as finished. - * - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush - * decoders and await further input. 
- */ -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); - int enc_alloc(Encoder **penc, const AVCodec *codec, Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); -int enc_open(OutputStream *ost, const AVFrame *frame); -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub); -int enc_frame(OutputStream *ost, AVFrame *frame); -int enc_flush(void); +int enc_open(void *opaque, const AVFrame *frame); /* * Initialize muxing state for the given stream, should be called @@ -840,30 +791,11 @@ void of_free(OutputFile **pof); void of_enc_stats_close(void); -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt); - -/** - * @param dts predicted packet dts in AV_TIME_BASE_Q - */ -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); - int64_t of_filesize(OutputFile *of); int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); -/** - * Get next input packet from the demuxer. 
- * - * @param pkt the packet is written here when this function returns 0 - * @return - * - 0 when a packet has been read successfully - * - 1 when stream end was reached, but the stream is looped; - * caller should flush decoders and read from this demuxer again - * - a negative error code on failure - */ -int ifile_get_packet(InputFile *f, AVPacket *pkt); - int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev); * pass NULL to start iteration */ OutputStream *ost_iter(OutputStream *prev); -void close_output_stream(OutputStream *ost); -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt); -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts); void update_benchmark(const char *fmt, ...); #define SPECIFIER_OPT_FMT_str "%s" diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 90ea0d6d93..5dde82a276 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -54,24 +54,6 @@ struct Decoder { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending coded packets from the main thread to - * the decoder thread. - * - * An empty packet is sent to flush the decoder without terminating - * decoding. - */ - ThreadQueue *queue_in; - /** - * Queue for sending decoded frames from the decoder thread - * to the main thread. - * - * An empty frame is sent to signal that a single packet has been fully - * processed. 
- */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -80,24 +62,6 @@ typedef struct DecThreadContext { AVPacket *pkt; } DecThreadContext; -static int dec_thread_stop(Decoder *d) -{ - void *ret; - - if (!d->queue_in) - return 0; - - tq_send_finish(d->queue_in, 0); - tq_receive_finish(d->queue_out, 0); - - pthread_join(d->thread, &ret); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - - return (intptr_t)ret; -} - void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec) if (!dec) return; - dec_thread_stop(dec); - av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -148,25 +110,6 @@ fail: return AVERROR(ENOMEM); } -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) -{ - int i, ret = 0; - - for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, - i < ist->nb_filters - 1 || - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; - } - } - return ret; -} - static AVRational audio_samplerate_update(void *logctx, Decoder *d, const AVFrame *frame) { @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - ret = send_frame_to_filters(ist, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) - return ret; + av_frame_unref(frame); - subtitle = (AVSubtitle*)frame->buf[0]->data; - if (!subtitle->num_rects) - return 0; - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE) - continue; - - ret = enc_subtitle(output_files[ost->file_index], ost, subtitle); - if (ret < 0) - return ret; - } - - return 0; + return ret == AVERROR_EOF ? 
AVERROR_EXIT : ret; } -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) { Decoder *d = ist->decoder; int ret = AVERROR_BUG; @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, AVFrame *frame) { - Decoder *d = ist->decoder; + Decoder *d = ist->decoder; AVPacket *flush_pkt = NULL; AVSubtitle subtitle; int got_output; int ret; + if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) { + frame->pts = pkt->pts; + frame->time_base = pkt->time_base; + frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT; + + ret = sch_dec_send(d->sch, d->sch_idx, frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) { + return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base, + AV_TIME_BASE_Q)); + } + if (!pkt) { flush_pkt = av_packet_alloc(); if (!flush_pkt) @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, ist->frames_decoded++; - // XXX the queue for transferring data back to the main thread runs + // XXX the queue for transferring data to consumers runs // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that // inside the frame // eventually, subtitles should be switched to use AVFrames natively @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, frame->width = ist->dec_ctx->width; frame->height = ist->dec_ctx->height; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - av_frame_unref(frame); - - return ret; -} - -static int send_filter_eof(InputStream *ist) -{ - Decoder *d = ist->decoder; - int i, ret; - - for (i = 0; i < ist->nb_filters; i++) { - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? 
AV_NOPTS_VALUE : - d->last_frame_pts + d->last_frame_duration_est; - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb); - if (ret < 0) - return ret; - } - return 0; + return process_subtitle(ist, frame); } static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) ist->frames_decoded++; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - return ret; + ret = sch_dec_send(d->sch, d->sch_idx, frame); + if (ret < 0) { + av_frame_unref(frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } } } @@ -679,7 +603,6 @@ fail: void *decoder_thread(void *arg) { InputStream *ist = arg; - InputFile *ifile = input_files[ist->file_index]; Decoder *d = ist->decoder; DecThreadContext dt; int ret = 0, input_status = 0; @@ -691,19 +614,31 @@ void *decoder_thread(void *arg) dec_thread_set_name(ist); while (!input_status) { - int dummy, flush_buffers; + int flush_buffers, have_data; - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); - flush_buffers = input_status >= 0 && !dt.pkt->buf; - if (!dt.pkt->buf) + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); + have_data = input_status >= 0 && + (dt.pkt->buf || dt.pkt->side_data_elems || + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT || + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION); + flush_buffers = input_status >= 0 && !have_data; + if (!have_data) av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", flush_buffers ? "flush" : "EOF"); - ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame); + ret = packet_decode(ist, have_data ? 
dt.pkt : NULL, dt.frame); av_packet_unref(dt.pkt); av_frame_unref(dt.frame); + // AVERROR_EOF - EOF from the decoder + // AVERROR_EXIT - EOF from the scheduler + // we treat them differently when flushing + if (ret == AVERROR_EXIT) { + ret = AVERROR_EOF; + flush_buffers = 0; + } + if (ret == AVERROR_EOF) { av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", flush_buffers ? "resetting" : "finishing"); @@ -711,11 +646,10 @@ void *decoder_thread(void *arg) if (!flush_buffers) break; - /* report last frame duration to the demuxer thread */ + /* report last frame duration to the scheduler */ if (ist->dec->type == AVMEDIA_TYPE_AUDIO) { - Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est, - .tb = d->last_frame_tb }; - av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0); + dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est; + dt.pkt->time_base = d->last_frame_tb; } avcodec_flush_buffers(ist->dec_ctx); @@ -724,149 +658,47 @@ void *decoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the entire packet was processed - ret = tq_send(d->queue_out, 0, dt.frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination if (ret == AVERROR_EOF) ret = 0; + // on success send EOF timestamp to our downstreams + if (ret >= 0) { + float err_rate; + + av_frame_unref(dt.frame); + + dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF; + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : + d->last_frame_pts + d->last_frame_duration_est; + dt.frame->time_base = d->last_frame_tb; + + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_FATAL, + "Error signalling EOF timestamp: %s\n", av_err2str(ret)); + goto finish; + } + ret = 0; + + err_rate = (ist->frames_decoded || ist->decode_errors) ? 
+ ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f; + if (err_rate > max_error_rate) { + av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n", + err_rate, max_error_rate); + ret = FFMPEG_ERROR_RATE_EXCEEDED; + } else if (err_rate) + av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate); + } + finish: - tq_receive_finish(d->queue_in, 0); - tq_send_finish (d->queue_out, 0); - - // make sure the demuxer does not get stuck waiting for audio durations - // that will never arrive - if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO) - av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF); - dec_thread_uninit(&dt); - av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n"); - return (void*)(intptr_t)ret; } -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) -{ - Decoder *d = ist->decoder; - int ret = 0, thread_ret; - - // thread already joined - if (!d->queue_in) - return AVERROR_EOF; - - // send the packet/flush request/EOF to the decoder thread - if (pkt || no_eof) { - av_packet_unref(d->pkt); - - if (pkt) { - ret = av_packet_ref(d->pkt, pkt); - if (ret < 0) - goto finish; - } - - ret = tq_send(d->queue_in, 0, d->pkt); - if (ret < 0) - goto finish; - } else - tq_send_finish(d->queue_in, 0); - - // retrieve all decoded data for the packet - while (1) { - int dummy; - - ret = tq_receive(d->queue_out, &dummy, d->frame); - if (ret < 0) - goto finish; - - // packet fully processed - if (!d->frame->buf[0]) - return 0; - - // process the decoded frame - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) { - ret = process_subtitle(ist, d->frame); - } else { - ret = send_frame_to_filters(ist, d->frame); - } - av_frame_unref(d->frame); - if (ret < 0) - goto finish; - } - -finish: - thread_ret = dec_thread_stop(d); - if (thread_ret < 0) { - av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n", - av_err2str(thread_ret)); - ret = err_merge(ret, thread_ret); - } - // non-EOF 
errors here are all fatal - if (ret < 0 && ret != AVERROR_EOF) - return ret; - - // signal EOF to our downstreams - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } - - return AVERROR_EOF; -} - -static int dec_thread_start(InputStream *ist) -{ - Decoder *d = ist->decoder; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->queue_in = tq_alloc(1, 1, op, pkt_move); - if (!d->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_frames(); - if (!op) - goto fail; - - d->queue_out = tq_alloc(1, 4, op, frame_move); - if (!d->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); - if (ret) { - ret = AVERROR(ret); - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - return ret; -} - static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) { InputStream *ist = s->opaque; @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) if (ret < 0) return ret; - ret = dec_thread_start(ist); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", - av_err2str(ret)); - return ret; - } - return 0; } diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 2234dbe076..91cd7a1125 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -22,8 +22,6 @@ #include "ffmpeg.h" #include "ffmpeg_sched.h" #include "ffmpeg_utils.h" -#include "objpool.h" -#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -35,7 +33,6 @@ #include "libavutil/pixdesc.h" #include "libavutil/time.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include 
"libavcodec/packet.h" @@ -66,7 +63,11 @@ typedef struct DemuxStream { double ts_scale; + // scheduler returned EOF for this stream + int finished; + int streamcopy_needed; + int have_sub2video; int wrap_correction_done; int saw_first_ts; @@ -101,6 +102,7 @@ typedef struct Demuxer { /* number of times input stream should be looped */ int loop; + int have_audio_dec; /* duration of the looped segment of the input file */ Timestamp duration; /* pts with the smallest/largest values ever seen */ @@ -113,11 +115,12 @@ typedef struct Demuxer { double readrate_initial_burst; Scheduler *sch; - ThreadQueue *thread_queue; - int thread_queue_size; - pthread_t thread; + + AVPacket *pkt_heartbeat; int read_started; + int nb_streams_used; + int nb_streams_finished; } Demuxer; static DemuxStream *ds_from_ist(InputStream *ist) @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const AVPacket *pkt) d->nb_streams_warn = pkt->stream_index + 1; } -static int seek_to_start(Demuxer *d) +static int seek_to_start(Demuxer *d, Timestamp end_pts) { InputFile *ifile = &d->f; AVFormatContext *is = ifile->ctx; @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d) if (ret < 0) return ret; - if (ifile->audio_ts_queue_size) { - int got_ts = 0; - - while (got_ts < ifile->audio_ts_queue_size) { - Timestamp ts; - ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0); - if (ret < 0) - return ret; - got_ts++; - - if (d->max_pts.ts == AV_NOPTS_VALUE || - av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0) - d->max_pts = ts; - } - } + if (end_pts.ts != AV_NOPTS_VALUE && + (d->max_pts.ts == AV_NOPTS_VALUE || + av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0)) + d->max_pts = end_pts; if (d->max_pts.ts != AV_NOPTS_VALUE) { int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 
0 : d->min_pts.ts; @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base); if (pkt->pts != AV_NOPTS_VALUE) { // audio decoders take precedence for estimating total file duration - int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration; + int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration; pkt->pts += duration; @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -static int input_packet_process(Demuxer *d, AVPacket *pkt) +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags) { InputFile *f = &d->f; InputStream *ist = f->streams[pkt->stream_index]; @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) if (ret < 0) return ret; + if (f->recording_time != INT64_MAX) { + int64_t start_time = 0; + if (copy_ts) { + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; + start_time += start_at_zero ? 
0 : f->start_time_effective; + } + if (ds->dts >= f->recording_time + start_time) + *send_flags |= DEMUX_SEND_STREAMCOPY_EOF; + } + ds->data_size += pkt->size; ds->nb_packets++; @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } + pkt->stream_index = ds->sch_idx_stream; + return 0; } @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d) } } +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags, + const char *pkt_desc) +{ + int ret; + + ret = sch_demux_send(d->sch, d->f.index, pkt, flags); + if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + + av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n"); + ds->finished = 1; + + if (++d->nb_streams_finished == d->nb_streams_used) { + av_log(d, AV_LOG_VERBOSE, "All consumers are done\n"); + return AVERROR_EOF; + } + } else if (ret < 0) { + if (ret != AVERROR_EXIT) + av_log(d, AV_LOG_ERROR, + "Unable to send %s packet to consumers: %s\n", + pkt_desc, av_err2str(ret)); + return ret; + } + + return 0; +} + +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags) +{ + InputFile *f = &d->f; + int ret; + + // send heartbeat for sub2video streams + if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) { + for (int i = 0; i < f->nb_streams; i++) { + DemuxStream *ds1 = ds_from_ist(f->streams[i]); + + if (ds1->finished || !ds1->have_sub2video) + continue; + + d->pkt_heartbeat->pts = pkt->pts; + d->pkt_heartbeat->time_base = pkt->time_base; + d->pkt_heartbeat->stream_index = ds1->sch_idx_stream; + d->pkt_heartbeat->opaque = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT; + + ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat"); + if (ret < 0) + return ret; + } + } + + ret = do_send(d, ds, pkt, flags, "demuxed"); + if (ret < 0) + return ret; + + + return 0; +} + static void discard_unused_programs(InputFile *ifile) { for (int j = 0; j < ifile->ctx->nb_programs; j++) 
{ @@ -527,9 +590,13 @@ static void *input_thread(void *arg) discard_unused_programs(f); + d->read_started = 1; d->wallclock_start = av_gettime_relative(); while (1) { + DemuxStream *ds; + unsigned send_flags = 0; + ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -538,11 +605,13 @@ static void *input_thread(void *arg) } if (ret < 0) { if (d->loop) { - /* signal looping to the consumer thread */ + /* signal looping to our consumers */ pkt->stream_index = -1; - ret = tq_send(d->thread_queue, 0, pkt); + + ret = sch_demux_send(d->sch, f->index, pkt, 0); if (ret >= 0) - ret = seek_to_start(d); + ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts, + .tb = pkt->time_base }); if (ret >= 0) continue; @@ -551,9 +620,11 @@ static void *input_thread(void *arg) if (ret == AVERROR_EOF) av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); - else + else { av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", av_err2str(ret)); + ret = exit_on_error ? ret : 0; + } break; } @@ -565,8 +636,9 @@ static void *input_thread(void *arg) /* the following test is needed in case new streams appear dynamically in stream : we ignore them */ - if (pkt->stream_index >= f->nb_streams || - f->streams[pkt->stream_index]->discard) { + ds = pkt->stream_index < f->nb_streams ? 
+ ds_from_ist(f->streams[pkt->stream_index]) : NULL; + if (!ds || ds->ist.discard || ds->finished) { report_new_stream(d, pkt); av_packet_unref(pkt); continue; @@ -583,122 +655,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, pkt); + ret = input_packet_process(d, pkt, &send_flags); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = tq_send(d->thread_queue, 0, pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(f, AV_LOG_ERROR, - "Unable to send packet to main thread: %s\n", - av_err2str(ret)); + ret = demux_send(d, ds, pkt, send_flags); + if (ret < 0) break; - } } + // EOF/EXIT is normal termination + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) + ret = 0; + finish: - av_assert0(ret < 0); - tq_send_finish(d->thread_queue, 0); - av_packet_free(&pkt); - av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); - - return NULL; -} - -static void thread_stop(Demuxer *d) -{ - InputFile *f = &d->f; - - if (!d->thread_queue) - return; - - tq_receive_finish(d->thread_queue, 0); - - pthread_join(d->thread, NULL); - - tq_free(&d->thread_queue); - - av_thread_message_queue_free(&f->audio_ts_queue); -} - -static int thread_start(Demuxer *d) -{ - int ret; - InputFile *f = &d->f; - ObjPool *op; - - if (d->thread_queue_size <= 0) - d->thread_queue_size = (nb_input_files > 1 ? 
8 : 1); - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); - if (!d->thread_queue) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - if (d->loop) { - int nb_audio_dec = 0; - - for (int i = 0; i < f->nb_streams; i++) { - InputStream *ist = f->streams[i]; - nb_audio_dec += !!(ist->decoding_needed && - ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO); - } - - if (nb_audio_dec) { - ret = av_thread_message_queue_alloc(&f->audio_ts_queue, - nb_audio_dec, sizeof(Timestamp)); - if (ret < 0) - goto fail; - f->audio_ts_queue_size = nb_audio_dec; - } - } - - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); - ret = AVERROR(ret); - goto fail; - } - - d->read_started = 1; - - return 0; -fail: - tq_free(&d->thread_queue); - return ret; -} - -int ifile_get_packet(InputFile *f, AVPacket *pkt) -{ - Demuxer *d = demuxer_from_ifile(f); - int ret, dummy; - - if (!d->thread_queue) { - ret = thread_start(d); - if (ret < 0) - return ret; - } - - ret = tq_receive(d->thread_queue, &dummy, pkt); - if (ret < 0) - return ret; - - if (pkt->stream_index == -1) { - av_assert0(!pkt->data && !pkt->side_data_elems); - return 1; - } - - return 0; + return (void*)(intptr_t)ret; } static void demux_final_stats(Demuxer *d) @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf) if (!f) return; - thread_stop(d); - if (d->read_started) demux_final_stats(d); @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf) avformat_close_input(&f->ctx); + av_packet_free(&d->pkt_heartbeat); + av_freep(pf); } @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int decoding_needed) ds->sch_idx_stream = ret; } - ist->discard = 0; + if (ist->discard) { + ist->discard = 0; + d->nb_streams_used++; + } + ist->st->discard = ist->user_set_discard; ist->decoding_needed |= 
decoding_needed; ds->streamcopy_needed |= !decoding_needed; @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int decoding_needed) ret = dec_open(ist, d->sch, ds->sch_idx_dec); if (ret < 0) return ret; + + d->have_audio_dec |= is_audio; } return 0; @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost) int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) { + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); DemuxStream *ds = ds_from_ist(ist); int ret; @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) if (ret < 0) return ret; + if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) { + if (!d->pkt_heartbeat) { + d->pkt_heartbeat = av_packet_alloc(); + if (!d->pkt_heartbeat) + return AVERROR(ENOMEM); + } + ds->have_sub2video = 1; + } + return ds->sch_idx_dec; } @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) "since neither -readrate nor -re were given\n"); } - d->thread_queue_size = o->thread_queue_size; - /* Add all the streams from the given input file to the demuxer */ for (int i = 0; i < ic->nb_streams; i++) { ret = ist_add(o, d, ic->streams[i]); diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index 9871381c0e..9383b167f7 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -41,12 +41,6 @@ #include "libavformat/avformat.h" struct Encoder { - AVFrame *sq_frame; - - // packet for receiving encoded output - AVPacket *pkt; - AVFrame *sub_frame; - // combined size of all the packets received from the encoder uint64_t data_size; @@ -54,25 +48,9 @@ struct Encoder { uint64_t packets_encoded; int opened; - int finished; Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending frames from the main thread to - * the encoder thread. - */ - ThreadQueue *queue_in; - /** - * Queue for sending encoded packets from the encoder thread - * to the main thread. 
- * - * An empty packet is sent to signal that a previously sent - * frame has been fully processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -81,24 +59,6 @@ typedef struct EncoderThread { AVPacket *pkt; } EncoderThread; -static int enc_thread_stop(Encoder *e) -{ - void *ret; - - if (!e->queue_in) - return 0; - - tq_send_finish(e->queue_in, 0); - tq_receive_finish(e->queue_out, 0); - - pthread_join(e->thread, &ret); - - tq_free(&e->queue_in); - tq_free(&e->queue_out); - - return (int)(intptr_t)ret; -} - void enc_free(Encoder **penc) { Encoder *enc = *penc; @@ -106,13 +66,6 @@ void enc_free(Encoder **penc) if (!enc) return; - enc_thread_stop(enc); - - av_frame_free(&enc->sq_frame); - av_frame_free(&enc->sub_frame); - - av_packet_free(&enc->pkt); - av_freep(penc); } @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec, if (!enc) return AVERROR(ENOMEM); - if (codec->type == AVMEDIA_TYPE_SUBTITLE) { - enc->sub_frame = av_frame_alloc(); - if (!enc->sub_frame) - goto fail; - } - - enc->pkt = av_packet_alloc(); - if (!enc->pkt) - goto fail; - enc->sch = sch; enc->sch_idx = sch_idx; *penc = enc; return 0; -fail: - enc_free(&enc); - return AVERROR(ENOMEM); } static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref) @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } -static int enc_thread_start(OutputStream *ost) -{ - Encoder *e = ost->enc; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_frames(); - if (!op) - return AVERROR(ENOMEM); - - e->queue_in = tq_alloc(1, 1, op, frame_move); - if (!e->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_packets(); - if (!op) - goto fail; - - e->queue_out = tq_alloc(1, 4, op, pkt_move); - if (!e->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&e->thread, NULL, encoder_thread, ost); - if (ret) { - ret = AVERROR(ret); 
- av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&e->queue_in); - tq_free(&e->queue_out); - return ret; -} - -int enc_open(OutputStream *ost, const AVFrame *frame) +int enc_open(void *opaque, const AVFrame *frame) { + OutputStream *ost = opaque; InputStream *ist = ost->ist; Encoder *e = ost->enc; AVCodecContext *enc_ctx = ost->enc_ctx; @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame) const AVCodec *enc = enc_ctx->codec; OutputFile *of = output_files[ost->file_index]; FrameData *fd; + int frame_samples = 0; int ret; if (e->opened) @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame) e->opened = 1; - if (ost->sq_idx_encode >= 0) { - e->sq_frame = av_frame_alloc(); - if (!e->sq_frame) - return AVERROR(ENOMEM); - } - - if (ost->enc_ctx->frame_size) { - av_assert0(ost->sq_idx_encode >= 0); - sq_frame_samples(output_files[ost->file_index]->sq_encode, - ost->sq_idx_encode, ost->enc_ctx->frame_size); - } + if (ost->enc_ctx->frame_size) + frame_samples = ost->enc_ctx->frame_size; ret = check_avoptions(ost->encoder_opts); if (ret < 0) @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame) if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0) ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1}); - ret = enc_thread_start(ost); - if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", - av_err2str(ret)); - return ret; - } - ret = of_stream_init(of, ost); if (ret < 0) return ret; - return 0; + return frame_samples; } static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb) @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle * av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n"); return exit_on_error ? 
AVERROR(EINVAL) : 0; } - if (ost->finished || - (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) + if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) return 0; enc = ost->enc_ctx; @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle * } pkt->dts = pkt->pts; - ret = tq_send(e->queue_out, 0, pkt); + ret = sch_enc_send(e->sch, e->sch_idx, pkt); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ int64_t frame_number; double ti1, bitrate, avg_bitrate; double psnr_val = -1; + int quality; - ost->quality = sd ? AV_RL32(sd) : -1; + quality = sd ? AV_RL32(sd) : -1; pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE; + atomic_store(&ost->quality, quality); + if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) { // FIXME the scaling assumes 8bit double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0); @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ frame_number = e->packets_encoded; if (vstats_version <= 1) { fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number, - ost->quality / (float)FF_QP2LAMBDA); + quality / (float)FF_QP2LAMBDA); } else { fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number, - ost->quality / (float)FF_QP2LAMBDA); + quality / (float)FF_QP2LAMBDA); } if (psnr_val >= 0) @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base)); } - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! 
(%s)\n", - __func__, av_err2str(ret)); - return ret; - } - e->data_size += pkt->size; e->packets_encoded++; - ret = tq_send(e->queue_out, 0, pkt); + ret = sch_enc_send(e->sch, e->sch_idx, pkt); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, av_assert0(0); } -static int submit_encode_frame(OutputFile *of, OutputStream *ost, - AVFrame *frame, AVPacket *pkt) -{ - Encoder *e = ost->enc; - int ret; - - if (ost->sq_idx_encode < 0) - return encode_frame(of, ost, frame, pkt); - - if (frame) { - ret = av_frame_ref(e->sq_frame, frame); - if (ret < 0) - return ret; - frame = e->sq_frame; - } - - ret = sq_send(of->sq_encode, ost->sq_idx_encode, - SQFRAME(frame)); - if (ret < 0) { - if (frame) - av_frame_unref(frame); - if (ret != AVERROR_EOF) - return ret; - } - - while (1) { - AVFrame *enc_frame = e->sq_frame; - - ret = sq_receive(of->sq_encode, ost->sq_idx_encode, - SQFRAME(enc_frame)); - if (ret == AVERROR_EOF) { - enc_frame = NULL; - } else if (ret < 0) { - return (ret == AVERROR(EAGAIN)) ? 
0 : ret; - } - - ret = encode_frame(of, ost, enc_frame, pkt); - if (enc_frame) - av_frame_unref(enc_frame); - if (ret < 0) - return ret; - } -} - static int do_audio_out(OutputFile *of, OutputStream *ost, AVFrame *frame, AVPacket *pkt) { @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream *ost, if (!check_recording_time(ost, frame->pts, frame->time_base)) return AVERROR_EOF; - return submit_encode_frame(of, ost, frame, pkt); + return encode_frame(of, ost, frame, pkt); } static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf, @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream *ost, } #endif - return submit_encode_frame(of, ost, in_picture, pkt); + return encode_frame(of, ost, in_picture, pkt); } static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) enum AVMediaType type = ost->type; if (type == AVMEDIA_TYPE_SUBTITLE) { + const AVSubtitle *subtitle = frame && frame->buf[0] ? + (AVSubtitle*)frame->buf[0]->data : NULL; + // no flushing for subtitles - return frame ? - do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0; + return subtitle && subtitle->num_rects ? + do_subtitle_out(of, ost, subtitle, pkt) : 0; } if (frame) { @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) do_audio_out(of, ost, frame, pkt); } - return submit_encode_frame(of, ost, NULL, pkt); + return encode_frame(of, ost, NULL, pkt); } static void enc_thread_set_name(const OutputStream *ost) @@ -1009,24 +845,50 @@ fail: void *encoder_thread(void *arg) { OutputStream *ost = arg; - OutputFile *of = output_files[ost->file_index]; Encoder *e = ost->enc; EncoderThread et; int ret = 0, input_status = 0; + int name_set = 0; ret = enc_thread_init(&et); if (ret < 0) goto finish; - enc_thread_set_name(ost); + /* Open the subtitle encoders immediately. 
AVFrame-based encoders + * are opened through a callback from the scheduler once they get + * their first frame + * + * N.B.: because the callback is called from a different thread, + * enc_ctx MUST NOT be accessed before sch_enc_receive() returns + * for the first time for audio/video. */ + if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) { + ret = enc_open(ost, NULL); + if (ret < 0) + goto finish; + } while (!input_status) { - int dummy; - - input_status = tq_receive(e->queue_in, &dummy, et.frame); - if (input_status < 0) + input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame); + if (input_status == AVERROR_EOF) { av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); + if (!e->opened) { + av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n"); + ret = AVERROR(EINVAL); + goto finish; + } + } else if (input_status < 0) { + ret = input_status; + av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n", + av_err2str(ret)); + goto finish; + } + + if (!name_set) { + enc_thread_set_name(ost); + name_set = 1; + } + ret = frame_encode(ost, input_status >= 0 ? 
et.frame : NULL, et.pkt); av_packet_unref(et.pkt); @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the frame was encoded - ret = tq_send(e->queue_out, 0, et.pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ost, AV_LOG_ERROR, - "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg) ret = 0; finish: - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); - - tq_receive_finish(e->queue_in, 0); - tq_send_finish (e->queue_out, 0); - enc_thread_uninit(&et); - av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); - return (void*)(intptr_t)ret; } - -int enc_frame(OutputStream *ost, AVFrame *frame) -{ - OutputFile *of = output_files[ost->file_index]; - Encoder *e = ost->enc; - int ret, thread_ret; - - ret = enc_open(ost, frame); - if (ret < 0) - return ret; - - if (!e->queue_in) - return AVERROR_EOF; - - // send the frame/EOF to the encoder thread - if (frame) { - ret = tq_send(e->queue_in, 0, frame); - if (ret < 0) - goto finish; - } else - tq_send_finish(e->queue_in, 0); - - // retrieve all encoded data for the frame - while (1) { - int dummy; - - ret = tq_receive(e->queue_out, &dummy, e->pkt); - if (ret < 0) - break; - - // frame fully encoded - if (!e->pkt->data && !e->pkt->side_data_elems) - return 0; - - // process the encoded packet - ret = of_output_packet(of, ost, e->pkt); - if (ret < 0) - goto finish; - } - -finish: - thread_ret = enc_thread_stop(e); - if (thread_ret < 0) { - av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", - av_err2str(thread_ret)); - ret = err_merge(ret, thread_ret); - } - - if (ret < 0 && ret != AVERROR_EOF) - return ret; - - // signal EOF to the muxer - return of_output_packet(of, ost, NULL); -} - -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) -{ - Encoder *e = ost->enc; - 
AVFrame *f = e->sub_frame; - int ret; - - // XXX the queue for transferring data to the encoder thread runs - // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put - // that inside the frame - // eventually, subtitles should be switched to use AVFrames natively - ret = subtitle_wrap_frame(f, sub, 1); - if (ret < 0) - return ret; - - ret = enc_frame(ost, f); - av_frame_unref(f); - - return ret; -} - -int enc_flush(void) -{ - int ret = 0; - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - OutputFile *of = output_files[ost->file_index]; - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); - } - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - Encoder *e = ost->enc; - AVCodecContext *enc = ost->enc_ctx; - int err; - - if (!enc || !e->opened || - (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)) - continue; - - err = enc_frame(ost, NULL); - if (err != AVERROR_EOF && ret < 0) - ret = err_merge(ret, err); - - av_assert0(!e->queue_in); - } - - return ret; -} diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 635b1b0b6e..ada235b084 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -21,8 +21,6 @@ #include #include "ffmpeg.h" -#include "ffmpeg_utils.h" -#include "thread_queue.h" #include "libavfilter/avfilter.h" #include "libavfilter/buffersink.h" @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv { // true when the filtergraph contains only meta filters // that do not modify the frame data int is_meta; + // source filters are present in the graph + int have_sources; int disable_conversions; - int nb_inputs_bound; - int nb_outputs_bound; + unsigned nb_outputs_done; const char *graph_desc; @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending frames from the main thread to the filtergraph. 
Has - * nb_inputs+1 streams - the first nb_inputs stream correspond to - * filtergraph inputs. Frames on those streams may have their opaque set to - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the - * EOF event for the correspondint stream. Will be immediately followed by - * this stream being send-closed. - * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of - * a subtitle heartbeat event. Will only be sent for sub2video streams. - * - * The last stream is "control" - the main thread sends empty AVFrames with - * opaque set to - * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available - * from filtergraph outputs. These frames are sent to corresponding - * streams in queue_out. Finally an empty frame is sent to the control - * stream in queue_out. - * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are - * available the terminating empty frame's opaque will contain the index+1 - * of the filtergraph input to which more input frames should be supplied. - */ - ThreadQueue *queue_in; - /** - * Queue for sending frames from the filtergraph back to the main thread. - * Has nb_outputs+1 streams - the first nb_outputs stream correspond to - * filtergraph outputs. - * - * The last stream is "control" - see documentation for queue_in for more - * details. - */ - ThreadQueue *queue_out; - // submitting frames to filter thread returned EOF - // this only happens on thread exit, so is not per-input - int eof_in; } FilterGraphPriv; static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) @@ -123,6 +87,9 @@ typedef struct FilterGraphThread { // The output index is stored in frame opaque. 
AVFifo *frame_queue_out; + // index of the next input to request from the scheduler + unsigned next_in; + // set to 1 after at least one frame passed through this output int got_frame; // EOF status of each input/output, as received by the thread @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv { int64_t ts_offset; int64_t next_pts; FPSConvContext fps; - - // set to 1 after at least one frame passed through this output - int got_frame; } OutputFilterPriv; static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg) static void *filter_thread(void *arg); -// start the filtering thread once all inputs and outputs are bound -static int fg_thread_try_start(FilterGraphPriv *fgp) -{ - FilterGraph *fg = &fgp->fg; - ObjPool *op; - int ret = 0; - - if (fgp->nb_inputs_bound < fg->nb_inputs || - fgp->nb_outputs_bound < fg->nb_outputs) - return 0; - - op = objpool_alloc_frames(); - if (!op) - return AVERROR(ENOMEM); - - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); - if (!fgp->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - // at least one output is mandatory - op = objpool_alloc_frames(); - if (!op) - goto fail; - - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); - if (!fgp->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); - if (ret) { - ret = AVERROR(ret); - av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n", - fg->index, av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return ret; -} - static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in) { AVFilterContext *ctx = inout->filter_ctx; @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) ofilter->graph = fg; ofp->format = -1; ofp->index = 
fg->nb_outputs - 1; - ofilter->last_pts = AV_NOPTS_VALUE; return ofilter; } @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) return AVERROR(ENOMEM); } - fgp->nb_inputs_bound++; - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); - - return fg_thread_try_start(fgp); + return 0; } static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, if (ret < 0) return ret; - fgp->nb_outputs_bound++; - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); - - return fg_thread_try_start(fgp); + return 0; } static InputFilter *ifilter_alloc(FilterGraph *fg) @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) return ifilter; } -static int fg_thread_stop(FilterGraphPriv *fgp) -{ - void *ret; - - if (!fgp->queue_in) - return 0; - - for (int i = 0; i <= fgp->fg.nb_inputs; i++) { - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; - - if (ifp) - ifp->eof = 1; - - tq_send_finish(fgp->queue_in, i); - } - - for (int i = 0; i <= fgp->fg.nb_outputs; i++) - tq_receive_finish(fgp->queue_out, i); - - pthread_join(fgp->thread, &ret); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return (int)(intptr_t)ret; -} - void fg_free(FilterGraph **pfg) { FilterGraph *fg = *pfg; @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg) return; fgp = fgp_from_fg(fg); - fg_thread_stop(fgp); - avfilter_graph_free(&fg->graph); for (int j = 0; j < fg->nb_inputs; j++) { InputFilter *ifilter = fg->inputs[j]; @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch) if (ret < 0) goto fail; + for (unsigned i = 0; i < graph->nb_filters; i++) { + const AVFilter *f = graph->filters[i]->filter; + if (!avfilter_filter_pad_count(f, 0) && + !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) { + fgp->have_sources = 1; + break; + } + } + for (AVFilterInOut *cur = inputs; cur; cur = 
cur->next) { InputFilter *const ifilter = ifilter_alloc(fg); InputFilterPriv *ifp; @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) AVBufferRef *hw_device; AVFilterInOut *inputs, *outputs, *cur; int ret, i, simple = filtergraph_is_simple(fg); + int have_input_eof = 0; const char *graph_desc = fgp->graph_desc; cleanup_filtergraph(fg); @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) ret = av_buffersrc_add_frame(ifp->filter, NULL); if (ret < 0) goto fail; + have_input_eof = 1; } } - return 0; + if (have_input_eof) { + // make sure the EOF propagates to the end of the graph + ret = avfilter_graph_request_oldest(fg->graph); + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) + goto fail; + } + return 0; fail: cleanup_filtergraph(fg); return ret; @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame, fps->frames_prev_hist[2]); if (!*nb_frames && fps->last_dropped) { - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); fps->last_dropped++; } @@ -2260,21 +2153,23 @@ finish: fps->frames_prev_hist[0] = *nb_frames_prev; if (*nb_frames_prev == 0 && fps->last_dropped) { - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); av_log(ost, AV_LOG_VERBOSE, "*** dropping frame %"PRId64" at ts %"PRId64"\n", fps->frame_number, fps->last_frame->pts); } if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) { + uint64_t nb_frames_dup; if (*nb_frames > dts_error_threshold * 30) { av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1); - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); *nb_frames = 0; return; } - ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev); + nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup, + 
*nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev)); av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1); - if (ofilter->nb_frames_dup > fps->dup_warning) { + if (nb_frames_dup > fps->dup_warning) { av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning); fps->dup_warning *= 10; } @@ -2284,8 +2179,57 @@ finish: fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY); } +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt) +{ + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); + int ret; + + // we are finished and no frames were ever seen at this output, + // at least initialize the encoder with a dummy frame + if (!fgt->got_frame) { + AVFrame *frame = fgt->frame; + FrameData *fd; + + frame->time_base = ofp->tb_out; + frame->format = ofp->format; + + frame->width = ofp->width; + frame->height = ofp->height; + frame->sample_aspect_ratio = ofp->sample_aspect_ratio; + + frame->sample_rate = ofp->sample_rate; + if (ofp->ch_layout.nb_channels) { + ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); + if (ret < 0) + return ret; + } + + fd = frame_data(frame); + if (!fd) + return AVERROR(ENOMEM); + + fd->frame_rate_filter = ofp->fps.framerate; + + av_assert0(!frame->buf[0]); + + av_log(ofp->ofilter.ost, AV_LOG_WARNING, + "No filtered frames for output stream, trying to " + "initialize anyway.\n"); + + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame); + if (ret < 0) { + av_frame_unref(frame); + return ret; + } + } + + fgt->eof_out[ofp->index] = 1; + + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL); +} + static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); AVFrame *frame_prev = ofp->fps.last_frame; @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, 
FilterGraphThread *fgt, frame_out = frame; } - if (buffer) { - AVFrame *f = av_frame_alloc(); - - if (!f) { - av_frame_unref(frame_out); - return AVERROR(ENOMEM); - } - - av_frame_move_ref(f, frame_out); - f->opaque = (void*)(intptr_t)ofp->index; - - ret = av_fifo_write(fgt->frame_queue_out, &f, 1); - if (ret < 0) { - av_frame_free(&f); - return AVERROR(ENOMEM); - } - } else { - // return the frame to the main thread - ret = tq_send(fgp->queue_out, ofp->index, frame_out); + { + // send the frame to consumers + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out); if (ret < 0) { av_frame_unref(frame_out); - fgt->eof_out[ofp->index] = 1; + + if (!fgt->eof_out[ofp->index]) { + fgt->eof_out[ofp->index] = 1; + fgp->nb_outputs_done++; + } + return ret == AVERROR_EOF ? 0 : ret; } } @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, av_frame_move_ref(frame_prev, frame); } - if (!frame) { - tq_send_finish(fgp->queue_out, ofp->index); - fgt->eof_out[ofp->index] = 1; - } + if (!frame) + return close_output(ofp, fgt); return 0; } static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); OutputStream *ost = ofp->ofilter.ost; @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, ret = av_buffersink_get_frame_flags(filter, frame, AV_BUFFERSINK_FLAG_NO_REQUEST); - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { - ret = fg_output_frame(ofp, fgt, NULL, buffer); + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) { + ret = fg_output_frame(ofp, fgt, NULL); return (ret < 0) ? 
ret : 1; } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { return 1; @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, fd->frame_rate_filter = ofp->fps.framerate; } - ret = fg_output_frame(ofp, fgt, frame, buffer); + ret = fg_output_frame(ofp, fgt, frame); av_frame_unref(frame); if (ret < 0) return ret; @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, return 0; } -/* retrieve all frames available at filtergraph outputs and either send them to - * the main thread (buffer=0) or buffer them for later (buffer=1) */ +/* retrieve all frames available at filtergraph outputs + * and send them to consumers */ static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(fg); - int ret = 0; + int did_step = 0; - if (!fg->graph) - return 0; - - // process buffered frames - if (!buffer) { - AVFrame *f; - - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { - int out_idx = (intptr_t)f->opaque; - f->opaque = NULL; - ret = tq_send(fgp->queue_out, out_idx, f); - av_frame_free(&f); - if (ret < 0 && ret != AVERROR_EOF) - return ret; + // graph not configured, just select the input to request + if (!fg->graph) { + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); + if (ifp->format < 0 && !fgt->eof_in[i]) { + fgt->next_in = i; + return 0; + } } + + // This state - graph is not configured, but all inputs are either + // initialized or EOF - should be unreachable because sending EOF to a + // filter without even a fallback format should fail + av_assert0(0); + return AVERROR_BUG; } - /* Reap all buffers present in the buffer sinks */ - for (int i = 0; i < fg->nb_outputs; i++) { - OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); - int ret = 0; + while (fgp->nb_outputs_done < fg->nb_outputs) { + int ret; - while (!ret) { - ret = 
fg_output_step(ofp, fgt, frame, buffer); - if (ret < 0) - return ret; + ret = avfilter_graph_request_oldest(fg->graph); + if (ret == AVERROR(EAGAIN)) { + fgt->next_in = choose_input(fg, fgt); + break; + } else if (ret < 0) { + if (ret == AVERROR_EOF) + av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n"); + else + av_log(fg, AV_LOG_ERROR, + "Error requesting a frame from the filtergraph: %s\n", + av_err2str(ret)); + return ret; } - } + fgt->next_in = fg->nb_inputs; - return 0; + // return after one iteration, so that scheduler can rate-control us + if (did_step && fgp->have_sources) + return 0; + + /* Reap all buffers present in the buffer sinks */ + for (int i = 0; i < fg->nb_outputs; i++) { + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); + + ret = 0; + while (!ret) { + ret = fg_output_step(ofp, fgt, frame); + if (ret < 0) + return ret; + } + } + did_step = 1; + }; + + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0; } static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter, InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; + if (fgt->eof_in[ifp->index]) + return 0; + fgt->eof_in[ifp->index] = 1; if (ifp->filter) { @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, return ret; } - ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; + ret = fg->graph ? 
read_frames(fg, fgt, tmp) : 0; av_frame_free(&tmp); if (ret < 0) return ret; @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, return 0; } -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, - AVFrame *frame) -{ - const enum FrameOpaque msg = (intptr_t)frame->opaque; - FilterGraph *fg = &fgp->fg; - int graph_eof = 0; - int ret; - - frame->opaque = NULL; - av_assert0(msg > 0); - av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); - - if (!fg->graph) { - // graph not configured yet, ignore all messages other than choosing - // the input to read from - if (msg != FRAME_OPAQUE_CHOOSE_INPUT) { - av_frame_unref(frame); - goto done; - } - - for (int i = 0; i < fg->nb_inputs; i++) { - InputFilter *ifilter = fg->inputs[i]; - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - if (ifp->format < 0 && !fgt->eof_in[i]) { - frame->opaque = (void*)(intptr_t)(i + 1); - goto done; - } - } - - // This state - graph is not configured, but all inputs are either - // initialized or EOF - should be unreachable because sending EOF to a - // filter without even a fallback format should fail - av_assert0(0); - return AVERROR_BUG; - } - - if (msg == FRAME_OPAQUE_SEND_COMMAND) { - FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; - send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters); - av_frame_unref(frame); - goto done; - } - - if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { - ret = avfilter_graph_request_oldest(fg->graph); - - graph_eof = ret == AVERROR_EOF; - - if (ret == AVERROR(EAGAIN)) { - frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); - goto done; - } else if (ret < 0 && !graph_eof) - return ret; - } - - ret = read_frames(fg, fgt, frame, 0); - if (ret < 0) { - av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n"); - return ret; - } - - if (graph_eof) - return AVERROR_EOF; - - // signal to the main thread that we are done processing the message -done: - ret = 
tq_send(fgp->queue_out, fg->nb_outputs, frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); - return ret; - } - - return 0; -} - static void fg_thread_set_name(const FilterGraph *fg) { char name[16]; @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg) InputFilter *ifilter; InputFilterPriv *ifp; enum FrameOpaque o; - int input_idx, eof_frame; + unsigned input_idx = fgt.next_in; - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - if (input_idx < 0 || - (input_idx == fg->nb_inputs && input_status < 0)) { + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx, + &input_idx, fgt.frame); + if (input_status == AVERROR_EOF) { av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); break; + } else if (input_status == AVERROR(EAGAIN)) { + // should only happen when we didn't request any input + av_assert0(input_idx == fg->nb_inputs); + goto read_frames; } + av_assert0(input_status >= 0); + + o = (intptr_t)fgt.frame->opaque; o = (intptr_t)fgt.frame->opaque; // message on the control stream if (input_idx == fg->nb_inputs) { - ret = msg_process(fgp, &fgt, fgt.frame); - if (ret < 0) - goto finish; + FilterCommand *fc; + av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]); + + fc = (FilterCommand*)fgt.frame->buf[0]->data; + send_command(fg, fc->time, fc->target, fc->command, fc->arg, + fc->all_filters); + av_frame_unref(fgt.frame); continue; } // we received an input frame or EOF ifilter = fg->inputs[input_idx]; ifp = ifp_from_ifilter(ifilter); - eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; + if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT; ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? 
fgt.frame : NULL); - } else if (input_status >= 0 && fgt.frame->buf[0]) { + } else if (fgt.frame->buf[0]) { ret = send_frame(fg, &fgt, ifilter, fgt.frame); } else { - int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE; - AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 }; - ret = send_eof(&fgt, ifilter, pts, tb); + av_assert1(o == FRAME_OPAQUE_EOF); + ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base); } av_frame_unref(fgt.frame); if (ret < 0) + goto finish; + +read_frames: + // retrieve all newly available frames + ret = read_frames(fg, &fgt, fgt.frame); + if (ret == AVERROR_EOF) { + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n"); break; - - if (eof_frame) { - // an EOF frame is immediately followed by sender closing - // the corresponding stream, so retrieve that event - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index); - } - - // signal to the main thread that we are done - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); - if (ret < 0) { - if (ret == AVERROR_EOF) - break; - - av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + } else if (ret < 0) { + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n", + av_err2str(ret)); goto finish; } } + for (unsigned i = 0; i < fg->nb_outputs; i++) { + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); + + if (fgt.eof_out[i]) + continue; + + ret = fg_output_frame(ofp, &fgt, NULL); + if (ret < 0) + goto finish; + } + finish: // EOF is normal termination if (ret == AVERROR_EOF) ret = 0; - for (int i = 0; i <= fg->nb_inputs; i++) - tq_receive_finish(fgp->queue_in, i); - for (int i = 0; i <= fg->nb_outputs; i++) - tq_send_finish(fgp->queue_out, i); - fg_thread_uninit(&fgt); - av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); - return (void*)(intptr_t)ret; } -static int thread_send_frame(FilterGraphPriv *fgp, 
InputFilter *ifilter, - AVFrame *frame, enum FrameOpaque type) -{ - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - int output_idx, ret; - - if (ifp->eof) { - av_frame_unref(frame); - return AVERROR_EOF; - } - - frame->opaque = (void*)(intptr_t)type; - - ret = tq_send(fgp->queue_in, ifp->index, frame); - if (ret < 0) { - ifp->eof = 1; - av_frame_unref(frame); - return ret; - } - - if (type == FRAME_OPAQUE_EOF) - tq_send_finish(fgp->queue_in, ifp->index); - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); - - return ret; -} - -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - if (keep_reference) { - ret = av_frame_ref(fgp->frame, frame); - if (ret < 0) - return ret; - } else - av_frame_move_ref(fgp->frame, frame); - - return thread_send_frame(fgp, ifilter, fgp->frame, 0); -} - -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - fgp->frame->pts = pts; - fgp->frame->time_base = tb; - - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); - - return ret == AVERROR_EOF ? 0 : ret; -} - -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - - fgp->frame->pts = pts; - fgp->frame->time_base = tb; - - thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT); -} - -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) -{ - FilterGraphPriv *fgp = fgp_from_fg(graph); - int ret, got_frames = 0; - - if (fgp->eof_in) - return AVERROR_EOF; - - // signal to the filtering thread to return all frames it can - av_assert0(!fgp->frame->buf[0]); - fgp->frame->opaque = (void*)(intptr_t)(best_ist ? 
- FRAME_OPAQUE_CHOOSE_INPUT : - FRAME_OPAQUE_REAP_FILTERS); - - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); - if (ret < 0) { - fgp->eof_in = 1; - goto finish; - } - - while (1) { - OutputFilter *ofilter; - OutputFilterPriv *ofp; - OutputStream *ost; - int output_idx; - - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - - // EOF on the whole queue or the control stream - if (output_idx < 0 || - (ret < 0 && output_idx == graph->nb_outputs)) - goto finish; - - // EOF for a specific stream - if (ret < 0) { - ofilter = graph->outputs[output_idx]; - ofp = ofp_from_ofilter(ofilter); - - // we are finished and no frames were ever seen at this output, - // at least initialize the encoder with a dummy frame - if (!ofp->got_frame) { - AVFrame *frame = fgp->frame; - FrameData *fd; - - frame->time_base = ofp->tb_out; - frame->format = ofp->format; - - frame->width = ofp->width; - frame->height = ofp->height; - frame->sample_aspect_ratio = ofp->sample_aspect_ratio; - - frame->sample_rate = ofp->sample_rate; - if (ofp->ch_layout.nb_channels) { - ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); - if (ret < 0) - return ret; - } - - fd = frame_data(frame); - if (!fd) - return AVERROR(ENOMEM); - - fd->frame_rate_filter = ofp->fps.framerate; - - av_assert0(!frame->buf[0]); - - av_log(ofilter->ost, AV_LOG_WARNING, - "No filtered frames for output stream, trying to " - "initialize anyway.\n"); - - enc_open(ofilter->ost, frame); - av_frame_unref(frame); - } - - close_output_stream(graph->outputs[output_idx]->ost); - continue; - } - - // request was fully processed by the filtering thread, - // return the input stream to read from, if needed - if (output_idx == graph->nb_outputs) { - int input_idx = (intptr_t)fgp->frame->opaque - 1; - av_assert0(input_idx <= graph->nb_inputs); - - if (best_ist) { - *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ? 
- ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; - - if (input_idx < 0 && !got_frames) { - for (int i = 0; i < graph->nb_outputs; i++) - graph->outputs[i]->ost->unavailable = 1; - } - } - break; - } - - // got a frame from the filtering thread, send it for encoding - ofilter = graph->outputs[output_idx]; - ost = ofilter->ost; - ofp = ofp_from_ofilter(ofilter); - - if (ost->finished) { - av_frame_unref(fgp->frame); - tq_receive_finish(fgp->queue_out, output_idx); - continue; - } - - if (fgp->frame->pts != AV_NOPTS_VALUE) { - ofilter->last_pts = av_rescale_q(fgp->frame->pts, - fgp->frame->time_base, - AV_TIME_BASE_Q); - } - - ret = enc_frame(ost, fgp->frame); - av_frame_unref(fgp->frame); - if (ret < 0) - goto finish; - - ofp->got_frame = 1; - got_frames = 1; - } - -finish: - if (ret < 0) { - fgp->eof_in = 1; - for (int i = 0; i < graph->nb_outputs; i++) - close_output_stream(graph->outputs[i]->ost); - } - - return ret; -} - -int reap_filters(FilterGraph *fg, int flush) -{ - return fg_transcode_step(fg, NULL); -} - void fg_send_command(FilterGraph *fg, double time, const char *target, const char *command, const char *arg, int all_filters) { FilterGraphPriv *fgp = fgp_from_fg(fg); AVBufferRef *buf; FilterCommand *fc; - int output_idx, ret; - - if (!fgp->queue_in) - return; fc = av_mallocz(sizeof(*fc)); if (!fc) @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, fgp->frame->buf[0] = buf; fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; - ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); - if (ret < 0) { - av_frame_unref(fgp->frame); - return; - } - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); + sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame); } diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index ef5c2f60e0..067dc65d4e 100644 --- a/fftools/ffmpeg_mux.c 
+++ b/fftools/ffmpeg_mux.c @@ -23,16 +23,13 @@ #include "ffmpeg.h" #include "ffmpeg_mux.h" #include "ffmpeg_utils.h" -#include "objpool.h" #include "sync_queue.h" -#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -41,10 +38,9 @@ typedef struct MuxThreadContext { AVPacket *pkt; + AVPacket *fix_sub_duration_pkt; } MuxThreadContext; -int want_sdp = 1; - static Muxer *mux_from_of(OutputFile *of) { return (Muxer*)of; @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int return 0; } +static int of_streamcopy(OutputStream *ost, AVPacket *pkt); + /* apply the output bitstream filters */ -static int mux_packet_filter(Muxer *mux, OutputStream *ost, - AVPacket *pkt, int *stream_eof) +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt, + OutputStream *ost, AVPacket *pkt, int *stream_eof) { MuxStream *ms = ms_from_ost(ost); const char *err_msg; int ret = 0; + if (pkt && !ost->enc) { + ret = of_streamcopy(ost, pkt); + if (ret == AVERROR(EAGAIN)) + return 0; + else if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + pkt = NULL; + ret = 0; + } else if (ret < 0) + goto fail; + } + + // emit heartbeat for -fix_sub_duration; + // we are only interested in heartbeats on random access points.
+ if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) { + mt->fix_sub_duration_pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION; + mt->fix_sub_duration_pkt->pts = pkt->pts; + mt->fix_sub_duration_pkt->time_base = pkt->time_base; + + ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx, + mt->fix_sub_duration_pkt); + if (ret < 0) + goto fail; + } + if (ms->bsf_ctx) { int bsf_eof = 0; @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of) static void mux_thread_uninit(MuxThreadContext *mt) { av_packet_free(&mt->pkt); + av_packet_free(&mt->fix_sub_duration_pkt); memset(mt, 0, sizeof(*mt)); } @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt) if (!mt->pkt) goto fail; + mt->fix_sub_duration_pkt = av_packet_alloc(); + if (!mt->fix_sub_duration_pkt) + goto fail; + return 0; fail: @@ -316,19 +344,22 @@ void *muxer_thread(void *arg) OutputStream *ost; int stream_idx, stream_eof = 0; - ret = tq_receive(mux->tq, &stream_idx, mt.pkt); + ret = sch_mux_receive(mux->sch, of->index, mt.pkt); + stream_idx = mt.pkt->stream_index; if (stream_idx < 0) { av_log(mux, AV_LOG_VERBOSE, "All streams finished\n"); ret = 0; break; } - ost = of->streams[stream_idx]; - ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof); + ost = of->streams[mux->sch_stream_idx[stream_idx]]; + mt.pkt->stream_index = ost->index; + + ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? 
NULL : mt.pkt, &stream_eof); av_packet_unref(mt.pkt); if (ret == AVERROR_EOF) { if (stream_eof) { - tq_receive_finish(mux->tq, stream_idx); + sch_mux_receive_finish(mux->sch, of->index, stream_idx); } else { av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n"); ret = 0; @@ -343,243 +374,55 @@ void *muxer_thread(void *arg) finish: mux_thread_uninit(&mt); - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_receive_finish(mux->tq, i); - - av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n"); - return (void*)(intptr_t)ret; } -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt) -{ - int ret = 0; - - if (!pkt || ost->finished & MUXER_FINISHED) - goto finish; - - ret = tq_send(mux->tq, ost->index, pkt); - if (ret < 0) - goto finish; - - return 0; - -finish: - if (pkt) - av_packet_unref(pkt); - - ost->finished |= MUXER_FINISHED; - tq_send_finish(mux->tq, ost->index); - return ret == AVERROR_EOF ? 0 : ret; -} - -static int queue_packet(OutputStream *ost, AVPacket *pkt) -{ - MuxStream *ms = ms_from_ost(ost); - AVPacket *tmp_pkt = NULL; - int ret; - - if (!av_fifo_can_write(ms->muxing_queue)) { - size_t cur_size = av_fifo_can_read(ms->muxing_queue); - size_t pkt_size = pkt ? pkt->size : 0; - unsigned int are_we_over_size = - (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold; - size_t limit = are_we_over_size ? 
ms->max_muxing_queue_size : SIZE_MAX; - size_t new_size = FFMIN(2 * cur_size, limit); - - if (new_size <= cur_size) { - av_log(ost, AV_LOG_ERROR, - "Too many packets buffered for output stream %d:%d.\n", - ost->file_index, ost->st->index); - return AVERROR(ENOSPC); - } - ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size); - if (ret < 0) - return ret; - } - - if (pkt) { - ret = av_packet_make_refcounted(pkt); - if (ret < 0) - return ret; - - tmp_pkt = av_packet_alloc(); - if (!tmp_pkt) - return AVERROR(ENOMEM); - - av_packet_move_ref(tmp_pkt, pkt); - ms->muxing_queue_data_size += tmp_pkt->size; - } - av_fifo_write(ms->muxing_queue, &tmp_pkt, 1); - - return 0; -} - -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost) -{ - int ret; - - if (mux->tq) { - return thread_submit_packet(mux, ost, pkt); - } else { - /* the muxer is not initialized yet, buffer the packet */ - ret = queue_packet(ost, pkt); - if (ret < 0) { - if (pkt) - av_packet_unref(pkt); - return ret; - } - } - - return 0; -} - -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) -{ - Muxer *mux = mux_from_of(of); - int ret = 0; - - if (pkt && pkt->dts != AV_NOPTS_VALUE) - ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q); - - ret = submit_packet(mux, pkt, ost); - if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s", - av_err2str(ret)); - return ret; - } - - return 0; -} - -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) +static int of_streamcopy(OutputStream *ost, AVPacket *pkt) { OutputFile *of = output_files[ost->file_index]; MuxStream *ms = ms_from_ost(ost); + DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL; + int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE; int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 
0 : of->start_time; int64_t ts_offset; - AVPacket *opkt = ms->pkt; - int ret; - - av_packet_unref(opkt); if (of->recording_time != INT64_MAX && dts >= of->recording_time + start_time) - pkt = NULL; - - // EOF: flush output bitstream filters. - if (!pkt) - return of_output_packet(of, ost, NULL); + return AVERROR_EOF; if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) && !ms->copy_initial_nonkeyframes) - return 0; + return AVERROR(EAGAIN); if (!ms->streamcopy_started) { if (!ms->copy_prior_start && (pkt->pts == AV_NOPTS_VALUE ? dts < ms->ts_copy_start : pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base))) - return 0; + return AVERROR(EAGAIN); if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time) - return 0; + return AVERROR(EAGAIN); } - ret = av_packet_ref(opkt, pkt); - if (ret < 0) - return ret; - - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base); + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base); if (pkt->pts != AV_NOPTS_VALUE) - opkt->pts -= ts_offset; + pkt->pts -= ts_offset; if (pkt->dts == AV_NOPTS_VALUE) { - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base); + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base); } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { - opkt->pts = opkt->dts - ts_offset; - } - opkt->dts -= ts_offset; - - { - int ret = trigger_fix_sub_duration_heartbeat(ost, pkt); - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! 
(%s)\n", - __func__, av_err2str(ret)); - return ret; - } + pkt->pts = pkt->dts - ts_offset; } - ret = of_output_packet(of, ost, opkt); - if (ret < 0) - return ret; + pkt->dts -= ts_offset; ms->streamcopy_started = 1; return 0; } -static int thread_stop(Muxer *mux) -{ - void *ret; - - if (!mux || !mux->tq) - return 0; - - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_send_finish(mux->tq, i); - - pthread_join(mux->thread, &ret); - - tq_free(&mux->tq); - - return (int)(intptr_t)ret; -} - -static int thread_start(Muxer *mux) -{ - AVFormatContext *fc = mux->fc; - ObjPool *op; - int ret; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); - if (!mux->tq) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux); - if (ret) { - tq_free(&mux->tq); - return AVERROR(ret); - } - - /* flush the muxing queues */ - for (int i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = mux->of.streams[i]; - MuxStream *ms = ms_from_ost(ost); - AVPacket *pkt; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = thread_submit_packet(mux, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); - } - if (ret < 0) - return ret; - } - } - - return 0; -} - int print_sdp(const char *filename); int print_sdp(const char *filename) @@ -590,11 +433,6 @@ int print_sdp(const char *filename) AVIOContext *sdp_pb; AVFormatContext **avc; - for (i = 0; i < nb_output_files; i++) { - if (!mux_from_of(output_files[i])->header_written) - return 0; - } - avc = av_malloc_array(nb_output_files, sizeof(*avc)); if (!avc) return AVERROR(ENOMEM); @@ -629,25 +467,17 @@ int print_sdp(const char *filename) avio_closep(&sdp_pb); } - // SDP successfully written, allow muxer threads to start - ret = 1; - fail: av_freep(&avc); return ret; } -int mux_check_init(Muxer *mux) +int mux_check_init(void 
*arg) { + Muxer *mux = arg; OutputFile *of = &mux->of; AVFormatContext *fc = mux->fc; - int ret, i; - - for (i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = of->streams[i]; - if (!ost->initialized) - return 0; - } + int ret; ret = avformat_write_header(fc, &mux->opts); if (ret < 0) { @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux) mux->header_written = 1; av_dump_format(fc, of->index, fc->url, 1); - nb_output_dumped++; - - if (sdp_filename || want_sdp) { - ret = print_sdp(sdp_filename); - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); - return ret; - } else if (ret == 1) { - /* SDP is written only after all the muxers are ready, so now we - * start ALL the threads */ - for (i = 0; i < nb_output_files; i++) { - ret = thread_start(mux_from_of(output_files[i])); - if (ret < 0) - return ret; - } - } - } else { - ret = thread_start(mux_from_of(of)); - if (ret < 0) - return ret; - } + atomic_fetch_add(&nb_output_dumped, 1); return 0; } @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost) ost->st->time_base); } - ost->initialized = 1; + if (ms->sch_idx >= 0) + return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx); - return mux_check_init(mux); + return 0; } static int check_written(OutputFile *of) @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = mux->fc; int ret, mux_result = 0; - if (!mux->tq) { + if (!mux->header_written) { av_log(mux, AV_LOG_ERROR, "Nothing was written into output file, because " "at least one of its streams received no packets.\n"); return AVERROR(EINVAL); } - mux_result = thread_stop(mux); - ret = av_write_trailer(fc); if (ret < 0) { av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret)); @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post) ost->logfile = NULL; } - if (ms->muxing_queue) { - AVPacket *pkt; - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) - av_packet_free(&pkt); - av_fifo_freep2(&ms->muxing_queue); - } 
- avcodec_parameters_free(&ost->par_in); av_bsf_free(&ms->bsf_ctx); @@ -976,8 +778,6 @@ void of_free(OutputFile **pof) return; mux = mux_from_of(of); - thread_stop(mux); - sq_free(&of->sq_encode); sq_free(&mux->sq_mux); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index eee2b2cb07..5d7cf3fa76 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -25,7 +25,6 @@ #include #include "ffmpeg_sched.h" -#include "thread_queue.h" #include "libavformat/avformat.h" @@ -33,7 +32,6 @@ #include "libavutil/dict.h" #include "libavutil/fifo.h" -#include "libavutil/thread.h" typedef struct MuxStream { OutputStream ost; @@ -41,9 +39,6 @@ typedef struct MuxStream { // name used for logging char log_name[32]; - /* the packets are buffered here until the muxer is ready to be initialized */ - AVFifo *muxing_queue; - AVBSFContext *bsf_ctx; AVPacket *bsf_pkt; @@ -57,17 +52,6 @@ typedef struct MuxStream { int64_t max_frames; - /* - * The size of the AVPackets' buffers in queue. - * Updated when a packet is either pushed or pulled from the queue. 
- */ - size_t muxing_queue_data_size; - - int max_muxing_queue_size; - - /* Threshold after which max_muxing_queue_size will be in effect */ - size_t muxing_queue_data_threshold; - // timestamp from which the streamcopied streams should start, // in AV_TIME_BASE_Q; // everything before it should be discarded @@ -106,9 +90,6 @@ typedef struct Muxer { int *sch_stream_idx; int nb_sch_stream_idx; - pthread_t thread; - ThreadQueue *tq; - AVDictionary *opts; int thread_queue_size; @@ -122,10 +103,7 @@ typedef struct Muxer { AVPacket *sq_pkt; } Muxer; -/* whether we want to print an SDP, set in of_open() */ -extern int want_sdp; - -int mux_check_init(Muxer *mux); +int mux_check_init(void *arg); static MuxStream *ms_from_ost(OutputStream *ost) { diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 534b4379c7..6459296ab0 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const OptionsContext *o, return 0; } -static int new_stream_attachment(Muxer *mux, const OptionsContext *o, - OutputStream *ost) -{ - ost->finished = 1; - return 0; -} - static int new_stream_subtitle(Muxer *mux, const OptionsContext *o, OutputStream *ost) { @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->par_in) return AVERROR(ENOMEM); - ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0); - if (!ms->muxing_queue) - return AVERROR(ENOMEM); ms->last_mux_dts = AV_NOPTS_VALUE; ost->st = st; @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->enc_ctx) return AVERROR(ENOMEM); - ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); + ret = sch_add_enc(mux->sch, encoder_thread, ost, + ost->type == AVMEDIA_TYPE_SUBTITLE ? 
NULL : enc_open); if (ret < 0) return ret; ms->sch_idx_enc = ret; @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, max_muxing_queue_size, muxing_queue_data_threshold); - - ms->max_muxing_queue_size = max_muxing_queue_size; - ms->muxing_queue_data_threshold = muxing_queue_data_threshold; } MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24) av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0); - ost->last_mux_dts = AV_NOPTS_VALUE; - MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i, ms->copy_initial_nonkeyframes, oc, st); @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, ost); break; case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, ost); break; case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, ost); break; - case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break; } if (ret < 0) return ret; @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u MuxStream *ms = ms_from_ost(ost); enum AVMediaType type = ost->type; - ost->sq_idx_encode = -1; ost->sq_idx_mux = -1; nb_interleaved += IS_INTERLEAVED(type); @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u * - at least one encoded audio/video stream is frame-limited, since * that has similar semantics to 'shortest' * - at least one audio encoder requires constant frame sizes + * + * Note that encoding sync queues are handled in the scheduler, because + * different encoders run in different threads and need external + * synchronization, while muxer sync queues 
can be handled inside the muxer */ if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) { - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux); - if (!of->sq_encode) - return AVERROR(ENOMEM); + int sq_idx, ret; + + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux); + if (sq_idx < 0) + return sq_idx; for (int i = 0; i < oc->nb_streams; i++) { OutputStream *ost = of->streams[i]; @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u if (!IS_AV_ENC(ost, type)) continue; - ost->sq_idx_encode = sq_add_stream(of->sq_encode, - of->shortest || ms->max_frames < INT64_MAX); - if (ost->sq_idx_encode < 0) - return ost->sq_idx_encode; - - if (ms->max_frames != INT64_MAX) - sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames); + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc, + of->shortest || ms->max_frames < INT64_MAX, + ms->max_frames); + if (ret < 0) + return ret; } } @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt) return 0; } -static int init_output_stream_nofilter(OutputStream *ost) -{ - int ret = 0; - - if (ost->enc_ctx) { - ret = enc_open(ost, NULL); - if (ret < 0) - return ret; - } else { - ret = of_stream_init(output_files[ost->file_index], ost); - if (ret < 0) - return ret; - } - - return ret; -} - static const char *output_file_item_name(void *obj) { const Muxer *mux = obj; @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) av_strlcat(mux->log_name, "/", sizeof(mux->log_name)); av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name)); - if (strcmp(oc->oformat->name, "rtp")) - want_sdp = 0; of->format = oc->oformat; if (recording_time != INT64_MAX) @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) AVFMT_FLAG_BITEXACT); } - err = sch_add_mux(sch, muxer_thread, NULL, mux, + err = sch_add_mux(sch, muxer_thread, 
mux_check_init, mux, !strcmp(oc->oformat->name, "rtp")); if (err < 0) return err; @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) of->url = filename; - /* initialize stream copy and subtitle/data streams. - * Encoded AVFrame based streams will get initialized when the first AVFrame - * is received in do_video_out - */ + /* initialize streamcopy streams. */ for (int i = 0; i < of->nb_streams; i++) { OutputStream *ost = of->streams[i]; - if (ost->filter) - continue; - - err = init_output_stream_nofilter(ost); - if (err < 0) - return err; - } - - /* write the header for files with no streams */ - if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) { - int ret = mux_check_init(mux); - if (ret < 0) - return ret; + if (!ost->enc) { + err = of_stream_init(of, ost); + if (err < 0) + return err; + } } return 0; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index d463306546..6177a96a4e 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL}; HWDevice *filter_hw_device; char *vstats_filename; -char *sdp_filename; float audio_drift_threshold = 0.1; float dts_delta_threshold = 10; @@ -580,9 +579,8 @@ fail: static int opt_sdp_file(void *optctx, const char *opt, const char *arg) { - av_free(sdp_filename); - sdp_filename = av_strdup(arg); - return 0; + Scheduler *sch = optctx; + return sch_sdp_filename(sch, arg); } #if CONFIG_VAAPI diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat index 957a410921..bc9b833799 100644 --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat @@ -1,48 +1,40 @@ 1 -00:00:00,968 --> 00:00:01,001 +00:00:00,968 --> 00:00:01,168 {\an7}( 2 -00:00:01,001 --> 00:00:01,168 -{\an7}( - -3 00:00:01,168 --> 00:00:01,368 {\an7}( inaudibl -4 +3 00:00:01,368 --> 00:00:01,568 {\an7}( inaudible radio 
chat -5 +4 00:00:01,568 --> 00:00:02,002 {\an7}( inaudible radio chatter ) +5 +00:00:02,002 --> 00:00:03,103 +{\an7}( inaudible radio chatter ) + 6 -00:00:02,002 --> 00:00:03,003 -{\an7}( inaudible radio chatter ) - -7 -00:00:03,003 --> 00:00:03,103 -{\an7}( inaudible radio chatter ) - -8 00:00:03,103 --> 00:00:03,303 -{\an7}( inaudible radio chatter ) +{\an7}( inaudible radio chatter ) >> -9 +7 00:00:03,303 --> 00:00:03,503 -{\an7}( inaudible radio chatter ) +{\an7}( inaudible radio chatter ) >> Safety rema -10 +8 00:00:03,504 --> 00:00:03,704 -{\an7}( inaudible radio chatter ) +{\an7}( inaudible radio chatter ) >> Safety remains our numb -11 +9 00:00:03,704 --> 00:00:04,004 -{\an7}( inaudible radio chatter ) +{\an7}( inaudible radio chatter ) >> Safety remains our number one