From patchwork Tue Sep 19 19:10:48 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 43805 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:a886:b0:149:dfde:5c0a with SMTP id ca6csp170310pzb; Tue, 19 Sep 2023 12:24:26 -0700 (PDT) X-Google-Smtp-Source: AGHT+IHADGEATRvV6eVRNENlMnoPYeHqC32KtY8wGKmetw9TrjK22Ne+/PYCvCHfAXmX/pY3TIRx X-Received: by 2002:a17:907:78c5:b0:9ad:fb23:21cf with SMTP id kv5-20020a17090778c500b009adfb2321cfmr303875ejc.15.1695151466272; Tue, 19 Sep 2023 12:24:26 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1695151466; cv=none; d=google.com; s=arc-20160816; b=szSuTBGiCvidVg58xEZU19i1vw5ltmoEf65DkWD5g6kqPS6N8YHsgfJ9gb6Rg5qWc9 bVXBy3L6XcJZOt1Pqp9R2lrpZ8JXuIHgvY9RmfdknOJMx+5FI1/M7Wk957f2w/MfcTvs GQsUkNBEgOQMHa5k2DaX+jM/8iwOEsJKGxMLjCBzN8BuglyF/PQHtfM9IoLxQKqKQxYA HmfkgbDGtqCECRyOBzoKuZcxYBzFwQWA7wo5X3Ijr1mpWlRV4FAugpXsDv7/+t9yV076 3/OPLl9Z/5xO6hYLGQYKeRKEVFSKMxSEPNH9mSaHLV5uSB46oWwlw/7q2YBv12nZhmrT OSTw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=JwU3Mj0mM85+9ZyfXJNqsqeJY9NdLinIxjdLiYZ+gJA=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=OEn+vLcPfUjl0exNrSDLF+x0NeIpx+gCUAWAkob4e/fDxHlXh6uIh2RFS6iwoEd0wQ SBznEvGaP7pA3hQv8OzB0HRXyYq5pqUw6mJjEZHpZTe3Dl2qZ/W+wA0FRWatkO+Rg2KQ bBS3bXqdqr+ecrChm5/f4yvkKcj0JS6YeFuXQag2lj3IgkydyPlcgiM6bHrxra2owA+6 SQnpiR7G2cpv2PpiUQ9t6dp55E4KlPCn2W1jMOOO8ahOayw1EpSFUrq/JaXu0vpvtMSb 5rvI40YFo5WSgb/OhAO9GI6npxZVR+Z647le+GVMNzWyfFbxP0zCS5qKjVPYyHKd7xqB w97w== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id d23-20020a170906641700b009932d8a227asi10394367ejm.277.2023.09.19.12.24.25; Tue, 19 Sep 2023 12:24:26 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 5828168C9B0; Tue, 19 Sep 2023 22:21:42 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 5943E68C927 for ; Tue, 19 Sep 2023 22:21:40 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id E7FDF5101 for ; Tue, 19 Sep 2023 21:21:39 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id E-Eh26I9Qw7B for ; Tue, 19 Sep 2023 21:21:34 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 15CD55206 for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 0466C3A0D2D for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:48 +0200 Message-Id: <20230919191044.18873-22-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: JxuX22pDJPFA See the comment block at the top of fftools/ffmpeg_sched.h for more details on what this scheduler is for. This commit adds the scheduling code itself, along with minimal integration with the rest of the program: * allocating and freeing the scheduler * passing it throughout the call stack in order to register the individual components (demuxers/decoders/filtergraphs/encoders/muxers) with the scheduler The scheduler is not actually used as of this commit, so it should not result in any change in behavior. That will change in future commits. --- fftools/Makefile | 1 + fftools/ffmpeg.c | 18 +- fftools/ffmpeg.h | 26 +- fftools/ffmpeg_dec.c | 10 +- fftools/ffmpeg_demux.c | 44 +- fftools/ffmpeg_enc.c | 13 +- fftools/ffmpeg_filter.c | 38 +- fftools/ffmpeg_mux.c | 15 +- fftools/ffmpeg_mux.h | 10 + fftools/ffmpeg_mux_init.c | 70 +- fftools/ffmpeg_opt.c | 22 +- fftools/ffmpeg_sched.c | 1703 +++++++++++++++++++++++++++++++++++++ fftools/ffmpeg_sched.h | 414 +++++++++ 13 files changed, 2332 insertions(+), 52 deletions(-) create mode 100644 fftools/ffmpeg_sched.c create mode 100644 fftools/ffmpeg_sched.h diff --git a/fftools/Makefile b/fftools/Makefile index 56820e6bc8..d6a8913a7f 100644 --- a/fftools/Makefile +++ b/fftools/Makefile @@ -18,6 +18,7 @@ OBJS-ffmpeg += \ fftools/ffmpeg_mux.o \ fftools/ffmpeg_mux_init.o \ fftools/ffmpeg_opt.o \ + fftools/ffmpeg_sched.o \ fftools/objpool.o \ fftools/sync_queue.o \ fftools/thread_queue.o \ diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index e084318864..995424ca93 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -99,6 +99,7 @@ #include "cmdutils.h" #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "sync_queue.h" const char program_name[] = "ffmpeg"; @@ -1155,7 +1156,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) /* * The following code is the main loop of the file converter */ -static int transcode(int *err_rate_exceeded) +static int transcode(Scheduler *sch, int *err_rate_exceeded) { int ret = 0, i; InputStream *ist; @@ -1293,6 +1294,8 @@ static int64_t getmaxrss(void) int main(int argc, char **argv) { + Scheduler *sch = NULL; + int ret, err_rate_exceeded; BenchmarkTimeStamps ti; @@ -1310,8 +1313,14 @@ int main(int argc, char **argv) show_banner(argc, argv, options); + sch = sch_alloc(); + if (!sch) { + ret = AVERROR(ENOMEM); + goto finish; + } + /* parse options and open all input/output files */ - ret = ffmpeg_parse_options(argc, argv); + ret = ffmpeg_parse_options(argc, argv, sch); if (ret < 0) goto finish; @@ -1329,7 +1338,7 @@ int main(int argc, char **argv) } current_time = ti = get_benchmark_time_stamps(); - ret = transcode(&err_rate_exceeded); + ret = transcode(sch, &err_rate_exceeded); if (ret >= 0 && do_benchmark) { int64_t utime, stime, rtime; current_time = get_benchmark_time_stamps(); @@ -1349,5 +1358,8 @@ finish: ret = 0; ffmpeg_cleanup(ret); + + sch_free(&sch); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index a4fd825749..278216e5ff 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -27,6 +27,7 @@ #include #include "cmdutils.h" +#include "ffmpeg_sched.h" #include "sync_queue.h" #include "libavformat/avformat.h" @@ -731,7 +732,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id int check_filter_outputs(void); int filtergraph_is_simple(const FilterGraph *fg); int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc); + char *graph_desc, + Scheduler *sch, unsigned sch_idx_enc); int init_complex_filtergraph(FilterGraph *fg); int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src); @@ -754,7 +756,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t */ int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec); -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc); /** * Create a new filtergraph in the global filtergraph list. @@ -762,7 +765,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); * @param graph_desc Graph description; an av_malloc()ed string, filtergraph * takes ownership of it. */ -int fg_create(FilterGraph **pfg, char *graph_desc); +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); @@ -786,7 +789,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, */ int reap_filters(FilterGraph *fg, int flush); -int ffmpeg_parse_options(int argc, char **argv); +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, const AVFrame *frame, const AVPacket *pkt, @@ -809,7 +812,7 @@ AVBufferRef *hw_device_for_filter(void); int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); -int dec_open(InputStream *ist); +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); /** @@ -823,7 +826,8 @@ void dec_free(Decoder **pdec); */ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); -int enc_alloc(Encoder **penc, const AVCodec *codec); +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); int enc_open(OutputStream *ost, const AVFrame *frame); @@ -839,7 +843,7 @@ int enc_flush(void); */ int of_stream_init(OutputFile *of, OutputStream *ost); int of_write_trailer(OutputFile *of); -int of_open(const OptionsContext *o, const char *filename); +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch); void of_free(OutputFile **pof); void of_enc_stats_close(void); @@ -853,7 +857,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); int64_t of_filesize(OutputFile *of); -int ifile_open(const OptionsContext *o, const char *filename); +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); /** @@ -961,4 +965,10 @@ static inline void frame_move(void *dst, void *src) av_frame_move_ref(dst, src); } +void *muxer_thread(void *arg); +void *decoder_thread(void *arg); +void *encoder_thread(void *arg); + +int print_sdp(const char *filename); + #endif /* FFTOOLS_FFMPEG_H */ diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 36163195ca..dc8d0374a3 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -51,6 +51,9 @@ struct Decoder { AVFrame *sub_prev[2]; AVFrame *sub_heartbeat; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending coded packets from the main thread to @@ -667,7 +670,7 @@ fail: return AVERROR(ENOMEM); } -static void *decoder_thread(void *arg) +void *decoder_thread(void *arg) { InputStream *ist = arg; InputFile *ifile = input_files[ist->file_index]; @@ -1048,7 +1051,7 @@ static int hw_device_setup_for_decode(InputStream *ist) return 0; } -int dec_open(InputStream *ist) +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) { Decoder *d; const AVCodec *codec = ist->dec; @@ -1066,6 +1069,9 @@ int dec_open(InputStream *ist) return ret; d = ist->decoder; + d->sch = sch; + d->sch_idx = sch_idx; + if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) { for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) { d->sub_prev[i] = av_frame_alloc(); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index ea74b45663..074546d517 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "objpool.h" #include "thread_queue.h" @@ -59,6 +60,9 @@ typedef struct DemuxStream { // name used for logging char log_name[32]; + int sch_idx_stream; + int sch_idx_dec; + double ts_scale; int streamcopy_needed; @@ -110,6 +114,7 @@ typedef struct Demuxer { double readrate_initial_burst; + Scheduler *sch; ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; @@ -824,7 +829,9 @@ void ifile_close(InputFile **pf) static int ist_use(InputStream *ist, int decoding_needed) { + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); DemuxStream *ds = ds_from_ist(ist); + int ret; if (ist->user_set_discard == AVDISCARD_ALL) { av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n", @@ -832,13 +839,30 @@ static int ist_use(InputStream *ist, int decoding_needed) return AVERROR(EINVAL); } + if (ds->sch_idx_stream < 0) { + ret = sch_add_demux_stream(d->sch, d->f.index); + if (ret < 0) + return ret; + ds->sch_idx_stream = ret; + } + ist->discard = 0; ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; - if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) { - int ret = dec_open(ist); + if (decoding_needed && ds->sch_idx_dec < 0) { + ret = sch_add_dec(d->sch, decoder_thread, ist); + if (ret < 0) + return ret; + ds->sch_idx_dec = ret; + + ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream), + SCH_DEC(ds->sch_idx_dec), NULL, NULL); + if (ret < 0) + return ret; + + ret = dec_open(ist, d->sch, ds->sch_idx_dec); if (ret < 0) return ret; } @@ -848,6 +872,7 @@ static int ist_use(InputStream *ist, int decoding_needed) int ist_output_add(InputStream *ist, OutputStream *ost) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0); @@ -860,11 +885,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost) ist->outputs[ist->nb_outputs - 1] = ost; - return 0; + return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream; } int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, is_simple ? DECODING_FOR_OST : DECODING_FOR_FILTER); @@ -882,7 +908,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) if (ret < 0) return ret; - return 0; + return ds->sch_idx_dec; } static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st, @@ -1009,6 +1035,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st) if (!ds) return NULL; + ds->sch_idx_stream = -1; + ds->sch_idx_dec = -1; + ds->ist.st = st; ds->ist.file_index = f->index; ds->ist.index = st->index; @@ -1339,7 +1368,7 @@ static Demuxer *demux_alloc(void) return d; } -int ifile_open(const OptionsContext *o, const char *filename) +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Demuxer *d; InputFile *f; @@ -1366,6 +1395,11 @@ int ifile_open(const OptionsContext *o, const char *filename) f = &d->f; + ret = sch_add_demux(sch, input_thread, d); + if (ret < 0) + return ret; + d->sch = sch; + if (stop_time != INT64_MAX && recording_time != INT64_MAX) { stop_time = INT64_MAX; av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n"); diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index ea542173c5..9bede78a1e 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -55,6 +55,9 @@ struct Encoder { int opened; int finished; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to @@ -112,7 +115,8 @@ void enc_free(Encoder **penc) av_freep(penc); } -int enc_alloc(Encoder **penc, const AVCodec *codec) +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx) { Encoder *enc; @@ -132,6 +136,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec) if (!enc->pkt) goto fail; + enc->sch = sch; + enc->sch_idx = sch_idx; + *penc = enc; return 0; @@ -216,8 +223,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } -static void *encoder_thread(void *arg); - static int enc_thread_start(OutputStream *ost) { Encoder *e = ost->enc; @@ -1028,7 +1033,7 @@ fail: return AVERROR(ENOMEM); } -static void *encoder_thread(void *arg) +void *encoder_thread(void *arg) { OutputStream *ost = arg; OutputFile *of = output_files[ost->file_index]; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 04c4b4ea7b..e8e78f5454 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -72,6 +72,9 @@ typedef struct FilterGraphPriv { // frame for sending output to the encoder AVFrame *frame_enc; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to the filtergraph. Has @@ -742,14 +745,20 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; + int ret, dec_idx; av_assert0(!ifp->ist); ifp->ist = ist; ifp->type_src = ist->st->codecpar->codec_type; - ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + if (dec_idx < 0) + return dec_idx; + + ret = sch_connect(fgp->sch, SCH_DEC(dec_idx), + SCH_FILTER_IN(fgp->sch_idx, ifp->index), + NULL, NULL); if (ret < 0) return ret; @@ -805,13 +814,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) return 0; } -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc) { const OutputFile *of = output_files[ost->file_index]; OutputFilterPriv *ofp = ofp_from_ofilter(ofilter); FilterGraph *fg = ofilter->graph; FilterGraphPriv *fgp = fgp_from_fg(fg); const AVCodec *c = ost->enc_ctx->codec; + int ret; av_assert0(!ofilter->ost); @@ -894,6 +905,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) break; } + ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index), + SCH_ENC(sched_idx_enc), NULL, NULL); + if (ret < 0) + return ret; + fgp->nb_outputs_bound++; av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); @@ -1023,7 +1039,7 @@ static const AVClass fg_class = { .category = AV_CLASS_CATEGORY_FILTER, }; -int fg_create(FilterGraph **pfg, char *graph_desc) +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch) { FilterGraphPriv *fgp; FilterGraph *fg; @@ -1044,6 +1060,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc) fg->index = nb_filtergraphs - 1; fgp->graph_desc = graph_desc; fgp->disable_conversions = !auto_conversion_filters; + fgp->sch = sch; snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index); @@ -1103,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc) goto fail; } + ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs, + filter_thread, fgp); + if (ret < 0) + goto fail; + fgp->sch_idx = ret; + fail: avfilter_inout_free(&inputs); avfilter_inout_free(&outputs); @@ -1115,13 +1138,14 @@ fail: } int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc) + char *graph_desc, + Scheduler *sch, unsigned sched_idx_enc) { FilterGraph *fg; FilterGraphPriv *fgp; int ret; - ret = fg_create(&fg, graph_desc); + ret = fg_create(&fg, graph_desc, sch); if (ret < 0) return ret; fgp = fgp_from_fg(fg); @@ -1147,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost, if (ret < 0) return ret; - ret = ofilter_bind_ost(fg->outputs[0], ost); + ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc); if (ret < 0) return ret; diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 033894ae86..9628728d95 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -235,7 +235,7 @@ fail: return AVERROR(ENOMEM); } -static void *muxer_thread(void *arg) +void *muxer_thread(void *arg) { Muxer *mux = arg; OutputFile *of = &mux->of; @@ -557,7 +557,7 @@ static int thread_start(Muxer *mux) return 0; } -static int print_sdp(void) +int print_sdp(const char *filename) { char sdp[16384]; int i; @@ -590,19 +590,18 @@ static int print_sdp(void) if (ret < 0) goto fail; - if (!sdp_filename) { + if (!filename) { printf("SDP:\n%s\n", sdp); fflush(stdout); } else { - ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL); + ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL); if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename); + av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename); goto fail; } avio_print(sdp_pb, sdp); avio_closep(&sdp_pb); - av_freep(&sdp_filename); } // SDP successfully written, allow muxer threads to start @@ -638,7 +637,7 @@ int mux_check_init(Muxer *mux) nb_output_dumped++; if (sdp_filename || want_sdp) { - ret = print_sdp(); + ret = print_sdp(sdp_filename); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; @@ -961,6 +960,8 @@ void of_free(OutputFile **pof) ost_free(&of->streams[i]); av_freep(&of->streams); + av_freep(&mux->sch_stream_idx); + av_dict_free(&mux->opts); av_packet_free(&mux->sq_pkt); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index a2bb4dfc7d..d5aba6db36 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -24,6 +24,7 @@ #include #include +#include "ffmpeg_sched.h" #include "thread_queue.h" #include "libavformat/avformat.h" @@ -50,6 +51,9 @@ typedef struct MuxStream { EncStats stats; + int sched_idx; + int sched_idx_enc; + int64_t max_frames; /* @@ -94,6 +98,12 @@ typedef struct Muxer { AVFormatContext *fc; + Scheduler *sch; + + // OutputStream indices indexed by scheduler stream indices + int *sch_stream_idx; + int nb_sch_stream_idx; + pthread_t thread; ThreadQueue *tq; diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index f35680e355..3380cbeb5c 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -23,6 +23,7 @@ #include "cmdutils.h" #include "ffmpeg.h" #include "ffmpeg_mux.h" +#include "ffmpeg_sched.h" #include "fopen_utf8.h" #include "libavformat/avformat.h" @@ -436,6 +437,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type) ms->ost.class = &output_stream_class; + ms->sched_idx = -1; + ms->sched_idx_enc = -1; + snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d", type_str ? *type_str : '?', mux->of.index, ms->ost.index); @@ -1123,6 +1127,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ms) return AVERROR(ENOMEM); + // only streams with sources (i.e. not attachments) + // are handled by the scheduler + if (ist || ofilter) { + ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx); + if (ret < 0) + return ret; + + ret = sch_add_mux_stream(mux->sch, mux->of.index); + if (ret < 0) + return ret; + + av_assert0(ret == mux->nb_sch_stream_idx - 1); + mux->sch_stream_idx[ret] = ms->ost.index; + ms->sched_idx = ret; + } + ost = &ms->ost; if (o->streamid) { @@ -1166,7 +1186,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->enc_ctx) return AVERROR(ENOMEM); - ret = enc_alloc(&ost->enc, enc); + ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); + if (ret < 0) + return ret; + ms->sched_idx_enc = ret; + + ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sched_idx_enc); if (ret < 0) return ret; @@ -1421,23 +1446,48 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) { if (ofilter) { ost->filter = ofilter; - ret = ofilter_bind_ost(ofilter, ost); + ret = ofilter_bind_ost(ofilter, ost, ms->sched_idx_enc); if (ret < 0) return ret; } else { - ret = init_simple_filtergraph(ost->ist, ost, filters); + ret = init_simple_filtergraph(ost->ist, ost, filters, + mux->sch, ms->sched_idx_enc); if (ret < 0) { av_log(ost, AV_LOG_ERROR, "Error initializing a simple filtergraph\n"); return ret; } } + + ret = sch_connect(mux->sch, SCH_ENC(ms->sched_idx_enc), + SCH_MSTREAM(ost->file_index, ms->sched_idx), + NULL, NULL); + if (ret < 0) + return ret; } else if (ost->ist) { - ret = ist_output_add(ost->ist, ost); - if (ret < 0) { + int sched_idx = ist_output_add(ost->ist, ost); + if (sched_idx < 0) { av_log(ost, AV_LOG_ERROR, "Error binding an input stream\n"); - return ret; + return sched_idx; + } + + if (ost->enc) { + ret = sch_connect(mux->sch, SCH_DEC(sched_idx), SCH_ENC(ms->sched_idx_enc), + NULL, NULL); + if (ret < 0) + return ret; + + ret = sch_connect(mux->sch, SCH_ENC(ms->sched_idx_enc), + SCH_MSTREAM(ost->file_index, ms->sched_idx), + NULL, NULL); + if (ret < 0) + return ret; + } else { + ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx), + SCH_MSTREAM(ost->file_index, ms->sched_idx), NULL, NULL); + if (ret < 0) + return ret; } } @@ -2617,7 +2667,7 @@ static Muxer *mux_alloc(void) return mux; } -int of_open(const OptionsContext *o, const char *filename) +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Muxer *mux; AVFormatContext *oc; @@ -2687,6 +2737,12 @@ int of_open(const OptionsContext *o, const char *filename) AVFMT_FLAG_BITEXACT); } + err = sch_add_mux(sch, muxer_thread, NULL, mux, + !strcmp(oc->oformat->name, "rtp")); + if (err < 0) + return err; + mux->sch = sch; + /* create all output streams for this file */ err = create_streams(mux, o); if (err < 0) diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 304471dd03..d463306546 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -28,6 +28,7 @@ #endif #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "cmdutils.h" #include "opt_common.h" #include "sync_queue.h" @@ -1157,20 +1158,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg) static int opt_filter_complex(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = av_strdup(arg); if (!graph_desc) return AVERROR(ENOMEM); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = file_read(arg); if (!graph_desc) return AVERROR(EINVAL); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } void show_help_default(const char *opt, const char *arg) @@ -1262,8 +1265,9 @@ static const OptionGroupDef groups[] = { [GROUP_INFILE] = { "input url", "i", OPT_INPUT }, }; -static int open_files(OptionGroupList *l, const char *inout, - int (*open_file)(const OptionsContext*, const char*)) +static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch, + int (*open_file)(const OptionsContext*, const char*, + Scheduler*)) { int i, ret; @@ -1283,7 +1287,7 @@ static int open_files(OptionGroupList *l, const char *inout, } av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg); - ret = open_file(&o, g->arg); + ret = open_file(&o, g->arg, sch); uninit_options(&o); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n", @@ -1296,7 +1300,7 @@ static int open_files(OptionGroupList *l, const char *inout, return 0; } -int ffmpeg_parse_options(int argc, char **argv) +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch) { OptionParseContext octx; const char *errmsg = NULL; @@ -1313,7 +1317,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* apply global options */ - ret = parse_optgroup(NULL, &octx.global_opts); + ret = parse_optgroup(sch, &octx.global_opts); if (ret < 0) { errmsg = "parsing global options"; goto fail; @@ -1323,7 +1327,7 @@ int ffmpeg_parse_options(int argc, char **argv) term_init(); /* open input files */ - ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open); + ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open); if (ret < 0) { errmsg = "opening input files"; goto fail; @@ -1337,7 +1341,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* open output files */ - ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open); + ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open); if (ret < 0) { errmsg = "opening output files"; goto fail; diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c new file mode 100644 index 0000000000..de7070906a --- /dev/null +++ b/fftools/ffmpeg_sched.c @@ -0,0 +1,1703 @@ +/* + * Inter-thread scheduling/synchronization. + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include + +#include "cmdutils.h" +#include "ffmpeg_sched.h" +#include "sync_queue.h" +#include "thread_queue.h" + +// TODO: try to get rid of this +#include "ffmpeg.h" + +#include "libavcodec/bsf.h" +#include "libavcodec/packet.h" + +#include "libavutil/avassert.h" +#include "libavutil/error.h" +#include "libavutil/frame.h" +#include "libavutil/mem.h" +#include "libavutil/thread.h" + +// 100 ms +// XXX: some other value? make this dynamic? +#define SCHEDULE_TOLERANCE (100 * 1000) + +enum QueueType { + QUEUE_PACKETS, + QUEUE_FRAMES, +}; + +typedef struct SchTask { + SchThreadFunc func; + void *func_arg; + + pthread_t thread; + int thread_running; +} SchTask; + +typedef struct SchDec { + SchedulerNode src; + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; + + SchTask task; + // Queue for receiving input packets, one stream. + ThreadQueue *queue; + + // temporary storage used by sch_dec_send() + AVFrame *send_frame; +} SchDec; + +typedef struct SchSyncQueue { + SyncQueue *sq; + AVFrame *frame; + pthread_mutex_t lock; + + unsigned *enc_idx; + unsigned nb_enc_idx; +} SchSyncQueue; + +typedef struct SchEnc { + SchedulerNode src; + SchedulerNode dst; + + // [0] - index of the sync queue in Scheduler.sq_enc, + // [1] - index of this encoder in the sq + int sq_idx[2]; + + pthread_mutex_t open_lock; + pthread_cond_t open_cond; + int (*open_cb)(void *opaque, const AVFrame *frame); + int can_open; + int opened; + + SchTask task; + // Queue for receiving input frames, one stream. + ThreadQueue *queue; +} SchEnc; + +typedef struct SchDemuxStream { + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; +} SchDemuxStream; + +typedef struct SchDemux { + SchDemuxStream *streams; + unsigned nb_streams; + + SchTask task; + + pthread_mutex_t demux_lock; + pthread_cond_t demux_cond; + atomic_int can_demux; + int terminate; + + int finished; + + // temporary storage used by sch_demux_send() + AVPacket *send_pkt; + + // the following must not be accessed outside of schedule_update_locked() + int can_demux_prev; + int can_demux_next; +} SchDemux; + +typedef struct SchMuxStream { + SchedulerNode src; + SchedulerNode src_sched; + + // XXX + int (*hook)(void *opaque, void *item); + void *opaque; + + AVBSFContext *bsf_ctx; + AVPacket *bsf_pkt; + + //////////////////////////////////////////////////////////// + // The following are protected by Scheduler.schedule_lock // + + /* dts of the last packet sent to this stream + in AV_TIME_BASE_Q */ + int64_t last_dts; + int source_blocked; + // this stream no longer accepts input + int finished; + //////////////////////////////////////////////////////////// +} SchMuxStream; + +typedef struct SchMux { + SchMuxStream *streams; + unsigned nb_streams; + unsigned nb_streams_ready; + + int (*init)(void *arg); + + SchTask task; + ThreadQueue *queue; +} SchMux; + +typedef struct SchFilterIn { + SchedulerNode src; + SchedulerNode src_sched; +} SchFilterIn; + +typedef struct SchFilterOut { + SchedulerNode dst; +} SchFilterOut; + +typedef struct SchFilterGraph { + SchFilterIn *inputs; + unsigned nb_inputs; + + SchFilterOut *outputs; + unsigned nb_outputs; + + SchTask task; + ThreadQueue *queue; + + // protected by schedule_lock + int best_input; +} SchFilterGraph; + +struct Scheduler { + SchDemux *demux; + unsigned nb_demux; + + SchMux *mux; + unsigned nb_mux; + + unsigned nb_mux_ready; + pthread_mutex_t mux_ready_lock; + + unsigned nb_mux_done; + pthread_mutex_t mux_done_lock; + pthread_cond_t mux_done_cond; + + + SchDec *dec; + unsigned nb_dec; + + SchEnc *enc; + unsigned nb_enc; + + SchSyncQueue *sq_enc; + unsigned nb_sq_enc; + + SchFilterGraph *filters; + unsigned nb_filters; + + char *sdp_filename; + int sdp_auto; + + int transcode_started; + + pthread_mutex_t schedule_lock; +}; + +static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, + enum QueueType type) +{ + ThreadQueue *tq; + ObjPool *op; + + op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : + objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + tq = tq_alloc(nb_streams, queue_size, op, + (type == QUEUE_PACKETS) ? pkt_move : frame_move); + if (!tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + *ptq = tq; + return 0; +} + +static int task_stop(SchTask *task) +{ + int ret; + void *thread_ret; + + if (!task->thread_running) + return 0; + + ret = pthread_join(task->thread, &thread_ret); + av_assert0(ret == 0); + + task->thread_running = 0; + + return (intptr_t)thread_ret; +} + +static int task_start(SchTask *task) +{ + int ret; + + av_assert0(!task->thread_running); + + ret = pthread_create(&task->thread, NULL, task->func, task->func_arg); + if (ret) { + av_log(NULL, AV_LOG_ERROR, "pthread_create() failed: %s\n", strerror(ret)); + return AVERROR(ret); + } + + task->thread_running = 1; + return 0; +} + +int sch_stop(Scheduler *sch) +{ + int ret = 0, err; + + // XXX ensure no threads can get stuck + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + pthread_mutex_lock(&d->demux_lock); + + d->terminate = 1; + atomic_store(&d->can_demux, 0); + pthread_cond_signal(&d->demux_cond); + + pthread_mutex_unlock(&d->demux_lock); + + err = task_stop(&d->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + // XXX should not be needed? + //tq_send_finish(dec->queue, 0); + + err = task_stop(&dec->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + err = task_stop(&fg->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + err = task_stop(&enc->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + err = task_stop(&mux->task); + ret = err_merge(ret, err); + } + + return ret; +} + +void sch_free(Scheduler **psch) +{ + Scheduler *sch = *psch; + + if (!sch) + return; + + sch_stop(sch); + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + av_freep(&ds->dst); + av_freep(&ds->dst_finished); + } + av_freep(&d->streams); + + av_packet_free(&d->send_pkt); + + pthread_mutex_destroy(&d->demux_lock); + pthread_cond_destroy(&d->demux_cond); + } + av_freep(&sch->demux); + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + av_bsf_free(&ms->bsf_ctx); + av_packet_free(&ms->bsf_pkt); + } + + av_freep(&mux->streams); + + tq_free(&mux->queue); + } + av_freep(&sch->mux); + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + tq_free(&dec->queue); + + av_freep(&dec->dst); + av_freep(&dec->dst_finished); + + av_frame_free(&dec->send_frame); + } + av_freep(&sch->dec); + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + tq_free(&enc->queue); + } + av_freep(&sch->enc); + + for (unsigned i = 0; i < sch->nb_sq_enc; i++) { + SchSyncQueue *sq = &sch->sq_enc[i]; + sq_free(&sq->sq); + av_frame_free(&sq->frame); + pthread_mutex_destroy(&sq->lock); + av_freep(&sq->enc_idx); + } + av_freep(&sch->sq_enc); + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + tq_free(&fg->queue); + + av_freep(&fg->inputs); + av_freep(&fg->outputs); + } + av_freep(&sch->filters); + + av_freep(&sch->sdp_filename); + + pthread_mutex_destroy(&sch->mux_ready_lock); + + pthread_mutex_destroy(&sch->mux_done_lock); + pthread_cond_destroy(&sch->mux_done_cond); + + av_freep(psch); +} + +Scheduler *sch_alloc(void) +{ + Scheduler *sch; + int ret; + + sch = av_mallocz(sizeof(*sch)); + if (!sch) + return NULL; + + sch->sdp_auto = 1; + + ret = pthread_mutex_init(&sch->mux_ready_lock, NULL); + if (ret) + goto fail; + + ret = pthread_mutex_init(&sch->mux_done_lock, NULL); + if (ret) + goto fail; + + ret = pthread_cond_init(&sch->mux_done_cond, NULL); + if (ret) + goto fail; + + return sch; +fail: + sch_free(&sch); + return NULL; +} + +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename) +{ + av_freep(&sch->sdp_filename); + sch->sdp_filename = av_strdup(sdp_filename); + return sch->sdp_filename ? 0 : AVERROR(ENOMEM); +} + +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *arg, int sdp_auto) +{ + SchMux *mux; + int ret; + + ret = GROW_ARRAY(sch->mux, sch->nb_mux); + if (ret < 0) + return ret; + + mux = &sch->mux[sch->nb_mux - 1]; + mux->task.func = func; + mux->task.func_arg = arg; + + mux->init = init; + + sch->sdp_auto &= sdp_auto; + + return mux - sch->mux; +} + +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx) +{ + SchMux *mux; + unsigned stream_idx; + int ret; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = GROW_ARRAY(mux->streams, mux->nb_streams); + if (ret < 0) + return ret; + stream_idx = mux->nb_streams - 1; + + mux->streams[stream_idx].last_dts = AV_NOPTS_VALUE; + + return stream_idx; +} + +int sch_add_mux_stream_bsf(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + const char *bsf) +{ + SchMuxStream *ms; + int ret; + + av_assert0(mux_idx < sch->nb_mux && stream_idx < sch->mux[mux_idx].nb_streams); + ms = &sch->mux[mux_idx].streams[stream_idx]; + + av_assert0(!ms->bsf_ctx); + + ret = av_bsf_list_parse_str(bsf, &ms->bsf_ctx); + if (ret < 0) + return ret; + + ms->bsf_pkt = av_packet_alloc(); + if (!ms->bsf_pkt) + return AVERROR(ENOMEM); + + return 0; +} + +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *func_arg) +{ + SchDemux *d; + int ret; + + ret = GROW_ARRAY(sch->demux, sch->nb_demux); + if (ret < 0) + return ret; + + d = &sch->demux[sch->nb_demux - 1]; + d->task.func = func; + d->task.func_arg = func_arg; + + d->send_pkt = av_packet_alloc(); + if (!d->send_pkt) + return AVERROR(ENOMEM); + + ret = pthread_mutex_init(&d->demux_lock, NULL); + if (ret) + return AVERROR(errno); + + ret = pthread_cond_init(&d->demux_cond, NULL); + if (ret) + return AVERROR(errno); + + return d - sch->demux; +} + +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx) +{ + SchDemux *d; + int ret; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + ret = GROW_ARRAY(d->streams, d->nb_streams); + return ret < 0 ? ret : d->nb_streams - 1; +} + +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *func_arg) +{ + SchDec *dec; + int ret; + + ret = GROW_ARRAY(sch->dec, sch->nb_dec); + if (ret < 0) + return ret; + + dec = &sch->dec[sch->nb_dec - 1]; + dec->task.func = func; + dec->task.func_arg = func_arg; + + dec->send_frame = av_frame_alloc(); + if (!dec->send_frame) + return AVERROR(ENOMEM); + + ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + return dec - sch->dec; +} + +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *func_arg, + int (*open_cb)(void *opaque, const AVFrame *frame)) +{ + SchEnc *enc; + int ret; + + ret = GROW_ARRAY(sch->enc, sch->nb_enc); + if (ret < 0) + return ret; + + enc = &sch->enc[sch->nb_enc - 1]; + enc->task.func = func; + enc->task.func_arg = func_arg; + enc->open_cb = open_cb; + + ret = pthread_mutex_init(&enc->open_lock, NULL); + if (ret) + return AVERROR(ret); + + ret = pthread_cond_init(&enc->open_cond, NULL); + if (ret) + return AVERROR(ret); + + enc->sq_idx[0] = -1; + enc->sq_idx[1] = -1; + + ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return enc - sch->enc; +} + +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *func_arg) +{ + SchFilterGraph *fg; + int ret; + + ret = GROW_ARRAY(sch->filters, sch->nb_filters); + if (ret < 0) + return ret; + fg = &sch->filters[sch->nb_filters - 1]; + + if (nb_inputs) { + fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs)); + if (!fg->inputs) + return AVERROR(ENOMEM); + fg->nb_inputs = nb_inputs; + } + + if (nb_outputs) { + fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs)); + if (!fg->outputs) + return AVERROR(ENOMEM); + fg->nb_outputs = nb_outputs; + } + + fg->best_input = nb_inputs ? 0 : -1; + fg->task.func = func; + fg->task.func_arg = func_arg; + + ret = queue_alloc(&fg->queue, fg->nb_inputs, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return fg - sch->filters; +} + +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx) +{ + SchSyncQueue *sq; + int ret; + + ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc); + if (ret < 0) + return ret; + sq = &sch->sq_enc[sch->nb_sq_enc - 1]; + + sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx); + if (!sq->sq) + return AVERROR(ENOMEM); + + sq->frame = av_frame_alloc(); + if (!sq->frame) + return AVERROR(ENOMEM); + + ret = pthread_mutex_init(&sq->lock, NULL); + if (ret) + return AVERROR(ret); + + return sq - sch->sq_enc; +} + +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames) +{ + SchSyncQueue *sq; + SchEnc *enc; + int ret; + + av_assert0(sq_idx < sch->nb_sq_enc); + sq = &sch->sq_enc[sq_idx]; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx); + if (ret < 0) + return ret; + sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx; + + ret = sq_add_stream(sq->sq, limiting); + if (ret < 0) + return ret; + + enc->sq_idx[0] = sq_idx; + enc->sq_idx[1] = ret; + + if (max_frames != INT64_MAX) + sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames); + + return 0; +} + +int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst, + int (*hook)(void *opaque, void *item), void *opaque) +{ + int ret; + + // XXX hack + if (hook) + av_assert0(src.type == SCH_NODE_TYPE_DEMUX && dst.type == SCH_NODE_TYPE_MUX); + + switch (src.type) { + case SCH_NODE_TYPE_DEMUX: { + SchDemuxStream *ds; + + av_assert0(src.idx < sch->nb_demux && + src.idx_stream < sch->demux[src.idx].nb_streams); + ds = &sch->demux[src.idx].streams[src.idx_stream]; + + ret = GROW_ARRAY(ds->dst, ds->nb_dst); + if (ret < 0) + return ret; + + ds->dst[ds->nb_dst - 1] = dst; + + // demuxed packets go to decoding or streamcopy + switch (dst.type) { + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(dst.idx < sch->nb_dec); + dec = &sch->dec[dst.idx]; + + av_assert0(!dec->src.type); + dec->src = src; + break; + } + case SCH_NODE_TYPE_MUX: { + SchMuxStream *ms; + + av_assert0(dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!ms->src.type); + ms->src = src; + + ms->hook = hook; + ms->opaque = opaque; + + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(src.idx < sch->nb_dec); + dec = &sch->dec[src.idx]; + + ret = GROW_ARRAY(dec->dst, dec->nb_dst); + if (ret < 0) + return ret; + + dec->dst[dec->nb_dst - 1] = dst; + + // decoded frames go to filters or encoding + switch (dst.type) { + case SCH_NODE_TYPE_FILTER_IN: { + SchFilterIn *fi; + + av_assert0(dst.idx < sch->nb_filters && + dst.idx_stream < sch->filters[dst.idx].nb_inputs); + fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; + + av_assert0(!fi->src.type); + fi->src = src; + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + + av_assert0(dst.idx < sch->nb_enc); + enc = &sch->enc[dst.idx]; + + av_assert0(!enc->src.type); + enc->src = src; + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_FILTER_OUT: { + SchFilterOut *fo; + SchEnc *enc; + + av_assert0(src.idx < sch->nb_filters && + src.idx_stream < sch->filters[src.idx].nb_outputs); + // filtered frames go to encoding + av_assert0(dst.type == SCH_NODE_TYPE_ENC && + dst.idx < sch->nb_enc); + + fo = &sch->filters[src.idx].outputs[src.idx_stream]; + enc = &sch->enc[dst.idx]; + + av_assert0(!fo->dst.type && !enc->src.type); + fo->dst = dst; + enc->src = src; + + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + SchMuxStream *ms; + + av_assert0(src.idx < sch->nb_enc); + // encoding packets go to muxing + av_assert0(dst.type == SCH_NODE_TYPE_MUX && + dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + enc = &sch->enc[src.idx]; + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!enc->dst.type && !ms->src.type); + enc->dst = dst; + ms->src = src; + + break; + } + default: av_assert0(0); + } + + return 0; +} + +static int mux_init(Scheduler *sch, SchMux *mux) +{ + int ret; + + ret = mux->init(mux->task.func_arg); + if (ret < 0) + return ret; + + sch->nb_mux_ready++; + + // XXX: test this + if (sch->sdp_filename || sch->sdp_auto) { + if (sch->nb_mux_ready < sch->nb_mux) + return 0; + + ret = print_sdp(sch->sdp_filename); + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); + return ret; + } + + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (int i = 0; i < sch->nb_mux; i++) { + ret = task_start(&sch->mux[i].task); + if (ret < 0) + return ret; + } + } else { + ret = task_start(&mux->task); + if (ret < 0) + return ret; + } + + // XXX +#if 0 + /* flush the muxing queues */ + for (int i = 0; i < fc->nb_streams; i++) { + OutputStream *ost = mux->of.streams[i]; + MuxStream *ms = ms_from_ost(ost); + AVPacket *pkt; + + while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { + ret = thread_submit_packet(mux, ost, pkt); + if (pkt) { + ms->muxing_queue_data_size -= pkt->size; + av_packet_free(&pkt); + } + if (ret < 0) + return ret; + } + } +#endif + + return 0; +} + +static int64_t trailing_dts(const Scheduler *sch) +{ + int64_t min_dts = INT64_MAX; + + for (unsigned i = 0; i < sch->nb_mux; i++) { + const SchMux *mux = &sch->mux[i]; + + for (int j = 0; j < mux->nb_streams; j++) { + const SchMuxStream *ms = &mux->streams[j]; + + if (ms->finished) + continue; + if (ms->last_dts == AV_NOPTS_VALUE) + return AV_NOPTS_VALUE; + + min_dts = FFMIN(min_dts, ms->last_dts); + } + } + + return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts; +} + +static void schedule_update_locked(Scheduler *sch) +{ + int64_t dts = trailing_dts(sch); + + // initialize our internal state + // XXX handle filtergraphs with no inputs here + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + d->can_demux_prev = atomic_load(&d->can_demux); + d->can_demux_next = 0; + } + + // figure out the sources that are allowed to proceed + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (int j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + SchDemux *d; + + // unblock sources for output streams that are not finished + // and not too far ahead of the trailing stream + if (ms->finished) + continue; + if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) + continue; + if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) + continue; + + // for outputs fed from filtergraphs, consider that filtergraph's + // best_input information, in other cases there is a well-defined + // source demuxer + if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) { + SchFilterGraph *fg = &sch->filters[ms->src_sched.idx]; + SchFilterIn *fi; + + // XXX handle filtergraphs with no inputs here + if (fg->best_input < 0) + continue; + fi = &fg->inputs[fg->best_input]; + + d = &sch->demux[fi->src_sched.idx]; + } else + d = &sch->demux[ms->src_sched.idx]; + + d->can_demux_next = 1; + } + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + if (d->can_demux_prev == d->can_demux_next) + continue; + + pthread_mutex_lock(&d->demux_lock); + + atomic_store(&d->can_demux, d->can_demux_next); + pthread_cond_signal(&d->demux_cond); + + pthread_mutex_unlock(&d->demux_lock); + } +} + +int sch_start(Scheduler *sch) +{ + int ret; + + sch->transcode_started = 1; + + // XXX add comprehensive logging + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + switch (ms->src.type) { + case SCH_NODE_TYPE_ENC: { + SchEnc *enc = &sch->enc[ms->src.idx]; + if (enc->src.type == SCH_NODE_TYPE_DEC) { + ms->src_sched = sch->dec[enc->src.idx].src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); + } else { + ms->src_sched = enc->src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + } + break; + } + case SCH_NODE_TYPE_DEMUX: + ms->src_sched = ms->src; + break; + default: + av_log(NULL, AV_LOG_ERROR, + "Muxer stream #%u:%u not connected to a source\n", i, j); + return AVERROR(EINVAL); + } + } + + // XXX should be special buffering queue + ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + if (mux->nb_streams_ready == mux->nb_streams) { + ret = mux_init(sch, mux); + if (ret < 0) + return ret; + } + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + if (!enc->src.type) { + av_log(NULL, AV_LOG_ERROR, + "Encoder %u not connected to a source\n", i); + return AVERROR(EINVAL); + } + if (!enc->dst.type) { + av_log(NULL, AV_LOG_ERROR, + "Encoder %u not connected to a sink\n", i); + return AVERROR(EINVAL); + } + + ret = task_start(&enc->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + for (unsigned j = 0; j < fg->nb_inputs; j++) { + SchFilterIn *fi = &fg->inputs[j]; + + if (!fi->src.type) { + av_log(NULL, AV_LOG_ERROR, + "Filtergraph %u input %u not connected to a source\n", i, j); + return AVERROR(EINVAL); + } + + fi->src_sched = sch->dec[fi->src.idx].src; + } + + for (unsigned j = 0; j < fg->nb_outputs; j++) { + SchFilterOut *fo = &fg->outputs[j]; + + if (!fo->dst.type) { + av_log(NULL, AV_LOG_ERROR, + "Filtergraph %u output %u not connected to a sink\n", i, j); + return AVERROR(EINVAL); + } + } + + ret = task_start(&fg->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + if (!dec->src.type) { + av_log(NULL, AV_LOG_ERROR, + "Decoder %u not connected to a source\n", i); + return AVERROR(EINVAL); + } + if (!dec->nb_dst) { + av_log(NULL, AV_LOG_ERROR, + "Decoder %u not connected to any sink\n", i); + return AVERROR(EINVAL); + } + + dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished)); + if (!dec->dst_finished) + return AVERROR(ENOMEM); + + ret = task_start(&dec->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + // XXX: is this useful? + if (!d->nb_streams) + continue; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + + if (!ds->nb_dst) { + av_log(NULL, AV_LOG_ERROR, + "Demuxer stream #%u:%u not connected to any sink\n", i, j); + return AVERROR(EINVAL); + } + + ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished)); + if (!ds->dst_finished) + return AVERROR(ENOMEM); + } + + ret = task_start(&d->task); + if (ret < 0) + return ret; + } + + pthread_mutex_lock(&sch->schedule_lock); + schedule_update_locked(sch); + pthread_mutex_unlock(&sch->schedule_lock); + + return 0; +} + +int sch_wait(Scheduler *sch, uint64_t timeout_us) +{ + int ret; + + pthread_mutex_lock(&sch->mux_done_lock); + + if (sch->nb_mux_done < sch->nb_mux) { + struct timespec tv = { .tv_sec = timeout_us / 1000000, + .tv_nsec = (timeout_us % 1000000) * 1000 }; + pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv); + } + + ret = sch->nb_mux_done == sch->nb_mux; + + pthread_mutex_unlock(&sch->mux_done_lock); + + return ret; +} + +static void enc_allow_open(SchEnc *enc) +{ + pthread_mutex_lock(&enc->open_lock); + + enc->can_open = 1; + pthread_cond_signal(&enc->open_cond); + + pthread_mutex_unlock(&enc->open_lock); +} + +static int sch_enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame) +{ + int ret; + + // this is called from the sending thread (filter/decoder), + // so wait until the encoding thread is ready to be opened + pthread_mutex_lock(&enc->open_lock); + + while (!enc->can_open) + pthread_cond_wait(&enc->open_cond, &enc->open_lock); + + pthread_mutex_unlock(&enc->open_lock); + + ret = enc->open_cb(enc->task.func_arg, frame); + if (ret < 0) + return ret; + + // ret>0 signals audio frame size, which means sync queue should + // have been enabled during encoder creation + if (ret > 0) { + av_assert0(enc->sq_idx[0] >= 0); + sq_frame_samples(sch->sq_enc[enc->sq_idx[0]].sq, enc->sq_idx[1], ret); + } + + return 0; +} + +static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + if (frame) + return tq_send(enc->queue, 0, frame); + + tq_send_finish(enc->queue, 0); + return 0; +} + +static int send_to_sq(Scheduler *sch, SchSyncQueue *sq, + AVFrame *frame, unsigned stream_idx) +{ + int ret = 0; + + pthread_mutex_lock(&sq->lock); + + ret = sq_send(sq->sq, stream_idx, SQFRAME(frame)); + if (ret < 0) + goto finish; + + while (1) { + SchEnc *enc; + + // TODO: the SQ API should be extended to allow returning EOF + // for individual streams + ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame)); + if (ret == AVERROR(EAGAIN)) { + ret = 0; + goto finish; + } else if (ret < 0) { + // close all encoders fed from this sync queue + for (unsigned i = 0; i < sq->nb_enc_idx; i++) { + int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL); + + // if the sync queue error is EOF and closing the encoder + // produces a more serious error, make sure to pick the latter + ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err); + } + goto finish; + } + + enc = &sch->enc[sq->enc_idx[ret]]; + ret = send_to_enc_thread(sch, enc, sq->frame); + av_frame_unref(sq->frame); + if (ret < 0) { + sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL)); + goto finish; + } + } + +finish: + pthread_mutex_unlock(&sq->lock); + + return ret; +} + +static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + if (enc->open_cb && frame && !enc->opened) { + int ret = sch_enc_open(sch, enc, frame); + if (ret < 0) + return ret; + enc->opened = 1; + } + + return (enc->sq_idx[0] >= 0) ? + send_to_sq(sch, &sch->sq_enc[enc->sq_idx[0]], frame, enc->sq_idx[1]) : + send_to_enc_thread(sch, enc, frame); +} + +static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, + AVPacket *pkt) +{ + SchMuxStream *ms = &mux->streams[stream_idx]; + int64_t dts = AV_NOPTS_VALUE; + + if (pkt) { + int ret; + + if (pkt->dts != AV_NOPTS_VALUE) + dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q); + + ret = tq_send(mux->queue, stream_idx, pkt); + if (ret < 0) + return ret; + } else + tq_send_finish(mux->queue, stream_idx); + + // TODO: use atomics to check whether this changes trailing dts + // to avoid locking unnecesarily + if (dts != AV_NOPTS_VALUE || !pkt) { + pthread_mutex_lock(&sch->schedule_lock); + + if (pkt) ms->last_dts = dts; + else ms->finished = 1; + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + return 0; +} + +static int +demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVPacket *pkt) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (!pkt) + goto finish; + + if (dst.type == SCH_NODE_TYPE_MUX) { + SchMux *mux = &sch->mux[dst.idx]; + SchMuxStream *ms = &mux->streams[dst.idx_stream]; + + // XXX check if this can be dropped + if (ms->hook) { + ret = ms->hook(ms->opaque, pkt); + if (ret == AVERROR_EOF) + goto finish; + else if (ret < 0) + return (ret == AVERROR(EAGAIN)) ? 0 : ret; + } + + ret = send_to_mux(sch, mux, dst.idx_stream, pkt); + } else + ret = tq_send(sch->dec[dst.idx].queue, 0, pkt); + + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_MUX) + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt); + else + tq_send_finish(sch->dec[dst.idx].queue, 0); + + *dst_finished = 1; + return AVERROR_EOF; +} + +static int demux_send_for_stream(Scheduler *sch, SchDemux *d, + SchDemuxStream *ds, AVPacket *pkt) +{ + unsigned nb_done = 0; + + for (unsigned i = 0; i < ds->nb_dst; i++) { + AVPacket *to_send = pkt; + uint8_t *finished = &ds->dst_finished[i]; + + int ret; + + // sending a packet consumes it, so make a temporary reference if needed + if (pkt && !*finished && i < ds->nb_dst - 1) { + to_send = d->send_pkt; + + ret = av_packet_ref(to_send, pkt); + if (ret < 0) + return ret; + } + + ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send); + if (to_send) + av_packet_unref(to_send); + if (ret == AVERROR_EOF) + nb_done++; + else if (ret < 0) + return ret; + } + + // XXX sub2video_heartbeat() + + return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0; +} + +int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt) +{ + SchDemux *d; + int ret = 0, terminate = 0; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + if (d->finished) + return AVERROR_EXIT; + + // sleep until the scheduling algorithm allows us to proceed + if (!atomic_load(&d->can_demux)) { + + pthread_mutex_lock(&d->demux_lock); + + while (!atomic_load(&d->can_demux) && !d->terminate) + pthread_cond_wait(&d->demux_cond, &d->demux_lock); + + terminate = d->terminate; + + pthread_mutex_unlock(&d->demux_lock); + } + + if (!pkt || terminate) { + for (unsigned i = 0; i < d->nb_streams; i++) { + ret = demux_send_for_stream(sch, d, &d->streams[i], NULL); + av_assert0(ret >= 0 || ret == AVERROR_EOF); + } + + pthread_mutex_lock(&sch->schedule_lock); + + d->finished = 1; + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + + return pkt ? AVERROR_EXIT : 0; + } + + + av_assert0(pkt->stream_index < d->nb_streams); + + return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt); +} + +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt) +{ + SchMux *mux; + int ret, stream_idx; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = tq_receive(mux->queue, &stream_idx, pkt); + pkt->stream_index = stream_idx; + return ret; +} + +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, int stream_idx) +{ + SchMux *mux; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + pthread_mutex_lock(&sch->schedule_lock); + + if (stream_idx >= 0) { + av_assert0(stream_idx < mux->nb_streams); + tq_receive_finish(mux->queue, stream_idx); + mux->streams[stream_idx].finished = 1; + } else { + // the muxer as a whole is done + for (unsigned i = 0; i < mux->nb_streams; i++) { + tq_receive_finish(mux->queue, i); + mux->streams[i].finished = 1; + } + + pthread_mutex_lock(&sch->mux_done_lock); + + av_assert0(sch->nb_mux_done < sch->nb_mux); + sch->nb_mux_done++; + + pthread_cond_signal(&sch->mux_done_cond); + + pthread_mutex_unlock(&sch->mux_done_lock); + } + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); +} + +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) +{ + SchMux *mux; + int ret = 0; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + + pthread_mutex_lock(&sch->mux_ready_lock); + + av_assert0(mux->nb_streams_ready < mux->nb_streams); + + // this may be called during initialization - do not start + // threads before sch_start() is called + if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started) + ret = mux_init(sch, mux); + + pthread_mutex_unlock(&sch->mux_ready_lock); + + return ret; +} + +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) +{ + SchDec *dec; + int ret, dummy; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + ret = tq_receive(dec->queue, &dummy, pkt); + av_assert0(dummy <= 0); + + return ret; +} + +static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, + unsigned in_idx, AVFrame *frame) +{ + if (frame) + return tq_send(fg->queue, in_idx, frame); + + tq_send_finish(fg->queue, in_idx); + return 0; +} + +static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVFrame *frame) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (!frame) + goto finish; + + ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ? + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) : + send_to_enc(sch, &sch->enc[dst.idx], frame); + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_FILTER_IN) + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); + else + send_to_enc(sch, &sch->enc[dst.idx], NULL); + + *dst_finished = 1; + + return AVERROR_EOF; +} + +int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame) +{ + SchDec *dec; + int ret = 0; + unsigned nb_done = 0; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + for (unsigned i = 0; i < dec->nb_dst; i++) { + uint8_t *finished = &dec->dst_finished[i]; + + AVFrame *to_send = frame; + + // sending a frame consumes it, so make a temporary reference if needed + if (frame && !*finished && i < dec->nb_dst - 1) { + to_send = dec->send_frame; + + // frame may sometimes contain props only, + // e.g. to signal EOF timestamp + ret = frame->buf[0] ? av_frame_ref(to_send, frame) : + av_frame_copy_props(to_send, frame); + if (ret < 0) + return ret; + } + + ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send); + if (to_send) + av_frame_unref(to_send); + if (ret == AVERROR_EOF) { + nb_done++; + ret = 0; + continue; + } else if (ret < 0) + goto finish; + } + +finish: + // close the decoder's input queue at the end + if (!frame) { + tq_receive_finish(dec->queue, 0); + return 0; + } + + return ret < 0 ? ret : + (nb_done == dec->nb_dst) ? AVERROR_EOF : 0; +} + +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame) +{ + SchEnc *enc; + int ret, dummy; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + if (!enc->can_open) + enc_allow_open(enc); + + ret = tq_receive(enc->queue, &dummy, frame); + av_assert0(dummy <= 0); + + return ret; +} + +int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt) +{ + SchEnc *enc; + int ret; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + // XXX this is for cases when sch_enc_receive() was never called + // should it be here or in sch_stop()? + if (!enc->can_open) { + av_assert0(!pkt); + enc_allow_open(enc); + } + + ret = send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt); + if (pkt) + av_packet_unref(pkt); + else { + // close the encoder's input queue at the end + tq_receive_finish(enc->queue, 0); + } + + return ret; +} + +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + int *in_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + int ret; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + av_assert0((*in_idx >= 0 || !fg->nb_inputs) && + *in_idx < (int)fg->nb_inputs); + + // update scheduling to account for desired input stream, if it changed + if (*in_idx != fg->best_input) { + pthread_mutex_lock(&sch->schedule_lock); + + fg->best_input = *in_idx; + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + // XXX: handle graphs with no inputs + + ret = tq_receive(fg->queue, in_idx, frame); + av_assert0(*in_idx < (int)fg->nb_inputs); + + return ret; +} + +int sch_filter_send(Scheduler *sch, unsigned fg_idx, int out_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + int ret; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + if (out_idx < 0) { + av_assert0(!frame); + + for (unsigned i = 0; i < fg->nb_inputs; i++) + tq_receive_finish(fg->queue, i); + + // XXX update schedule here? + + for (unsigned i = 0; i < fg->nb_outputs; i++) { + SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx]; + int err = send_to_enc(sch, enc, NULL); + ret = err_merge(ret, err); + } + + return ret; + } + + av_assert0(out_idx < fg->nb_outputs); + ret = send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame); + + if (frame) + av_frame_unref(frame); + + return ret; +} diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h new file mode 100644 index 0000000000..d1f1a006b4 --- /dev/null +++ b/fftools/ffmpeg_sched.h @@ -0,0 +1,414 @@ +/* + * Inter-thread scheduling/synchronization. + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef FFTOOLS_FFMPEG_SCHED_H +#define FFTOOLS_FFMPEG_SCHED_H + +/* + * This file contains the API for the transcode scheduler. + * + * Overall architecture of the transcoding process involves instances of the + * following components: + * - demuxers, each containing any number of streams; demuxed packets belonging + * to some stream are sent to any number of decoders (transcoding) or + * muxers (streamcopy); + * - decoders, which receive encoded packets from some demuxed stream, decode + * them, and send decoded frames to any number of filtergraph inputs + * (audio/video) or encoders (subtitles); + * - filtergraphs, which receive decoded frames from some decoder on every + * input, filter them, and send filtered frames from each output to some + * encoder; a generic filtergraph may have zero or more inputs (0 in case the + * filtergraph contains a lavfi source filter), and one or more outputs; the + * inputs and outputs need not have matching media types; + * - encoders, which receive decoded frames from some decoder (subtitles) or + * some filtergraph output (audio/video), encode them, and send encoded + * packets to some muxed stream; + * - muxers, each containing any number of muxed streams; each muxed stream + * receives encoded packets from some demuxed stream (streamcopy) or some + * encoder (transcoding); those packets are interleaved and written out by the + * muxer. + * + * There must be at least one muxer instance, otherwise the transcode produces + * no output and is meaningless. Otherwise, in a generic transcoding scenario + * there may be arbitrary number of instances of any of the above components, + * interconnected in various ways. + * + * The code tries to keep all the output streams across all the muxers in sync + * (i.e. at the same DTS), which is accomplished by varying the rates at which + * packets are read from different demuxers. Note that the degree of control we + * have over synchronization is fundamentally limited - if some demuxed streams + * in the same input are interleaved at different rates than that at which they + * are to be muxed (e.g. because an input file is badly interleaved, or the user + * changed their speed by mismatching amounts), then there will be increasing + * amounts of buffering followed by eventual transcoding failure. + * + * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g. + * - encoding and muxing output from filtergraph(s) that have no inputs; + * - creating a file that contains nothing but attachments and/or metadata. + * + * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but + * this is unnecessary because the (a)split filter provides the same + * functionality. + * + * The scheduler, in the above model, is the master object that oversees and + * facilitates the transcoding process. The basic idea is that all instances + * of the abovementioned components communicate only with the scheduler and not + * with each other. The scheduler is then the single place containing the + * knowledge about the whole transcoding pipeline. + */ + +typedef struct Scheduler Scheduler; + +enum SchedulerNodeType { + SCH_NODE_TYPE_NONE = 0, + SCH_NODE_TYPE_DEMUX, + SCH_NODE_TYPE_MUX, + SCH_NODE_TYPE_DEC, + SCH_NODE_TYPE_ENC, + SCH_NODE_TYPE_FILTER_IN, + SCH_NODE_TYPE_FILTER_OUT, +}; + +typedef struct SchedulerNode { + enum SchedulerNodeType type; + unsigned idx; + unsigned idx_stream; +} SchedulerNode; + +typedef void* (*SchThreadFunc)(void *arg); + +#define SCH_DSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, \ + .idx = file, .idx_stream = stream } +#define SCH_MSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_MUX, \ + .idx = file, .idx_stream = stream } +#define SCH_DEC(decoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \ + .idx = decoder } +#define SCH_ENC(encoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_ENC, \ + .idx = encoder } +#define SCH_FILTER_IN(filter, input) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, \ + .idx = filter, .idx_stream = input } +#define SCH_FILTER_OUT(filter, output) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, \ + .idx = filter, .idx_stream = output } + +Scheduler *sch_alloc(void); +void sch_free(Scheduler **sch); + +int sch_start(Scheduler *sch); +int sch_stop(Scheduler *sch); + +/** + * Wait until transcoding terminates or the specified timeout elapses. + * + * @param timeout_us Amount of time in microseconds after which this function + * will timeout. + * + * @retval 0 waiting timed out, transcoding is not finished + * @retval 1 transcoding is finished + */ +int sch_wait(Scheduler *sch, uint64_t timeout_us); + +/** + * Add a demuxer to the scheduler. + * + * @retval ">=0" Index of the newly-created demuxer. + * @retval "<0" Error code. + */ +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *func_arg); +/** + * Add a demuxed stream for a previously added demuxer. + * + * @param demux_idx index previously returned by sch_add_demux() + * + * @retval ">=0" Index of the newly-created demuxed stream. + * @retval "<0" Error code. + */ +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx); + +/** + * Add a decoder to the scheduler. + * + * @retval ">=0" Index of the newly-created decoder. + * @retval "<0" Error code. + */ +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *func_arg); + +/** + * Add a filtergraph to the scheduler. + * + * @param nb_inputs Number of filtergraph inputs. + * @param nb_outputs number of filtergraph outputs + * + * @retval ">=0" Index of the newly-created filtergraph. + * @retval "<0" Error code. + */ +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *func_arg); + +/** + * Add a muxer to the scheduler. + * + * Note that muxer thread startup is more complicated than for other components, + * because + * - muxer streams fed by audio/video encoders become initialized dynamically at + * runtime, after those encoders receive their first frame and initialize + * themselves, followed by calling sch_mux_stream_ready() + * - the header can be written after all the streams for a muxer are initialized + * - we may need to write an SDP, which must happen + * - AFTER all the headers are written + * - BEFORE any packets are written by any muxer + * - with all the muxers quiescent + * To avoid complicated muxer-thread synchronization dances, we postpone + * starting the muxer threads until after the SDP is written. The sequence of + * events is then as follows: + * - After sch_mux_stream_ready() is called for all the streams in a given muxer, + * the header for that muxer is written (care is taken that headers for + * different muxers are not written concurrently, since they write file + * information to stderr). If SDP is not wanted, the muxer thread then starts + * and muxing begins. + * - When SDP _is_ wanted, no muxer threads start until the header for the last + * muxer is written. After that, the SDP is written, after which all the muxer + * threads are started at once. + * + * In order, for the above to work, the scheduler needs to be able to invoke + * just writing the header, which is the reason the init parameter exists. + * + * @param init Callback that is called to initialize the muxer and write the + * header. Called after sch_mux_stream_ready() is called for all the + * streams in the muxer. + * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). + * + * @retval ">=0" Index of the newly-created muxer. + * @retval "<0" Error code. + */ +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *arg, int sdp_auto); +/** + * Add a muxed stream for a previously added muxer. + * + * @param mux_idx index previously returned by sch_add_mux() + * + * @retval ">=0" Index of the newly-created muxed stream. + * @retval "<0" Error code. + */ +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx); + +/** + * Signal to the scheduler that the specified muxed stream is initialized and + * ready. Muxing is started once all the streams are ready. + */ +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx); + +/** + * Set the file path for the SDP. + * + * The SDP is written when either of the following is true: + * - this function is called at least once + * - sdp_auto=1 is passed to EVERY call of sch_add_mux() + */ +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename); + +/** + * Add an encoder to the scheduler. + * + * @param open_cb This callback, if specified, will be called when the first + * frame is obtained for this encoder. For audio encoders with a + * fixed frame size (which use a sync queue in the scheduler to + * rechunk frames), it must return that frame size on success. + * Otherwise (non-audio, variable frame size) it should return 0. + * + * @retval ">=0" Index of the newly-created encoder. + * @retval "<0" Error code. + */ +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *func_arg, + int (*open_cb)(void *func_arg, const AVFrame *frame)); + +/** + * Add an pre-encoding sync queue to the scheduler. + * + * @param buf_size_us Sync queue buffering size, passed to sq_alloc(). + * @param logctx Logging context for the sync queue. passed to sq_alloc(). + * + * @retval ">=0" Index of the newly-created sync queue. + * @retval "<0" Error code. + */ +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx); +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames); + +/** + * XXX: TODO + */ +int sch_add_mux_stream_bsf(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + const char *bsf); + +int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst, + int (*hook)(void *opaque, void *item), void *opaque); + +/** + * Called by demuxing threads to send a demuxed packet or EOF to all its + * consumers. The stream is indentified by the packet's stream_index field. + * + * Every demuxer task must call this function with pkt=NULL exactly once, right + * before it exits - this call always succeeeds. + * + * @param demux_idx demuxer index + * @param pkt A demuxed packet to send or NULL to signal end of demuxing. + * If non-NULL, on success the packet is consumed and cleared + * by this function + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers for the stream are done + * @retval AVERROR_EXIT all consumers are done, should terminate demuxing + * @retval "anoter negative error code" other failure + */ +int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt); + +/** + * Called by decoders to receive a packet for decoding. + * + * @param dec_idx decoder index + * @param pkt Input packet will be written here on success. An empty packet + * signals that the decoder should be flushed, but more packets will + * follow (e.g. after seeking). + * + * @retval "non-negative value" success + * @retval AVERROR_EOF no more packets will arrive, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt); + +/** + * Called by decoder tasks to send a decoded frame or EOF downstream. + * + * Every decoder task must call this function with frame=NULL exactly once, right + * before it exits - this call always succeeeds. + * + * @param dec_idx Decoder index previously returned by sch_add_dec(). + * @param frame Decoded frame or NULL to signal end of decoding. + * If non-NULL, on success the frame is consumed and cleared + * by this function + * + * @retval ">=0" success + * @retval AVERROR_EOF all consumers are done, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame); + +/** + * Called by filtergraph tasks to obtain frames for filtering. Will wait for a + * frame to become available and return it in frame. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param[in,out] in_idx On input contains the index of the input on which a frame + * is most desired. On output contains input index of the + * actually returned frame or EOF. + */ +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + int *in_idx, AVFrame *frame); +/** + * Called by filtergraph tasks to send a filtered frame or EOF to consumers. + * + * Every filter task must call this function with out_idx<0 exactly once, + * right before it exits - that call always succeeds. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param out_idx Index of the output which produced the frame. When negative, + * this call signals EOF on all streams, i.e. the filtering + * task is being terminated - in that case frame must be NULL. + * @param frame The frame to send to consumers. When NULL, signals that no more + * frames will be produced for the specified output (or all outputs + * when out_idx<0). When non-NULL, the frame is always consumed and + * cleared by this function. + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers are done + * @retval "anoter negative error code" other failure + */ +int sch_filter_send(Scheduler *sch, unsigned fg_idx, int out_idx, AVFrame *frame); + +/** + * Called by encoder tasks to obtain frames for encoding. Will wait for a frame + * to become available and return it in frame. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param frame Newly-received frame will be stored here. Must be clean on + * entrance to this function. + * + * @retval 0 A frame was successfully delivered into frame. + * @retval AVERROR_EOF No more frames will be delivered, the encoder should + * flush everything and terminate. + * + */ +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame); +/** + * Called by encoder tasks to send encoded packets or EOF downstream. + * Will not block. + * + * Every encoder task must call this function with pkt=NULL exactly once, + * right before it exits - that call always succeeds. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param pkt An encoded packet or NULL to signal EOF. When non-NULL, the + * packet will be consumed and cleared by this function on + * success. + * + * @retval 0 success + * @retval "<0" Error code. + */ +int sch_enc_send (Scheduler *sch, unsigned enc_idx, AVPacket *pkt); + +/** + * Called by muxer tasks to obtain packets for muxing. Will wait for a packet + * for any muxed stream to become available and return it in pkt. + * + * @param mux_idx Muxer index previously returned by sch_add_mux(). + * @param pkt Newly-received packet will be stored here. Must be clean + * on entrance to this function. + * + * @retval 0 A packet was successfully delivered into pkt. Its stream_index + * corresponds to a stream index previously returned from + * sch_add_mux_stream(). + * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that + * no more packets will be delivered for this stream index. + * Otherwise this indicates that no more packets will be + * delivered for any stream and the muxer should therefore + * flush everything and terminate. + */ +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt); + +/** + * Called by muxer tasks to signal that a single stream or all streams will no + * longer accept input. + * + * Every muxer task must call this function with stream_idx=-1 exactly once, + * right before it exits. + * + * @param stream_idx A non-negative stream index to mark one stream as finished, + * -1 to mark all. + */ +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, int stream_idx); + +#endif /* FFTOOLS_FFMPEG_SCHED_H */