From patchwork Thu Nov 23 19:15:07 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44777 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:bca6:b0:181:818d:5e7f with SMTP id fx38csp802888pzb; Thu, 23 Nov 2023 11:20:53 -0800 (PST) X-Google-Smtp-Source: AGHT+IFHV+XAAk/5J7CR3rJWALBw4qslMBe1jkvqTD9YMB49lw/1OCKbYeu+VEm60DO0qauT1yOw X-Received: by 2002:a50:aac1:0:b0:53e:782f:cfa6 with SMTP id r1-20020a50aac1000000b0053e782fcfa6mr151381edc.37.1700767253480; Thu, 23 Nov 2023 11:20:53 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1700767253; cv=none; d=google.com; s=arc-20160816; b=urn4kpnrzM6lOJ7IPdaQ44ntKgdRQC2+P+NgxMfHXjvxno9ZWAA02NTFteRmGPeSPi TT939Tni2WTplg6jGRXljpcOm/KjkuQGj3M5KWTQ0CJYLlJjP43MWSR7pYSBrmktaHZS 4GW9lGYwsgEvd2WhRUdezO1awM3SpV21poHwKyKGFXEIhbJWYXsTkHp46RLMvdjnAxxg w+QbABPUkAw5YEeysI9vqbXytym5j/YskpA/gQCQHol7cK6i7jkSiQl9CdXk2vbXjEgz 4BFGhQKAo2g4Q8+P3nQStcontiKB7IDDNS0mqc9UV2YqHIbgNnRPzE8WQXMSLAKd1mYf hroQ== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=ekDFyBvrd2okvJqwnzUxrt/SeBnBNKKUwbxN7ye+BZs=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=HfpnbC1lXERvzLfYvHFNpZlE4qwhraX5RHVPIwVTyk/oepYG4izi2PgADZ3/u/S19P NwLW34c7QNXU++YhuL/qd6YVpY7f3BOYlc/vU/UxdDxqnjpd5tH5gV/i5lntlF1IWp26 gVNPfXtisgAfQu0YzSr04pAl90CEMWWy/RjnyYWztafsBrH6xybnXEgyhFJyGPgDkw33 x6Ftx6oDoH1onsGuEyqMdg638LjSS8j1qCWtfJ8zZ0xN0Edw6Y2cC+vl1n5SztGjtGh1 wbTnhCEq3N+rmL3iKx3xYJ63CyivXfPrvO43j2Gm4KG69CaEOTtYYtdZJg6iP6758ki1 xINg== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id e20-20020a056402149400b00548e0602502si845729edv.682.2023.11.23.11.20.53; Thu, 23 Nov 2023 11:20:53 -0800 (PST) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 3E9A568CF47; Thu, 23 Nov 2023 21:19:08 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id B822968CF0A for ; Thu, 23 Nov 2023 21:18:54 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id CBD121531 for ; Thu, 23 Nov 2023 20:18:20 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id y5dSgnZeJOSS for ; Thu, 23 Nov 2023 20:18:19 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 9B84F1CB4 for ; Thu, 23 Nov 2023 20:18:15 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 8687C3A05D0 for ; Thu, 23 Nov 2023 20:18:15 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Thu, 23 Nov 2023 20:15:07 +0100 Message-ID: <20231123191524.11296-14-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231123191524.11296-2-anton@khirnov.net> References: <20231123191524.11296-2-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 12/13] fftools/ffmpeg: add thread-aware transcode scheduling infrastructure X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: PpsVJbUW2bCr See the comment block at the top of fftools/ffmpeg_sched.h for more details on what this scheduler is for. This commit adds the scheduling code itself, along with minimal integration with the rest of the program: * allocating and freeing the scheduler * passing it throughout the call stack in order to register the individual components (demuxers/decoders/filtergraphs/encoders/muxers) with the scheduler The scheduler is not actually used as of this commit, so it should not result in any change in behavior. That will change in future commits. --- fftools/Makefile | 1 + fftools/ffmpeg.c | 18 +- fftools/ffmpeg.h | 24 +- fftools/ffmpeg_dec.c | 10 +- fftools/ffmpeg_demux.c | 46 +- fftools/ffmpeg_enc.c | 13 +- fftools/ffmpeg_filter.c | 37 +- fftools/ffmpeg_mux.c | 17 +- fftools/ffmpeg_mux.h | 12 + fftools/ffmpeg_mux_init.c | 106 +- fftools/ffmpeg_opt.c | 22 +- fftools/ffmpeg_sched.c | 2174 +++++++++++++++++++++++++++++++++++++ fftools/ffmpeg_sched.h | 468 ++++++++ 13 files changed, 2892 insertions(+), 56 deletions(-) create mode 100644 fftools/ffmpeg_sched.c create mode 100644 fftools/ffmpeg_sched.h diff --git a/fftools/Makefile b/fftools/Makefile index 3c763e3db9..083a1368ce 100644 --- a/fftools/Makefile +++ b/fftools/Makefile @@ -18,6 +18,7 @@ OBJS-ffmpeg += \ fftools/ffmpeg_mux.o \ fftools/ffmpeg_mux_init.o \ fftools/ffmpeg_opt.o \ + fftools/ffmpeg_sched.o \ fftools/objpool.o \ fftools/sync_queue.o \ fftools/thread_queue.o \ diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index cf8a50bffc..b8a97258a0 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -99,6 +99,7 @@ #include "cmdutils.h" #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "ffmpeg_utils.h" #include "sync_queue.h" @@ -1167,7 +1168,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) /* * The following code is the main loop of the file converter */ -static int transcode(int *err_rate_exceeded) +static int transcode(Scheduler *sch, int *err_rate_exceeded) { int ret = 0, i; InputStream *ist; @@ -1305,6 +1306,8 @@ static int64_t getmaxrss(void) int main(int argc, char **argv) { + Scheduler *sch = NULL; + int ret, err_rate_exceeded; BenchmarkTimeStamps ti; @@ -1322,8 +1325,14 @@ int main(int argc, char **argv) show_banner(argc, argv, options); + sch = sch_alloc(); + if (!sch) { + ret = AVERROR(ENOMEM); + goto finish; + } + /* parse options and open all input/output files */ - ret = ffmpeg_parse_options(argc, argv); + ret = ffmpeg_parse_options(argc, argv, sch); if (ret < 0) goto finish; @@ -1341,7 +1350,7 @@ int main(int argc, char **argv) } current_time = ti = get_benchmark_time_stamps(); - ret = transcode(&err_rate_exceeded); + ret = transcode(sch, &err_rate_exceeded); if (ret >= 0 && do_benchmark) { int64_t utime, stime, rtime; current_time = get_benchmark_time_stamps(); @@ -1361,5 +1370,8 @@ finish: ret = 0; ffmpeg_cleanup(ret); + + sch_free(&sch); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 3c153021f8..a89038b765 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -27,6 +27,7 @@ #include #include "cmdutils.h" +#include "ffmpeg_sched.h" #include "sync_queue.h" #include "libavformat/avformat.h" @@ -721,7 +722,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id int check_filter_outputs(void); int filtergraph_is_simple(const FilterGraph *fg); int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc); + char *graph_desc, + Scheduler *sch, unsigned sch_idx_enc); int init_complex_filtergraph(FilterGraph *fg); int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src); @@ -746,7 +748,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t */ int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec); -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc); /** * Create a new filtergraph in the global filtergraph list. @@ -754,7 +757,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); * @param graph_desc Graph description; an av_malloc()ed string, filtergraph * takes ownership of it. */ -int fg_create(FilterGraph **pfg, char *graph_desc); +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); @@ -778,7 +781,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, */ int reap_filters(FilterGraph *fg, int flush); -int ffmpeg_parse_options(int argc, char **argv); +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, const AVFrame *frame, const AVPacket *pkt, @@ -801,7 +804,7 @@ AVBufferRef *hw_device_for_filter(void); int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); -int dec_open(InputStream *ist); +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); /** @@ -815,7 +818,8 @@ void dec_free(Decoder **pdec); */ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); -int enc_alloc(Encoder **penc, const AVCodec *codec); +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); int enc_open(OutputStream *ost, const AVFrame *frame); @@ -831,7 +835,7 @@ int enc_flush(void); */ int of_stream_init(OutputFile *of, OutputStream *ost); int of_write_trailer(OutputFile *of); -int of_open(const OptionsContext *o, const char *filename); +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch); void of_free(OutputFile **pof); void of_enc_stats_close(void); @@ -845,7 +849,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); int64_t of_filesize(OutputFile *of); -int ifile_open(const OptionsContext *o, const char *filename); +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); /** @@ -932,4 +936,8 @@ extern const char * const opt_name_frame_rates[]; extern const char * const opt_name_top_field_first[]; #endif +void *muxer_thread(void *arg); +void *decoder_thread(void *arg); +void *encoder_thread(void *arg); + #endif /* FFTOOLS_FFMPEG_H */ diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index b60bad1220..90ea0d6d93 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -52,6 +52,9 @@ struct Decoder { AVFrame *sub_prev[2]; AVFrame *sub_heartbeat; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending coded packets from the main thread to @@ -673,7 +676,7 @@ fail: return AVERROR(ENOMEM); } -static void *decoder_thread(void *arg) +void *decoder_thread(void *arg) { InputStream *ist = arg; InputFile *ifile = input_files[ist->file_index]; @@ -1045,7 +1048,7 @@ static int hw_device_setup_for_decode(InputStream *ist) return 0; } -int dec_open(InputStream *ist) +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) { Decoder *d; const AVCodec *codec = ist->dec; @@ -1063,6 +1066,9 @@ int dec_open(InputStream *ist) return ret; d = ist->decoder; + d->sch = sch; + d->sch_idx = sch_idx; + if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) { for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) { d->sub_prev[i] = av_frame_alloc(); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 65a5e08ca5..2234dbe076 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "ffmpeg_utils.h" #include "objpool.h" #include "thread_queue.h" @@ -60,6 +61,9 @@ typedef struct DemuxStream { // name used for logging char log_name[32]; + int sch_idx_stream; + int sch_idx_dec; + double ts_scale; int streamcopy_needed; @@ -108,6 +112,7 @@ typedef struct Demuxer { double readrate_initial_burst; + Scheduler *sch; ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; @@ -780,7 +785,9 @@ void ifile_close(InputFile **pf) static int ist_use(InputStream *ist, int decoding_needed) { + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); DemuxStream *ds = ds_from_ist(ist); + int ret; if (ist->user_set_discard == AVDISCARD_ALL) { av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n", @@ -788,13 +795,32 @@ static int ist_use(InputStream *ist, int decoding_needed) return AVERROR(EINVAL); } + if (ds->sch_idx_stream < 0) { + ret = sch_add_demux_stream(d->sch, d->f.index); + if (ret < 0) + return ret; + ds->sch_idx_stream = ret; + } + ist->discard = 0; ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; - if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) { - int ret = dec_open(ist); + if (decoding_needed && ds->sch_idx_dec < 0) { + int is_audio = ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO; + + ret = sch_add_dec(d->sch, decoder_thread, ist, d->loop && is_audio); + if (ret < 0) + return ret; + ds->sch_idx_dec = ret; + + ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream), + SCH_DEC(ds->sch_idx_dec)); + if (ret < 0) + return ret; + + ret = dec_open(ist, d->sch, ds->sch_idx_dec); if (ret < 0) return ret; } @@ -804,6 +830,7 @@ static int ist_use(InputStream *ist, int decoding_needed) int ist_output_add(InputStream *ist, OutputStream *ost) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0); @@ -816,11 +843,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost) ist->outputs[ist->nb_outputs - 1] = ost; - return 0; + return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream; } int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, is_simple ? DECODING_FOR_OST : DECODING_FOR_FILTER); @@ -838,7 +866,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) if (ret < 0) return ret; - return 0; + return ds->sch_idx_dec; } static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st, @@ -970,6 +998,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st) if (!ds) return NULL; + ds->sch_idx_stream = -1; + ds->sch_idx_dec = -1; + ds->ist.st = st; ds->ist.file_index = f->index; ds->ist.index = st->index; @@ -1295,7 +1326,7 @@ static Demuxer *demux_alloc(void) return d; } -int ifile_open(const OptionsContext *o, const char *filename) +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Demuxer *d; InputFile *f; @@ -1322,6 +1353,11 @@ int ifile_open(const OptionsContext *o, const char *filename) f = &d->f; + ret = sch_add_demux(sch, input_thread, d); + if (ret < 0) + return ret; + d->sch = sch; + if (stop_time != INT64_MAX && recording_time != INT64_MAX) { stop_time = INT64_MAX; av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n"); diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index 46c21fc0e4..9871381c0e 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -56,6 +56,9 @@ struct Encoder { int opened; int finished; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to @@ -113,7 +116,8 @@ void enc_free(Encoder **penc) av_freep(penc); } -int enc_alloc(Encoder **penc, const AVCodec *codec) +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx) { Encoder *enc; @@ -133,6 +137,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec) if (!enc->pkt) goto fail; + enc->sch = sch; + enc->sch_idx = sch_idx; + *penc = enc; return 0; @@ -217,8 +224,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } -static void *encoder_thread(void *arg); - static int enc_thread_start(OutputStream *ost) { Encoder *e = ost->enc; @@ -1001,7 +1006,7 @@ fail: return AVERROR(ENOMEM); } -static void *encoder_thread(void *arg) +void *encoder_thread(void *arg) { OutputStream *ost = arg; OutputFile *of = output_files[ost->file_index]; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index d8320b7526..1b41d32540 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -65,6 +65,9 @@ typedef struct FilterGraphPriv { // frame for sending output to the encoder AVFrame *frame_enc; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to the filtergraph. Has @@ -735,14 +738,19 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; + int ret, dec_idx; av_assert0(!ifp->ist); ifp->ist = ist; ifp->type_src = ist->st->codecpar->codec_type; - ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + if (dec_idx < 0) + return dec_idx; + + ret = sch_connect(fgp->sch, SCH_DEC(dec_idx), + SCH_FILTER_IN(fgp->sch_idx, ifp->index)); if (ret < 0) return ret; @@ -798,13 +806,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) return 0; } -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc) { const OutputFile *of = output_files[ost->file_index]; OutputFilterPriv *ofp = ofp_from_ofilter(ofilter); FilterGraph *fg = ofilter->graph; FilterGraphPriv *fgp = fgp_from_fg(fg); const AVCodec *c = ost->enc_ctx->codec; + int ret; av_assert0(!ofilter->ost); @@ -887,6 +897,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) break; } + ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index), + SCH_ENC(sched_idx_enc)); + if (ret < 0) + return ret; + fgp->nb_outputs_bound++; av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); @@ -1016,7 +1031,7 @@ static const AVClass fg_class = { .category = AV_CLASS_CATEGORY_FILTER, }; -int fg_create(FilterGraph **pfg, char *graph_desc) +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch) { FilterGraphPriv *fgp; FilterGraph *fg; @@ -1037,6 +1052,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc) fg->index = nb_filtergraphs - 1; fgp->graph_desc = graph_desc; fgp->disable_conversions = !auto_conversion_filters; + fgp->sch = sch; snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index); @@ -1104,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc) goto fail; } + ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs, + filter_thread, fgp); + if (ret < 0) + goto fail; + fgp->sch_idx = ret; + fail: avfilter_inout_free(&inputs); avfilter_inout_free(&outputs); @@ -1116,13 +1138,14 @@ fail: } int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc) + char *graph_desc, + Scheduler *sch, unsigned sched_idx_enc) { FilterGraph *fg; FilterGraphPriv *fgp; int ret; - ret = fg_create(&fg, graph_desc); + ret = fg_create(&fg, graph_desc, sch); if (ret < 0) return ret; fgp = fgp_from_fg(fg); @@ -1148,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost, if (ret < 0) return ret; - ret = ofilter_bind_ost(fg->outputs[0], ost); + ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc); if (ret < 0) return ret; diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 57fb8a8413..ef5c2f60e0 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -297,7 +297,7 @@ fail: return AVERROR(ENOMEM); } -static void *muxer_thread(void *arg) +void *muxer_thread(void *arg) { Muxer *mux = arg; OutputFile *of = &mux->of; @@ -580,7 +580,9 @@ static int thread_start(Muxer *mux) return 0; } -static int print_sdp(void) +int print_sdp(const char *filename); + +int print_sdp(const char *filename) { char sdp[16384]; int i; @@ -613,19 +615,18 @@ static int print_sdp(void) if (ret < 0) goto fail; - if (!sdp_filename) { + if (!filename) { printf("SDP:\n%s\n", sdp); fflush(stdout); } else { - ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL); + ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL); if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename); + av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename); goto fail; } avio_print(sdp_pb, sdp); avio_closep(&sdp_pb); - av_freep(&sdp_filename); } // SDP successfully written, allow muxer threads to start @@ -661,7 +662,7 @@ int mux_check_init(Muxer *mux) nb_output_dumped++; if (sdp_filename || want_sdp) { - ret = print_sdp(); + ret = print_sdp(sdp_filename); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; @@ -984,6 +985,8 @@ void of_free(OutputFile **pof) ost_free(&of->streams[i]); av_freep(&of->streams); + av_freep(&mux->sch_stream_idx); + av_dict_free(&mux->opts); av_packet_free(&mux->sq_pkt); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index a2bb4dfc7d..eee2b2cb07 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -24,6 +24,7 @@ #include #include +#include "ffmpeg_sched.h" #include "thread_queue.h" #include "libavformat/avformat.h" @@ -50,6 +51,10 @@ typedef struct MuxStream { EncStats stats; + int sch_idx; + int sch_idx_enc; + int sch_idx_src; + int64_t max_frames; /* @@ -94,6 +99,13 @@ typedef struct Muxer { AVFormatContext *fc; + Scheduler *sch; + unsigned sch_idx; + + // OutputStream indices indexed by scheduler stream indices + int *sch_stream_idx; + int nb_sch_stream_idx; + pthread_t thread; ThreadQueue *tq; diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 63a25a350f..534b4379c7 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -23,6 +23,7 @@ #include "cmdutils.h" #include "ffmpeg.h" #include "ffmpeg_mux.h" +#include "ffmpeg_sched.h" #include "fopen_utf8.h" #include "libavformat/avformat.h" @@ -436,6 +437,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type) ms->ost.class = &output_stream_class; + ms->sch_idx = -1; + ms->sch_idx_enc = -1; + snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d", type_str ? *type_str : '?', mux->of.index, ms->ost.index); @@ -1127,6 +1131,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ms) return AVERROR(ENOMEM); + // only streams with sources (i.e. not attachments) + // are handled by the scheduler + if (ist || ofilter) { + ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx); + if (ret < 0) + return ret; + + ret = sch_add_mux_stream(mux->sch, mux->sch_idx); + if (ret < 0) + return ret; + + av_assert0(ret == mux->nb_sch_stream_idx - 1); + mux->sch_stream_idx[ret] = ms->ost.index; + ms->sch_idx = ret; + } + ost = &ms->ost; if (o->streamid) { @@ -1170,7 +1190,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->enc_ctx) return AVERROR(ENOMEM); - ret = enc_alloc(&ost->enc, enc); + ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); + if (ret < 0) + return ret; + ms->sch_idx_enc = ret; + + ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sch_idx_enc); if (ret < 0) return ret; @@ -1380,11 +1405,19 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, ost->enc_ctx->global_quality = FF_QP2LAMBDA * qscale; } - ms->max_muxing_queue_size = 128; - MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, ms->max_muxing_queue_size, oc, st); + if (ms->sch_idx >= 0) { + int max_muxing_queue_size = 128; + int muxing_queue_data_threshold = 50 * 1024 * 1024; - ms->muxing_queue_data_threshold = 50*1024*1024; - MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, ms->muxing_queue_data_threshold, oc, st); + MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, max_muxing_queue_size, oc, st); + MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, muxing_queue_data_threshold, oc, st); + + sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, + max_muxing_queue_size, muxing_queue_data_threshold); + + ms->max_muxing_queue_size = max_muxing_queue_size; + ms->muxing_queue_data_threshold = muxing_queue_data_threshold; + } MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, oc, st); @@ -1425,23 +1458,47 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) { if (ofilter) { ost->filter = ofilter; - ret = ofilter_bind_ost(ofilter, ost); + ret = ofilter_bind_ost(ofilter, ost, ms->sch_idx_enc); if (ret < 0) return ret; } else { - ret = init_simple_filtergraph(ost->ist, ost, filters); + ret = init_simple_filtergraph(ost->ist, ost, filters, + mux->sch, ms->sch_idx_enc); if (ret < 0) { av_log(ost, AV_LOG_ERROR, "Error initializing a simple filtergraph\n"); return ret; } } + + ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc), + SCH_MSTREAM(mux->sch_idx, ms->sch_idx)); + if (ret < 0) + return ret; } else if (ost->ist) { - ret = ist_output_add(ost->ist, ost); - if (ret < 0) { + int sched_idx = ist_output_add(ost->ist, ost); + if (sched_idx < 0) { av_log(ost, AV_LOG_ERROR, "Error binding an input stream\n"); - return ret; + return sched_idx; + } + ms->sch_idx_src = sched_idx; + + if (ost->enc) { + ret = sch_connect(mux->sch, SCH_DEC(sched_idx), + SCH_ENC(ms->sch_idx_enc)); + if (ret < 0) + return ret; + + ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc), + SCH_MSTREAM(mux->sch_idx, ms->sch_idx)); + if (ret < 0) + return ret; + } else { + ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx), + SCH_MSTREAM(ost->file_index, ms->sch_idx)); + if (ret < 0) + return ret; } } @@ -1837,6 +1894,26 @@ static int create_streams(Muxer *mux, const OptionsContext *o) if (ret < 0) return ret; + // setup fix_sub_duration_heartbeat mappings + for (unsigned i = 0; i < oc->nb_streams; i++) { + MuxStream *src = ms_from_ost(mux->of.streams[i]); + + if (!src->ost.fix_sub_duration_heartbeat) + continue; + + for (unsigned j = 0; j < oc->nb_streams; j++) { + MuxStream *dst = ms_from_ost(mux->of.streams[j]); + + if (src == dst || dst->ost.type != AVMEDIA_TYPE_SUBTITLE || + !dst->ost.enc || !dst->ost.ist || !dst->ost.ist->fix_sub_duration) + continue; + + ret = sch_mux_sub_heartbeat_add(mux->sch, mux->sch_idx, src->sch_idx, + dst->sch_idx_src); + + } + } + if (!oc->nb_streams && !(oc->oformat->flags & AVFMT_NOSTREAMS)) { av_dump_format(oc, nb_output_files - 1, oc->url, 1); av_log(mux, AV_LOG_ERROR, "Output file does not contain any stream\n"); @@ -2621,7 +2698,7 @@ static Muxer *mux_alloc(void) return mux; } -int of_open(const OptionsContext *o, const char *filename) +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Muxer *mux; AVFormatContext *oc; @@ -2691,6 +2768,13 @@ int of_open(const OptionsContext *o, const char *filename) AVFMT_FLAG_BITEXACT); } + err = sch_add_mux(sch, muxer_thread, NULL, mux, + !strcmp(oc->oformat->name, "rtp")); + if (err < 0) + return err; + mux->sch = sch; + mux->sch_idx = err; + /* create all output streams for this file */ err = create_streams(mux, o); if (err < 0) diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 304471dd03..d463306546 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -28,6 +28,7 @@ #endif #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "cmdutils.h" #include "opt_common.h" #include "sync_queue.h" @@ -1157,20 +1158,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg) static int opt_filter_complex(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = av_strdup(arg); if (!graph_desc) return AVERROR(ENOMEM); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = file_read(arg); if (!graph_desc) return AVERROR(EINVAL); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } void show_help_default(const char *opt, const char *arg) @@ -1262,8 +1265,9 @@ static const OptionGroupDef groups[] = { [GROUP_INFILE] = { "input url", "i", OPT_INPUT }, }; -static int open_files(OptionGroupList *l, const char *inout, - int (*open_file)(const OptionsContext*, const char*)) +static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch, + int (*open_file)(const OptionsContext*, const char*, + Scheduler*)) { int i, ret; @@ -1283,7 +1287,7 @@ static int open_files(OptionGroupList *l, const char *inout, } av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg); - ret = open_file(&o, g->arg); + ret = open_file(&o, g->arg, sch); uninit_options(&o); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n", @@ -1296,7 +1300,7 @@ static int open_files(OptionGroupList *l, const char *inout, return 0; } -int ffmpeg_parse_options(int argc, char **argv) +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch) { OptionParseContext octx; const char *errmsg = NULL; @@ -1313,7 +1317,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* apply global options */ - ret = parse_optgroup(NULL, &octx.global_opts); + ret = parse_optgroup(sch, &octx.global_opts); if (ret < 0) { errmsg = "parsing global options"; goto fail; @@ -1323,7 +1327,7 @@ int ffmpeg_parse_options(int argc, char **argv) term_init(); /* open input files */ - ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open); + ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open); if (ret < 0) { errmsg = "opening input files"; goto fail; @@ -1337,7 +1341,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* open output files */ - ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open); + ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open); if (ret < 0) { errmsg = "opening output files"; goto fail; diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c new file mode 100644 index 0000000000..51144a5d3f --- /dev/null +++ b/fftools/ffmpeg_sched.c @@ -0,0 +1,2174 @@ +/* + * Inter-thread scheduling/synchronization. + * Copyright (c) 2023 Anton Khirnov + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include + +#include "cmdutils.h" +#include "ffmpeg_sched.h" +#include "ffmpeg_utils.h" +#include "sync_queue.h" +#include "thread_queue.h" + +#include "libavcodec/packet.h" + +#include "libavutil/avassert.h" +#include "libavutil/error.h" +#include "libavutil/fifo.h" +#include "libavutil/frame.h" +#include "libavutil/mem.h" +#include "libavutil/thread.h" +#include "libavutil/threadmessage.h" +#include "libavutil/time.h" + +// 100 ms +// FIXME: some other value? make this dynamic? +#define SCHEDULE_TOLERANCE (100 * 1000) + +enum QueueType { + QUEUE_PACKETS, + QUEUE_FRAMES, +}; + +typedef struct SchWaiter { + pthread_mutex_t lock; + pthread_cond_t cond; + atomic_int choked; + + // the following are internal state of schedule_update_locked() and must not + // be accessed outside of it + int choked_prev; + int choked_next; +} SchWaiter; + +typedef struct SchTask { + Scheduler *parent; + SchedulerNode node; + + SchThreadFunc func; + void *func_arg; + + pthread_t thread; + int thread_running; +} SchTask; + +typedef struct SchDec { + const AVClass *class; + + SchedulerNode src; + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; + + SchTask task; + // Queue for receiving input packets, one stream. + ThreadQueue *queue; + + // Queue for sending post-flush end timestamps back to the source + AVThreadMessageQueue *queue_end_ts; + int expect_end_ts; + + // temporary storage used by sch_dec_send() + AVFrame *send_frame; +} SchDec; + +typedef struct SchSyncQueue { + SyncQueue *sq; + AVFrame *frame; + pthread_mutex_t lock; + + unsigned *enc_idx; + unsigned nb_enc_idx; +} SchSyncQueue; + +typedef struct SchEnc { + const AVClass *class; + + SchedulerNode src; + SchedulerNode dst; + + // [0] - index of the sync queue in Scheduler.sq_enc, + // [1] - index of this encoder in the sq + int sq_idx[2]; + + /* Opening encoders is somewhat nontrivial due to their interaction with + * sync queues, which are (among other things) responsible for maintaining + * constant audio frame size, when it is required by the encoder. + * + * Opening the encoder requires stream parameters, obtained from the first + * frame. However, that frame cannot be properly chunked by the sync queue + * without knowing the required frame size, which is only available after + * opening the encoder. + * + * This apparent circular dependency is resolved in the following way: + * - the caller creating the encoder gives us a callback which opens the + * encoder and returns the required frame size (if any) + * - when the first frame is sent to the encoder, the sending thread + * - calls this callback, opening the encoder + * - passes the returned frame size to the sync queue + */ + int (*open_cb)(void *opaque, const AVFrame *frame); + int opened; + + SchTask task; + // Queue for receiving input frames, one stream. + ThreadQueue *queue; + // tq_send() to queue returned EOF + int in_finished; +} SchEnc; + +typedef struct SchDemuxStream { + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; +} SchDemuxStream; + +typedef struct SchDemux { + const AVClass *class; + + SchDemuxStream *streams; + unsigned nb_streams; + + SchTask task; + SchWaiter waiter; + + // temporary storage used by sch_demux_send() + AVPacket *send_pkt; +} SchDemux; + +typedef struct PreMuxQueue { + /** + * Queue for buffering the packets before the muxer task can be started. + */ + AVFifo *fifo; + /** + * Maximum number of packets in fifo. + */ + int max_packets; + /* + * The size of the AVPackets' buffers in queue. + * Updated when a packet is either pushed or pulled from the queue. + */ + size_t data_size; + /* Threshold after which max_packets will be in effect */ + size_t data_threshold; +} PreMuxQueue; + +typedef struct SchMuxStream { + SchedulerNode src; + SchedulerNode src_sched; + + unsigned *sub_heartbeat_dst; + unsigned nb_sub_heartbeat_dst; + + PreMuxQueue pre_mux_queue; + + //////////////////////////////////////////////////////////// + // The following are protected by Scheduler.schedule_lock // + + /* dts of the last packet sent to this stream + in AV_TIME_BASE_Q */ + int64_t last_dts; + // this stream no longer accepts input + int source_finished; + //////////////////////////////////////////////////////////// +} SchMuxStream; + +typedef struct SchMux { + const AVClass *class; + + SchMuxStream *streams; + unsigned nb_streams; + unsigned nb_streams_ready; + + int (*init)(void *arg); + + SchTask task; + /** + * Set to 1 after starting the muxer task and flushing the + * pre-muxing queues. + * Set either before any tasks have started, or with + * Scheduler.mux_ready_lock held. + */ + atomic_int mux_started; + ThreadQueue *queue; + + AVPacket *sub_heartbeat_pkt; +} SchMux; + +typedef struct SchFilterIn { + SchedulerNode src; + SchedulerNode src_sched; + int send_finished; +} SchFilterIn; + +typedef struct SchFilterOut { + SchedulerNode dst; +} SchFilterOut; + +typedef struct SchFilterGraph { + const AVClass *class; + + SchFilterIn *inputs; + unsigned nb_inputs; + atomic_uint nb_inputs_finished; + + SchFilterOut *outputs; + unsigned nb_outputs; + + SchTask task; + // input queue, nb_inputs+1 streams + // last stream is control + ThreadQueue *queue; + SchWaiter waiter; + + // protected by schedule_lock + unsigned best_input; +} SchFilterGraph; + +struct Scheduler { + const AVClass *class; + + SchDemux *demux; + unsigned nb_demux; + + SchMux *mux; + unsigned nb_mux; + + unsigned nb_mux_ready; + pthread_mutex_t mux_ready_lock; + + unsigned nb_mux_done; + pthread_mutex_t mux_done_lock; + pthread_cond_t mux_done_cond; + + + SchDec *dec; + unsigned nb_dec; + + SchEnc *enc; + unsigned nb_enc; + + SchSyncQueue *sq_enc; + unsigned nb_sq_enc; + + SchFilterGraph *filters; + unsigned nb_filters; + + char *sdp_filename; + int sdp_auto; + + int transcode_started; + atomic_int terminate; + atomic_int task_failed; + + pthread_mutex_t schedule_lock; + + atomic_int_least64_t last_dts; +}; + +/** + * Wait until this task is allowed to proceed. + * + * @retval 0 the caller should proceed + * @retval 1 the caller should terminate + */ +static int waiter_wait(Scheduler *sch, SchWaiter *w) +{ + int terminate; + + if (!atomic_load(&w->choked)) + return 0; + + pthread_mutex_lock(&w->lock); + + while (atomic_load(&w->choked) && !atomic_load(&sch->terminate)) + pthread_cond_wait(&w->cond, &w->lock); + + terminate = atomic_load(&sch->terminate); + + pthread_mutex_unlock(&w->lock); + + return terminate; +} + +static void waiter_set(SchWaiter *w, int choked) +{ + pthread_mutex_lock(&w->lock); + + atomic_store(&w->choked, choked); + pthread_cond_signal(&w->cond); + + pthread_mutex_unlock(&w->lock); +} + +static int waiter_init(SchWaiter *w) +{ + int ret; + + atomic_init(&w->choked, 0); + + ret = pthread_mutex_init(&w->lock, NULL); + if (ret) + return AVERROR(ret); + + ret = pthread_cond_init(&w->cond, NULL); + if (ret) + return AVERROR(ret); + + return 0; +} + +static void waiter_uninit(SchWaiter *w) +{ + pthread_mutex_destroy(&w->lock); + pthread_cond_destroy(&w->cond); +} + +static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, + enum QueueType type) +{ + ThreadQueue *tq; + ObjPool *op; + + op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : + objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + tq = tq_alloc(nb_streams, queue_size, op, + (type == QUEUE_PACKETS) ? pkt_move : frame_move); + if (!tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + *ptq = tq; + return 0; +} + +static void *task_wrapper(void *arg); + +static int task_stop(SchTask *task) +{ + int ret; + void *thread_ret; + + if (!task->thread_running) + return 0; + + ret = pthread_join(task->thread, &thread_ret); + av_assert0(ret == 0); + + task->thread_running = 0; + + return (intptr_t)thread_ret; +} + +static int task_start(SchTask *task) +{ + int ret; + + av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n"); + + av_assert0(!task->thread_running); + + ret = pthread_create(&task->thread, NULL, task_wrapper, task); + if (ret) { + av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n", + strerror(ret)); + return AVERROR(ret); + } + + task->thread_running = 1; + return 0; +} + +static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, + SchThreadFunc func, void *func_arg) +{ + task->parent = sch; + + task->node.type = type; + task->node.idx = idx; + + task->func = func; + task->func_arg = func_arg; +} + +int sch_stop(Scheduler *sch) +{ + int ret = 0, err; + + atomic_store(&sch->terminate, 1); + + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; + waiter_set(w, 1); + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + err = task_stop(&d->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + err = task_stop(&dec->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + err = task_stop(&fg->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + err = task_stop(&enc->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + err = task_stop(&mux->task); + ret = err_merge(ret, err); + } + + return ret; +} + +void sch_free(Scheduler **psch) +{ + Scheduler *sch = *psch; + + if (!sch) + return; + + sch_stop(sch); + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + av_freep(&ds->dst); + av_freep(&ds->dst_finished); + } + av_freep(&d->streams); + + av_packet_free(&d->send_pkt); + + waiter_uninit(&d->waiter); + } + av_freep(&sch->demux); + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + if (ms->pre_mux_queue.fifo) { + AVPacket *pkt; + while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) + av_packet_free(&pkt); + av_fifo_freep2(&ms->pre_mux_queue.fifo); + } + + av_freep(&ms->sub_heartbeat_dst); + } + av_freep(&mux->streams); + + av_packet_free(&mux->sub_heartbeat_pkt); + + tq_free(&mux->queue); + } + av_freep(&sch->mux); + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + tq_free(&dec->queue); + + av_thread_message_queue_free(&dec->queue_end_ts); + + av_freep(&dec->dst); + av_freep(&dec->dst_finished); + + av_frame_free(&dec->send_frame); + } + av_freep(&sch->dec); + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + tq_free(&enc->queue); + } + av_freep(&sch->enc); + + for (unsigned i = 0; i < sch->nb_sq_enc; i++) { + SchSyncQueue *sq = &sch->sq_enc[i]; + sq_free(&sq->sq); + av_frame_free(&sq->frame); + pthread_mutex_destroy(&sq->lock); + av_freep(&sq->enc_idx); + } + av_freep(&sch->sq_enc); + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + tq_free(&fg->queue); + + av_freep(&fg->inputs); + av_freep(&fg->outputs); + + waiter_uninit(&fg->waiter); + } + av_freep(&sch->filters); + + av_freep(&sch->sdp_filename); + + pthread_mutex_destroy(&sch->mux_ready_lock); + + pthread_mutex_destroy(&sch->mux_done_lock); + pthread_cond_destroy(&sch->mux_done_cond); + + av_freep(psch); +} + +static const AVClass scheduler_class = { + .class_name = "Scheduler", + .version = LIBAVUTIL_VERSION_INT, +}; + +Scheduler *sch_alloc(void) +{ + Scheduler *sch; + int ret; + + sch = av_mallocz(sizeof(*sch)); + if (!sch) + return NULL; + + sch->class = &scheduler_class; + sch->sdp_auto = 1; + + ret = pthread_mutex_init(&sch->mux_ready_lock, NULL); + if (ret) + goto fail; + + ret = pthread_mutex_init(&sch->mux_done_lock, NULL); + if (ret) + goto fail; + + ret = pthread_cond_init(&sch->mux_done_cond, NULL); + if (ret) + goto fail; + + return sch; +fail: + sch_free(&sch); + return NULL; +} + +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename) +{ + av_freep(&sch->sdp_filename); + sch->sdp_filename = av_strdup(sdp_filename); + return sch->sdp_filename ? 0 : AVERROR(ENOMEM); +} + +static const AVClass sch_mux_class = { + .class_name = "SchMux", + .version = LIBAVUTIL_VERSION_INT, + .parent_log_context_offset = offsetof(SchMux, task.func_arg), +}; + +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *arg, int sdp_auto) +{ + const unsigned idx = sch->nb_mux; + + SchMux *mux; + int ret; + + ret = GROW_ARRAY(sch->mux, sch->nb_mux); + if (ret < 0) + return ret; + + mux = &sch->mux[idx]; + mux->class = &sch_mux_class; + mux->init = init; + + task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); + + sch->sdp_auto &= sdp_auto; + + return idx; +} + +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx) +{ + SchMux *mux; + SchMuxStream *ms; + unsigned stream_idx; + int ret; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = GROW_ARRAY(mux->streams, mux->nb_streams); + if (ret < 0) + return ret; + stream_idx = mux->nb_streams - 1; + + ms = &mux->streams[stream_idx]; + + ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0); + if (!ms->pre_mux_queue.fifo) + return AVERROR(ENOMEM); + + ms->last_dts = AV_NOPTS_VALUE; + + return stream_idx; +} + +static const AVClass sch_demux_class = { + .class_name = "SchDemux", + .version = LIBAVUTIL_VERSION_INT, + .parent_log_context_offset = offsetof(SchDemux, task.func_arg), +}; + +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx) +{ + const unsigned idx = sch->nb_demux; + + SchDemux *d; + int ret; + + ret = GROW_ARRAY(sch->demux, sch->nb_demux); + if (ret < 0) + return ret; + + d = &sch->demux[idx]; + + task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx); + + d->class = &sch_demux_class; + d->send_pkt = av_packet_alloc(); + if (!d->send_pkt) + return AVERROR(ENOMEM); + + ret = waiter_init(&d->waiter); + if (ret < 0) + return ret; + + return idx; +} + +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx) +{ + SchDemux *d; + int ret; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + ret = GROW_ARRAY(d->streams, d->nb_streams); + return ret < 0 ? ret : d->nb_streams - 1; +} + +static const AVClass sch_dec_class = { + .class_name = "SchDec", + .version = LIBAVUTIL_VERSION_INT, + .parent_log_context_offset = offsetof(SchDec, task.func_arg), +}; + +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, + int send_end_ts) +{ + const unsigned idx = sch->nb_dec; + + SchDec *dec; + int ret; + + ret = GROW_ARRAY(sch->dec, sch->nb_dec); + if (ret < 0) + return ret; + + dec = &sch->dec[idx]; + + task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx); + + dec->class = &sch_dec_class; + dec->send_frame = av_frame_alloc(); + if (!dec->send_frame) + return AVERROR(ENOMEM); + + ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + if (send_end_ts) { + ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp)); + if (ret < 0) + return ret; + } + + return idx; +} + +static const AVClass sch_enc_class = { + .class_name = "SchEnc", + .version = LIBAVUTIL_VERSION_INT, + .parent_log_context_offset = offsetof(SchEnc, task.func_arg), +}; + +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, + int (*open_cb)(void *opaque, const AVFrame *frame)) +{ + const unsigned idx = sch->nb_enc; + + SchEnc *enc; + int ret; + + ret = GROW_ARRAY(sch->enc, sch->nb_enc); + if (ret < 0) + return ret; + + enc = &sch->enc[idx]; + + enc->class = &sch_enc_class; + enc->open_cb = open_cb; + enc->sq_idx[0] = -1; + enc->sq_idx[1] = -1; + + task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); + + ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return idx; +} + +static const AVClass sch_fg_class = { + .class_name = "SchFilterGraph", + .version = LIBAVUTIL_VERSION_INT, + .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg), +}; + +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *ctx) +{ + const unsigned idx = sch->nb_filters; + + SchFilterGraph *fg; + int ret; + + ret = GROW_ARRAY(sch->filters, sch->nb_filters); + if (ret < 0) + return ret; + fg = &sch->filters[idx]; + + fg->class = &sch_fg_class; + + task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx); + + if (nb_inputs) { + fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs)); + if (!fg->inputs) + return AVERROR(ENOMEM); + fg->nb_inputs = nb_inputs; + } + + if (nb_outputs) { + fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs)); + if (!fg->outputs) + return AVERROR(ENOMEM); + fg->nb_outputs = nb_outputs; + } + + ret = waiter_init(&fg->waiter); + if (ret < 0) + return ret; + + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return idx; +} + +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx) +{ + SchSyncQueue *sq; + int ret; + + ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc); + if (ret < 0) + return ret; + sq = &sch->sq_enc[sch->nb_sq_enc - 1]; + + sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx); + if (!sq->sq) + return AVERROR(ENOMEM); + + sq->frame = av_frame_alloc(); + if (!sq->frame) + return AVERROR(ENOMEM); + + ret = pthread_mutex_init(&sq->lock, NULL); + if (ret) + return AVERROR(ret); + + return sq - sch->sq_enc; +} + +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames) +{ + SchSyncQueue *sq; + SchEnc *enc; + int ret; + + av_assert0(sq_idx < sch->nb_sq_enc); + sq = &sch->sq_enc[sq_idx]; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx); + if (ret < 0) + return ret; + sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx; + + ret = sq_add_stream(sq->sq, limiting); + if (ret < 0) + return ret; + + enc->sq_idx[0] = sq_idx; + enc->sq_idx[1] = ret; + + if (max_frames != INT64_MAX) + sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames); + + return 0; +} + +int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst) +{ + int ret; + + switch (src.type) { + case SCH_NODE_TYPE_DEMUX: { + SchDemuxStream *ds; + + av_assert0(src.idx < sch->nb_demux && + src.idx_stream < sch->demux[src.idx].nb_streams); + ds = &sch->demux[src.idx].streams[src.idx_stream]; + + ret = GROW_ARRAY(ds->dst, ds->nb_dst); + if (ret < 0) + return ret; + + ds->dst[ds->nb_dst - 1] = dst; + + // demuxed packets go to decoding or streamcopy + switch (dst.type) { + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(dst.idx < sch->nb_dec); + dec = &sch->dec[dst.idx]; + + av_assert0(!dec->src.type); + dec->src = src; + break; + } + case SCH_NODE_TYPE_MUX: { + SchMuxStream *ms; + + av_assert0(dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!ms->src.type); + ms->src = src; + + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(src.idx < sch->nb_dec); + dec = &sch->dec[src.idx]; + + ret = GROW_ARRAY(dec->dst, dec->nb_dst); + if (ret < 0) + return ret; + + dec->dst[dec->nb_dst - 1] = dst; + + // decoded frames go to filters or encoding + switch (dst.type) { + case SCH_NODE_TYPE_FILTER_IN: { + SchFilterIn *fi; + + av_assert0(dst.idx < sch->nb_filters && + dst.idx_stream < sch->filters[dst.idx].nb_inputs); + fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; + + av_assert0(!fi->src.type); + fi->src = src; + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + + av_assert0(dst.idx < sch->nb_enc); + enc = &sch->enc[dst.idx]; + + av_assert0(!enc->src.type); + enc->src = src; + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_FILTER_OUT: { + SchFilterOut *fo; + SchEnc *enc; + + av_assert0(src.idx < sch->nb_filters && + src.idx_stream < sch->filters[src.idx].nb_outputs); + // filtered frames go to encoding + av_assert0(dst.type == SCH_NODE_TYPE_ENC && + dst.idx < sch->nb_enc); + + fo = &sch->filters[src.idx].outputs[src.idx_stream]; + enc = &sch->enc[dst.idx]; + + av_assert0(!fo->dst.type && !enc->src.type); + fo->dst = dst; + enc->src = src; + + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + SchMuxStream *ms; + + av_assert0(src.idx < sch->nb_enc); + // encoding packets go to muxing + av_assert0(dst.type == SCH_NODE_TYPE_MUX && + dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + enc = &sch->enc[src.idx]; + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!enc->dst.type && !ms->src.type); + enc->dst = dst; + ms->src = src; + + break; + } + default: av_assert0(0); + } + + return 0; +} + +static int mux_task_start(SchMux *mux) +{ + int ret = 0; + + ret = task_start(&mux->task); + if (ret < 0) + return ret; + + /* flush the pre-muxing queues */ + for (unsigned i = 0; i < mux->nb_streams; i++) { + SchMuxStream *ms = &mux->streams[i]; + AVPacket *pkt; + int finished = 0; + + while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) { + if (pkt) { + if (!finished) + ret = tq_send(mux->queue, i, pkt); + av_packet_free(&pkt); + if (ret == AVERROR_EOF) + finished = 1; + else if (ret < 0) + return ret; + } else + tq_send_finish(mux->queue, i); + } + } + + atomic_store(&mux->mux_started, 1); + + return 0; +} + +int print_sdp(const char *filename); + +static int mux_init(Scheduler *sch, SchMux *mux) +{ + int ret; + + ret = mux->init(mux->task.func_arg); + if (ret < 0) + return ret; + + sch->nb_mux_ready++; + + if (sch->sdp_filename || sch->sdp_auto) { + if (sch->nb_mux_ready < sch->nb_mux) + return 0; + + ret = print_sdp(sch->sdp_filename); + if (ret < 0) { + av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n"); + return ret; + } + + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (unsigned i = 0; i < sch->nb_mux; i++) { + ret = mux_task_start(&sch->mux[i]); + if (ret < 0) + return ret; + } + } else { + ret = mux_task_start(mux); + if (ret < 0) + return ret; + } + + return 0; +} + +void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + size_t data_threshold, int max_packets) +{ + SchMux *mux; + SchMuxStream *ms; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + ms = &mux->streams[stream_idx]; + + ms->pre_mux_queue.max_packets = max_packets; + ms->pre_mux_queue.data_threshold = data_threshold; +} + +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) +{ + SchMux *mux; + int ret = 0; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + + pthread_mutex_lock(&sch->mux_ready_lock); + + av_assert0(mux->nb_streams_ready < mux->nb_streams); + + // this may be called during initialization - do not start + // threads before sch_start() is called + if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started) + ret = mux_init(sch, mux); + + pthread_mutex_unlock(&sch->mux_ready_lock); + + return ret; +} + +int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + unsigned dec_idx) +{ + SchMux *mux; + SchMuxStream *ms; + int ret = 0; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + ms = &mux->streams[stream_idx]; + + ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst); + if (ret < 0) + return ret; + + av_assert0(dec_idx < sch->nb_dec); + ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx; + + if (!mux->sub_heartbeat_pkt) { + mux->sub_heartbeat_pkt = av_packet_alloc(); + if (!mux->sub_heartbeat_pkt) + return AVERROR(ENOMEM); + } + + return 0; +} + +static int64_t trailing_dts(const Scheduler *sch) +{ + int64_t min_dts = INT64_MAX; + + for (unsigned i = 0; i < sch->nb_mux; i++) { + const SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + const SchMuxStream *ms = &mux->streams[j]; + + if (ms->source_finished) + continue; + if (ms->last_dts == AV_NOPTS_VALUE) + return AV_NOPTS_VALUE; + + min_dts = FFMIN(min_dts, ms->last_dts); + } + } + + return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts; +} + +static void schedule_update_locked(Scheduler *sch) +{ + int64_t dts; + + // on termination request all waiters are choked, + // we are not to unchoke them + if (atomic_load(&sch->terminate)) + return; + + dts = trailing_dts(sch); + + atomic_store(&sch->last_dts, dts); + + // initialize our internal state + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; + w->choked_prev = atomic_load(&w->choked); + w->choked_next = 1; + } + + // figure out the sources that are allowed to proceed + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + SchDemux *d; + + // unblock sources for output streams that are not finished + // and not too far ahead of the trailing stream + if (ms->source_finished) + continue; + if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) + continue; + if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) + continue; + + // for outputs fed from filtergraphs, consider that filtergraph's + // best_input information, in other cases there is a well-defined + // source demuxer + if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) { + SchFilterGraph *fg = &sch->filters[ms->src_sched.idx]; + SchFilterIn *fi; + + // the filtergraph contains internal sources and + // requested to be scheduled directly + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + continue; + } + + fi = &fg->inputs[fg->best_input]; + d = &sch->demux[fi->src_sched.idx]; + } else + d = &sch->demux[ms->src_sched.idx]; + + d->waiter.choked_next = 0; + } + } + + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; + if (w->choked_prev != w->choked_next) + waiter_set(w, w->choked_next); + } + +} + +int sch_start(Scheduler *sch) +{ + int ret; + + sch->transcode_started = 1; + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + switch (ms->src.type) { + case SCH_NODE_TYPE_ENC: { + SchEnc *enc = &sch->enc[ms->src.idx]; + if (enc->src.type == SCH_NODE_TYPE_DEC) { + ms->src_sched = sch->dec[enc->src.idx].src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); + } else { + ms->src_sched = enc->src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + } + break; + } + case SCH_NODE_TYPE_DEMUX: + ms->src_sched = ms->src; + break; + default: + av_log(mux, AV_LOG_ERROR, + "Muxer stream #%u not connected to a source\n", j); + return AVERROR(EINVAL); + } + } + + ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + if (mux->nb_streams_ready == mux->nb_streams) { + ret = mux_init(sch, mux); + if (ret < 0) + return ret; + } + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + if (!enc->src.type) { + av_log(enc, AV_LOG_ERROR, + "Encoder not connected to a source\n"); + return AVERROR(EINVAL); + } + if (!enc->dst.type) { + av_log(enc, AV_LOG_ERROR, + "Encoder not connected to a sink\n"); + return AVERROR(EINVAL); + } + + ret = task_start(&enc->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + for (unsigned j = 0; j < fg->nb_inputs; j++) { + SchFilterIn *fi = &fg->inputs[j]; + + if (!fi->src.type) { + av_log(fg, AV_LOG_ERROR, + "Filtergraph input %u not connected to a source\n", j); + return AVERROR(EINVAL); + } + + fi->src_sched = sch->dec[fi->src.idx].src; + } + + for (unsigned j = 0; j < fg->nb_outputs; j++) { + SchFilterOut *fo = &fg->outputs[j]; + + if (!fo->dst.type) { + av_log(fg, AV_LOG_ERROR, + "Filtergraph %u output %u not connected to a sink\n", i, j); + return AVERROR(EINVAL); + } + } + + ret = task_start(&fg->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + if (!dec->src.type) { + av_log(dec, AV_LOG_ERROR, + "Decoder not connected to a source\n"); + return AVERROR(EINVAL); + } + if (!dec->nb_dst) { + av_log(dec, AV_LOG_ERROR, + "Decoder not connected to any sink\n"); + return AVERROR(EINVAL); + } + + dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished)); + if (!dec->dst_finished) + return AVERROR(ENOMEM); + + ret = task_start(&dec->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + if (!d->nb_streams) + continue; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + + if (!ds->nb_dst) { + av_log(d, AV_LOG_ERROR, + "Demuxer stream %u not connected to any sink\n", j); + return AVERROR(EINVAL); + } + + ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished)); + if (!ds->dst_finished) + return AVERROR(ENOMEM); + } + + ret = task_start(&d->task); + if (ret < 0) + return ret; + } + + pthread_mutex_lock(&sch->schedule_lock); + schedule_update_locked(sch); + pthread_mutex_unlock(&sch->schedule_lock); + + return 0; +} + +int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) +{ + int ret, err; + + // convert delay to absolute timestamp + timeout_us += av_gettime(); + + pthread_mutex_lock(&sch->mux_done_lock); + + if (sch->nb_mux_done < sch->nb_mux) { + struct timespec tv = { .tv_sec = timeout_us / 1000000, + .tv_nsec = (timeout_us % 1000000) * 1000 }; + pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv); + } + + ret = sch->nb_mux_done == sch->nb_mux; + + pthread_mutex_unlock(&sch->mux_done_lock); + + *transcode_ts = atomic_load(&sch->last_dts); + + // abort transcoding if any task failed + err = atomic_load(&sch->task_failed); + if (err < 0) + return err; + + return ret; +} + +static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame) +{ + int ret; + + ret = enc->open_cb(enc->task.func_arg, frame); + if (ret < 0) + return ret; + + // ret>0 signals audio frame size, which means sync queue must + // have been enabled during encoder creation + if (ret > 0) { + SchSyncQueue *sq; + + av_assert0(enc->sq_idx[0] >= 0); + sq = &sch->sq_enc[enc->sq_idx[0]]; + + pthread_mutex_lock(&sq->lock); + + sq_frame_samples(sq->sq, enc->sq_idx[1], ret); + + pthread_mutex_unlock(&sq->lock); + } + + return 0; +} + +static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + int ret; + + if (!frame) { + tq_send_finish(enc->queue, 0); + return 0; + } + + if (enc->in_finished) + return AVERROR_EOF; + + ret = tq_send(enc->queue, 0, frame); + if (ret < 0) + enc->in_finished = 1; + + return ret; +} + +static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]]; + int ret = 0; + + // inform the scheduling code that no more input will arrive along this path; + // this is necessary because the sync queue may not send an EOF downstream + // until other streams finish + // TODO: consider a cleaner way of passing this information through + // the pipeline + if (!frame) { + SchMux *mux = &sch->mux[enc->dst.idx]; + SchMuxStream *ms = &mux->streams[enc->dst.idx_stream]; + + pthread_mutex_lock(&sch->schedule_lock); + + ms->source_finished = 1; + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + pthread_mutex_lock(&sq->lock); + + ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame)); + if (ret < 0) + goto finish; + + while (1) { + SchEnc *enc; + + // TODO: the SQ API should be extended to allow returning EOF + // for individual streams + ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame)); + if (ret == AVERROR(EAGAIN)) { + ret = 0; + goto finish; + } else if (ret < 0) { + // close all encoders fed from this sync queue + for (unsigned i = 0; i < sq->nb_enc_idx; i++) { + int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL); + + // if the sync queue error is EOF and closing the encoder + // produces a more serious error, make sure to pick the latter + ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err); + } + goto finish; + } + + enc = &sch->enc[sq->enc_idx[ret]]; + ret = send_to_enc_thread(sch, enc, sq->frame); + if (ret < 0) { + av_assert0(ret == AVERROR_EOF); + av_frame_unref(sq->frame); + sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL)); + continue; + } + } + +finish: + pthread_mutex_unlock(&sq->lock); + + return ret; +} + +static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + if (enc->open_cb && frame && !enc->opened) { + int ret = enc_open(sch, enc, frame); + if (ret < 0) + return ret; + enc->opened = 1; + + // discard empty frames that only carry encoder init parameters + if (!frame->buf[0]) { + av_frame_unref(frame); + return 0; + } + } + + return (enc->sq_idx[0] >= 0) ? + send_to_enc_sq (sch, enc, frame) : + send_to_enc_thread(sch, enc, frame); +} + +static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt) +{ + PreMuxQueue *q = &ms->pre_mux_queue; + AVPacket *tmp_pkt = NULL; + int ret; + + if (!av_fifo_can_write(q->fifo)) { + size_t packets = av_fifo_can_read(q->fifo); + size_t pkt_size = pkt ? pkt->size : 0; + int thresh_reached = (q->data_size + pkt_size) > q->data_threshold; + size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX; + size_t new_size = FFMIN(2 * packets, max_packets); + + if (new_size <= packets) { + av_log(mux, AV_LOG_ERROR, + "Too many packets buffered for output stream.\n"); + return AVERROR(ENOSPC); + } + ret = av_fifo_grow2(q->fifo, new_size - packets); + if (ret < 0) + return ret; + } + + if (pkt) { + tmp_pkt = av_packet_alloc(); + if (!tmp_pkt) + return AVERROR(ENOMEM); + + av_packet_move_ref(tmp_pkt, pkt); + q->data_size += tmp_pkt->size; + } + av_fifo_write(q->fifo, &tmp_pkt, 1); + + return 0; +} + +static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, + AVPacket *pkt) +{ + SchMuxStream *ms = &mux->streams[stream_idx]; + int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ? + av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q) : + AV_NOPTS_VALUE; + + // queue the packet if the muxer cannot be started yet + if (!atomic_load(&mux->mux_started)) { + int queued = 0; + + // the muxer could have started between the above atomic check and + // locking the mutex, then this block falls through to normal send path + pthread_mutex_lock(&sch->mux_ready_lock); + + if (!atomic_load(&mux->mux_started)) { + int ret = mux_queue_packet(mux, ms, pkt); + queued = ret < 0 ? ret : 1; + } + + pthread_mutex_unlock(&sch->mux_ready_lock); + + if (queued < 0) + return queued; + else if (queued) + goto update_schedule; + } + + if (pkt) { + int ret = tq_send(mux->queue, stream_idx, pkt); + if (ret < 0) + return ret; + } else + tq_send_finish(mux->queue, stream_idx); + +update_schedule: + // TODO: use atomics to check whether this changes trailing dts + // to avoid locking unnecesarily + if (dts != AV_NOPTS_VALUE || !pkt) { + pthread_mutex_lock(&sch->schedule_lock); + + if (pkt) ms->last_dts = dts; + else ms->source_finished = 1; + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + return 0; +} + +static int +demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVPacket *pkt, unsigned flags) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (pkt && dst.type == SCH_NODE_TYPE_MUX && + (flags & DEMUX_SEND_STREAMCOPY_EOF)) { + av_packet_unref(pkt); + pkt = NULL; + } + + if (!pkt) + goto finish; + + ret = (dst.type == SCH_NODE_TYPE_MUX) ? + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : + tq_send(sch->dec[dst.idx].queue, 0, pkt); + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_MUX) + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + else + tq_send_finish(sch->dec[dst.idx].queue, 0); + + *dst_finished = 1; + return AVERROR_EOF; +} + +static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, + AVPacket *pkt, unsigned flags) +{ + unsigned nb_done = 0; + + for (unsigned i = 0; i < ds->nb_dst; i++) { + AVPacket *to_send = pkt; + uint8_t *finished = &ds->dst_finished[i]; + + int ret; + + // sending a packet consumes it, so make a temporary reference if needed + if (pkt && i < ds->nb_dst - 1) { + to_send = d->send_pkt; + + ret = av_packet_ref(to_send, pkt); + if (ret < 0) + return ret; + } + + ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags); + if (to_send) + av_packet_unref(to_send); + if (ret == AVERROR_EOF) + nb_done++; + else if (ret < 0) + return ret; + } + + return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0; +} + +static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt) +{ + Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE }; + + av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems); + + for (unsigned i = 0; i < d->nb_streams; i++) { + SchDemuxStream *ds = &d->streams[i]; + + for (unsigned j = 0; j < ds->nb_dst; j++) { + const SchedulerNode *dst = &ds->dst[j]; + SchDec *dec; + int ret; + + if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC) + continue; + + dec = &sch->dec[dst->idx]; + + ret = tq_send(dec->queue, 0, pkt); + if (ret < 0) + return ret; + + if (dec->queue_end_ts) { + Timestamp ts; + ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0); + if (ret < 0) + return ret; + + if (max_end_ts.ts == AV_NOPTS_VALUE || + (ts.ts != AV_NOPTS_VALUE && + av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0)) + max_end_ts = ts; + + } + } + } + + pkt->pts = max_end_ts.ts; + pkt->time_base = max_end_ts.tb; + + return 0; +} + +int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, + unsigned flags) +{ + SchDemux *d; + int terminate; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + terminate = waiter_wait(sch, &d->waiter); + if (terminate) + return AVERROR_EXIT; + + // flush the downstreams after seek + if (pkt->stream_index == -1) + return demux_flush(sch, d, pkt); + + av_assert0(pkt->stream_index < d->nb_streams); + + return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags); +} + +static int demux_done(Scheduler *sch, unsigned demux_idx) +{ + SchDemux *d = &sch->demux[demux_idx]; + int ret = 0; + + for (unsigned i = 0; i < d->nb_streams; i++) { + int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0); + if (err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + pthread_mutex_lock(&sch->schedule_lock); + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + + return ret; +} + +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt) +{ + SchMux *mux; + int ret, stream_idx; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = tq_receive(mux->queue, &stream_idx, pkt); + pkt->stream_index = stream_idx; + return ret; +} + +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) +{ + SchMux *mux; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + tq_receive_finish(mux->queue, stream_idx); + + pthread_mutex_lock(&sch->schedule_lock); + mux->streams[stream_idx].source_finished = 1; + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); +} + +int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + const AVPacket *pkt) +{ + SchMux *mux; + SchMuxStream *ms; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + ms = &mux->streams[stream_idx]; + + for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) { + SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]]; + int ret; + + ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt); + if (ret < 0) + return ret; + + tq_send(dst->queue, 0, mux->sub_heartbeat_pkt); + } + + return 0; +} + +static int mux_done(Scheduler *sch, unsigned mux_idx) +{ + SchMux *mux = &sch->mux[mux_idx]; + + pthread_mutex_lock(&sch->schedule_lock); + + for (unsigned i = 0; i < mux->nb_streams; i++) { + tq_receive_finish(mux->queue, i); + mux->streams[i].source_finished = 1; + } + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + + pthread_mutex_lock(&sch->mux_done_lock); + + av_assert0(sch->nb_mux_done < sch->nb_mux); + sch->nb_mux_done++; + + pthread_cond_signal(&sch->mux_done_cond); + + pthread_mutex_unlock(&sch->mux_done_lock); + + return 0; +} + +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) +{ + SchDec *dec; + int ret, dummy; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + // the decoder should have given us post-flush end timestamp in pkt + if (dec->expect_end_ts) { + Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; + ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0); + if (ret < 0) + return ret; + + dec->expect_end_ts = 0; + } + + ret = tq_receive(dec->queue, &dummy, pkt); + av_assert0(dummy <= 0); + + // got a flush packet, on the next call to this function the decoder + // will give us post-flush end timestamp + if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) + dec->expect_end_ts = 1; + + return ret; +} + +static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, + unsigned in_idx, AVFrame *frame) +{ + if (frame) + return tq_send(fg->queue, in_idx, frame); + + if (!fg->inputs[in_idx].send_finished) { + fg->inputs[in_idx].send_finished = 1; + tq_send_finish(fg->queue, in_idx); + + // close the control stream when all actual inputs are done + if (atomic_fetch_add(&fg->nb_inputs_finished, 1) == fg->nb_inputs - 1) + tq_send_finish(fg->queue, fg->nb_inputs); + } + return 0; +} + +static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVFrame *frame) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (!frame) + goto finish; + + ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ? + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) : + send_to_enc(sch, &sch->enc[dst.idx], frame); + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_FILTER_IN) + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); + else + send_to_enc(sch, &sch->enc[dst.idx], NULL); + + *dst_finished = 1; + + return AVERROR_EOF; +} + +int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame) +{ + SchDec *dec; + int ret = 0; + unsigned nb_done = 0; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + for (unsigned i = 0; i < dec->nb_dst; i++) { + uint8_t *finished = &dec->dst_finished[i]; + AVFrame *to_send = frame; + + // sending a frame consumes it, so make a temporary reference if needed + if (i < dec->nb_dst - 1) { + to_send = dec->send_frame; + + // frame may sometimes contain props only, + // e.g. to signal EOF timestamp + ret = frame->buf[0] ? av_frame_ref(to_send, frame) : + av_frame_copy_props(to_send, frame); + if (ret < 0) + return ret; + } + + ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send); + if (ret < 0) { + av_frame_unref(to_send); + if (ret == AVERROR_EOF) { + nb_done++; + ret = 0; + continue; + } + goto finish; + } + } + +finish: + return ret < 0 ? ret : + (nb_done == dec->nb_dst) ? AVERROR_EOF : 0; +} + +static int dec_done(Scheduler *sch, unsigned dec_idx) +{ + SchDec *dec = &sch->dec[dec_idx]; + int ret = 0; + + tq_receive_finish(dec->queue, 0); + + // make sure our source does not get stuck waiting for end timestamps + // that will never arrive + if (dec->queue_end_ts) + av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF); + + for (unsigned i = 0; i < dec->nb_dst; i++) { + int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL); + if (err < 0 && err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + return ret; +} + +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame) +{ + SchEnc *enc; + int ret, dummy; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + ret = tq_receive(enc->queue, &dummy, frame); + av_assert0(dummy <= 0); + + return ret; +} + +int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt) +{ + SchEnc *enc; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt); +} + +static int enc_done(Scheduler *sch, unsigned enc_idx) +{ + SchEnc *enc = &sch->enc[enc_idx]; + + tq_receive_finish(enc->queue, 0); + + return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, NULL); +} + +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + unsigned *in_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + av_assert0(*in_idx <= fg->nb_inputs); + + // update scheduling to account for desired input stream, if it changed + // + // this check needs no locking because only the filtering thread + // updates this value + if (*in_idx != fg->best_input) { + pthread_mutex_lock(&sch->schedule_lock); + + fg->best_input = *in_idx; + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + if (*in_idx == fg->nb_inputs) { + int terminate = waiter_wait(sch, &fg->waiter); + return terminate ? AVERROR_EOF : AVERROR(EAGAIN); + } + + while (1) { + int ret, idx; + + ret = tq_receive(fg->queue, &idx, frame); + if (idx < 0) + return AVERROR_EOF; + else if (ret >= 0) { + *in_idx = idx; + return 0; + } + + // disregard EOFs for specific streams - they should always be + // preceded by an EOF frame + } +} + +int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + av_assert0(out_idx < fg->nb_outputs); + return send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame); +} + +static int filter_done(Scheduler *sch, unsigned fg_idx) +{ + SchFilterGraph *fg = &sch->filters[fg_idx]; + int ret = 0; + + for (unsigned i = 0; i <= fg->nb_inputs; i++) + tq_receive_finish(fg->queue, i); + + for (unsigned i = 0; i < fg->nb_outputs; i++) { + SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx]; + int err = send_to_enc(sch, enc, NULL); + if (err < 0 && err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + return ret; +} + +int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + return send_to_filter(sch, fg, fg->nb_inputs, frame); +} + +static void *task_wrapper(void *arg) +{ + SchTask *task = arg; + Scheduler *sch = task->parent; + int ret; + int err = 0; + + ret = (intptr_t)task->func(task->func_arg); + if (ret < 0) + av_log(task->func_arg, AV_LOG_ERROR, + "Task finished with error code: %d (%s)\n", ret, av_err2str(ret)); + + switch (task->node.type) { + case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break; + default: av_assert0(0); + } + + ret = err_merge(ret, err); + + // EOF is considered normal termination + if (ret == AVERROR_EOF) + ret = 0; + if (ret < 0) + atomic_store(&sch->task_failed, 1); + + av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE, + "Terminating thread with return code %d (%s)\n", ret, + ret < 0 ? av_err2str(ret) : "success"); + + return (void*)(intptr_t)ret; +} diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h new file mode 100644 index 0000000000..94bbd30e98 --- /dev/null +++ b/fftools/ffmpeg_sched.h @@ -0,0 +1,468 @@ +/* + * Inter-thread scheduling/synchronization. + * Copyright (c) 2023 Anton Khirnov + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef FFTOOLS_FFMPEG_SCHED_H +#define FFTOOLS_FFMPEG_SCHED_H + +#include +#include + +#include "ffmpeg_utils.h" + +/* + * This file contains the API for the transcode scheduler. + * + * Overall architecture of the transcoding process involves instances of the + * following components: + * - demuxers, each containing any number of demuxed streams; demuxed packets + * belonging to some stream are sent to any number of decoders (transcoding) + * and/or muxers (streamcopy); + * - decoders, which receive encoded packets from some demuxed stream, decode + * them, and send decoded frames to any number of filtergraph inputs + * (audio/video) or encoders (subtitles); + * - filtergraphs, each containing zero or more inputs (0 in case the + * filtergraph contains a lavfi source filter), and one or more outputs; the + * inputs and outputs need not have matching media types; + * each filtergraph input receives decoded frames from some decoder; + * filtered frames from each output are sent to some encoder; + * - encoders, which receive decoded frames from some decoder (subtitles) or + * some filtergraph output (audio/video), encode them, and send encoded + * packets to some muxed stream; + * - muxers, each containing any number of muxed streams; each muxed stream + * receives encoded packets from some demuxed stream (streamcopy) or some + * encoder (transcoding); those packets are interleaved and written out by the + * muxer. + * + * There must be at least one muxer instance, otherwise the transcode produces + * no output and is meaningless. Otherwise, in a generic transcoding scenario + * there may be arbitrary number of instances of any of the above components, + * interconnected in various ways. + * + * The code tries to keep all the output streams across all the muxers in sync + * (i.e. at the same DTS), which is accomplished by varying the rates at which + * packets are read from different demuxers and lavfi sources. Note that the + * degree of control we have over synchronization is fundamentally limited - if + * some demuxed streams in the same input are interleaved at different rates + * than that at which they are to be muxed (e.g. because an input file is badly + * interleaved, or the user changed their speed by mismatching amounts), then + * there will be increasing amounts of buffering followed by eventual + * transcoding failure. + * + * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g. + * - encoding and muxing output from filtergraph(s) that have no inputs; + * - creating a file that contains nothing but attachments and/or metadata. + * + * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but + * this is unnecessary because the (a)split filter provides the same + * functionality. + * + * The scheduler, in the above model, is the master object that oversees and + * facilitates the transcoding process. The basic idea is that all instances + * of the abovementioned components communicate only with the scheduler and not + * with each other. The scheduler is then the single place containing the + * knowledge about the whole transcoding pipeline. + */ + +struct AVFrame; +struct AVPacket; + +typedef struct Scheduler Scheduler; + +enum SchedulerNodeType { + SCH_NODE_TYPE_NONE = 0, + SCH_NODE_TYPE_DEMUX, + SCH_NODE_TYPE_MUX, + SCH_NODE_TYPE_DEC, + SCH_NODE_TYPE_ENC, + SCH_NODE_TYPE_FILTER_IN, + SCH_NODE_TYPE_FILTER_OUT, +}; + +typedef struct SchedulerNode { + enum SchedulerNodeType type; + unsigned idx; + unsigned idx_stream; +} SchedulerNode; + +typedef void* (*SchThreadFunc)(void *arg); + +#define SCH_DSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, \ + .idx = file, .idx_stream = stream } +#define SCH_MSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_MUX, \ + .idx = file, .idx_stream = stream } +#define SCH_DEC(decoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \ + .idx = decoder } +#define SCH_ENC(encoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_ENC, \ + .idx = encoder } +#define SCH_FILTER_IN(filter, input) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, \ + .idx = filter, .idx_stream = input } +#define SCH_FILTER_OUT(filter, output) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, \ + .idx = filter, .idx_stream = output } + +Scheduler *sch_alloc(void); +void sch_free(Scheduler **sch); + +int sch_start(Scheduler *sch); +int sch_stop(Scheduler *sch); + +/** + * Wait until transcoding terminates or the specified timeout elapses. + * + * @param timeout_us Amount of time in microseconds after which this function + * will timeout. + * @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for + * informational purposes only. + * + * @retval 0 waiting timed out, transcoding is not finished + * @retval 1 transcoding is finished + */ +int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts); + +/** + * Add a demuxer to the scheduler. + * + * @param func Function executed as the demuxer task. + * @param ctx Demuxer state; will be passed to func and used for logging. + * + * @retval ">=0" Index of the newly-created demuxer. + * @retval "<0" Error code. + */ +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx); +/** + * Add a demuxed stream for a previously added demuxer. + * + * @param demux_idx index previously returned by sch_add_demux() + * + * @retval ">=0" Index of the newly-created demuxed stream. + * @retval "<0" Error code. + */ +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx); + +/** + * Add a decoder to the scheduler. + * + * @param func Function executed as the decoder task. + * @param ctx Decoder state; will be passed to func and used for logging. + * @param send_end_ts The decoder will return an end timestamp after flush packets + * are delivered to it. See documentation for + * sch_dec_receive() for more details. + * + * @retval ">=0" Index of the newly-created decoder. + * @retval "<0" Error code. + */ +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, + int send_end_ts); + +/** + * Add a filtergraph to the scheduler. + * + * @param nb_inputs Number of filtergraph inputs. + * @param nb_outputs number of filtergraph outputs + * @param func Function executed as the filtering task. + * @param ctx Filter state; will be passed to func and used for logging. + * + * @retval ">=0" Index of the newly-created filtergraph. + * @retval "<0" Error code. + */ +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *ctx); + +/** + * Add a muxer to the scheduler. + * + * Note that muxer thread startup is more complicated than for other components, + * because + * - muxer streams fed by audio/video encoders become initialized dynamically at + * runtime, after those encoders receive their first frame and initialize + * themselves, followed by calling sch_mux_stream_ready() + * - the header can be written after all the streams for a muxer are initialized + * - we may need to write an SDP, which must happen + * - AFTER all the headers are written + * - BEFORE any packets are written by any muxer + * - with all the muxers quiescent + * To avoid complicated muxer-thread synchronization dances, we postpone + * starting the muxer threads until after the SDP is written. The sequence of + * events is then as follows: + * - After sch_mux_stream_ready() is called for all the streams in a given muxer, + * the header for that muxer is written (care is taken that headers for + * different muxers are not written concurrently, since they write file + * information to stderr). If SDP is not wanted, the muxer thread then starts + * and muxing begins. + * - When SDP _is_ wanted, no muxer threads start until the header for the last + * muxer is written. After that, the SDP is written, after which all the muxer + * threads are started at once. + * + * In order for the above to work, the scheduler needs to be able to invoke + * just writing the header, which is the reason the init parameter exists. + * + * @param func Function executed as the muxing task. + * @param init Callback that is called to initialize the muxer and write the + * header. Called after sch_mux_stream_ready() is called for all the + * streams in the muxer. + * @param ctx Muxer state; will be passed to func/init and used for logging. + * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). + * + * @retval ">=0" Index of the newly-created muxer. + * @retval "<0" Error code. + */ +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *ctx, int sdp_auto); +/** + * Add a muxed stream for a previously added muxer. + * + * @param mux_idx index previously returned by sch_add_mux() + * + * @retval ">=0" Index of the newly-created muxed stream. + * @retval "<0" Error code. + */ +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx); + +/** + * Configure limits on packet buffering performed before the muxer task is + * started. + * + * @param mux_idx index previously returned by sch_add_mux() + * @param stream_idx_idx index previously returned by sch_add_mux_stream() + * @param data_threshold Total size of the buffered packets' data after which + * max_packets applies. + * @param max_packets maximum Maximum number of buffered packets after + * data_threshold is reached. + */ +void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + size_t data_threshold, int max_packets); + +/** + * Signal to the scheduler that the specified muxed stream is initialized and + * ready. Muxing is started once all the streams are ready. + */ +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx); + +/** + * Set the file path for the SDP. + * + * The SDP is written when either of the following is true: + * - this function is called at least once + * - sdp_auto=1 is passed to EVERY call of sch_add_mux() + */ +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename); + +/** + * Add an encoder to the scheduler. + * + * @param func Function executed as the encoding task. + * @param ctx Encoder state; will be passed to func and used for logging. + * @param open_cb This callback, if specified, will be called when the first + * frame is obtained for this encoder. For audio encoders with a + * fixed frame size (which use a sync queue in the scheduler to + * rechunk frames), it must return that frame size on success. + * Otherwise (non-audio, variable frame size) it should return 0. + * + * @retval ">=0" Index of the newly-created encoder. + * @retval "<0" Error code. + */ +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, + int (*open_cb)(void *func_arg, const struct AVFrame *frame)); + +/** + * Add an pre-encoding sync queue to the scheduler. + * + * @param buf_size_us Sync queue buffering size, passed to sq_alloc(). + * @param logctx Logging context for the sync queue. passed to sq_alloc(). + * + * @retval ">=0" Index of the newly-created sync queue. + * @retval "<0" Error code. + */ +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx); +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames); + +int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst); + +enum DemuxSendFlags { + /** + * Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations + * send normally to other types. + */ + DEMUX_SEND_STREAMCOPY_EOF = (1 << 0), +}; + +/** + * Called by demuxer tasks to communicate with their downstreams. The following + * may be sent: + * - a demuxed packet for the stream identified by pkt->stream_index; + * - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an + * empty packet with stream_index=-1. + * + * @param demux_idx demuxer index + * @param pkt A demuxed packet to send. + * When flushing (i.e. pkt->stream_index=-1 on entry to this + * function), on successful return pkt->pts/pkt->time_base will be + * set to the maximum end timestamp of any decoded audio stream, or + * AV_NOPTS_VALUE if no decoded audio streams are present. + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers for the stream are done + * @retval AVERROR_EXIT all consumers are done, should terminate demuxing + * @retval "anoter negative error code" other failure + */ +int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt, + unsigned flags); + +/** + * Called by decoder tasks to receive a packet for decoding. + * + * @param dec_idx decoder index + * @param pkt Input packet will be written here on success. + * + * An empty packet signals that the decoder should be flushed, but + * more packets will follow (e.g. after seeking). When a decoder + * created with send_end_ts=1 receives a flush packet, it must write + * the end timestamp of the stream after flushing to + * pkt->pts/time_base on the next call to this function (if any). + * + * @retval "non-negative value" success + * @retval AVERROR_EOF no more packets will arrive, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt); + +/** + * Called by decoder tasks to send a decoded frame downstream. + * + * @param dec_idx Decoder index previously returned by sch_add_dec(). + * @param frame Decoded frame; on success it is consumed and cleared by this + * function + * + * @retval ">=0" success + * @retval AVERROR_EOF all consumers are done, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame); + +/** + * Called by filtergraph tasks to obtain frames for filtering. Will wait for a + * frame to become available and return it in frame. + * + * Filtergraphs that contain lavfi sources and do not currently require new + * input frames should call this function as a means of rate control - then + * in_idx should be set equal to nb_inputs on entry to this function. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param[in,out] in_idx On input contains the index of the input on which a frame + * is most desired. May be set to nb_inputs to signal that + * the filtergraph does not need more input currently. + * + * On success, will be replaced with the input index of + * the actually returned frame or EOF timestamp. + * + * @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx + * contains the index of the input it belongs to. + * @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should + * resume filtering. May only be returned when + * in_idx=nb_inputs on entry to this function. + * @retval AVERROR_EOF No more frames will arrive, should terminate filtering. + */ +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + unsigned *in_idx, struct AVFrame *frame); + +/** + * Called by filtergraph tasks to send a filtered frame or EOF to consumers. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param out_idx Index of the output which produced the frame. + * @param frame The frame to send to consumers. When NULL, signals that no more + * frames will be produced for the specified output. When non-NULL, + * the frame is consumed and cleared by this function on success. + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers are done + * @retval "anoter negative error code" other failure + */ +int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, + struct AVFrame *frame); + +int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame); + +/** + * Called by encoder tasks to obtain frames for encoding. Will wait for a frame + * to become available and return it in frame. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param frame Newly-received frame will be stored here on success. Must be + * clean on entrance to this function. + * + * @retval 0 A frame was successfully delivered into frame. + * @retval AVERROR_EOF No more frames will be delivered, the encoder should + * flush everything and terminate. + * + */ +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame); + +/** + * Called by encoder tasks to send encoded packets downstream. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param pkt An encoded packet; it will be consumed and cleared by this + * function on success. + * + * @retval 0 success + * @retval "<0" Error code. + */ +int sch_enc_send (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt); + +/** + * Called by muxer tasks to obtain packets for muxing. Will wait for a packet + * for any muxed stream to become available and return it in pkt. + * + * @param mux_idx Muxer index previously returned by sch_add_mux(). + * @param pkt Newly-received packet will be stored here on success. Must be + * clean on entrance to this function. + * + * @retval 0 A packet was successfully delivered into pkt. Its stream_index + * corresponds to a stream index previously returned from + * sch_add_mux_stream(). + * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that + * no more packets will be delivered for this stream index. + * Otherwise this indicates that no more packets will be + * delivered for any stream and the muxer should therefore + * flush everything and terminate. + */ +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt); + +/** + * Called by muxer tasks to signal that a stream will no longer accept input. + * + * @param stream_idx Stream index previously returned from sch_add_mux_stream(). + */ +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx); + +int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + unsigned dec_idx); +int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + const AVPacket *pkt); + +#endif /* FFTOOLS_FFMPEG_SCHED_H */