From patchwork Thu Nov 9 11:45:07 2023
X-Patchwork-Submitter: Anton Khirnov
X-Patchwork-Id: 44582
From: Anton Khirnov
To: ffmpeg-devel@ffmpeg.org
Date: Thu, 9 Nov 2023 12:45:07 +0100
Message-ID: <20231109114536.18805-1-anton@khirnov.net>
X-Mailer: git-send-email 2.42.0
In-Reply-To: <20231104195421.GJ3543730@pb2>
References: <20231104195421.GJ3543730@pb2>
Subject: [FFmpeg-devel] [PATCH v2 10/24] fftools/ffmpeg_filter: move filtering to a separate thread

As previously for decoding, this is merely "scaffolding" for moving to a
fully threaded architecture and does not yet make filtering truly parallel -
the main thread will currently wait for the filtering thread to finish its
work before continuing. That will change in future commits after encoders
are also moved to threads and a thread-aware scheduler is added.
---
Thanks for the report, fixed.
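[Editor's note: to make the "scaffolding" stage concrete, here is a minimal
self-contained sketch of the hand-off the commit message describes - the main
thread submits a frame to the filtering thread and then blocks until that
thread reports completion, so the two threads do not yet run in parallel.
This uses plain pthreads with invented names (Mailbox, filter_worker); it is
not the ThreadQueue-based code in the patch below. Compile with -pthread.]

#include <pthread.h>
#include <stdio.h>

typedef struct Mailbox {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             frame;      /* stand-in for an AVFrame */
    int             has_frame;  /* main thread -> filter thread */
    int             done;       /* filter thread -> main thread */
    int             eof;
} Mailbox;

static void *filter_worker(void *arg)
{
    Mailbox *mb = arg;

    for (;;) {
        pthread_mutex_lock(&mb->lock);
        while (!mb->has_frame && !mb->eof)
            pthread_cond_wait(&mb->cond, &mb->lock);
        if (mb->eof && !mb->has_frame) {
            pthread_mutex_unlock(&mb->lock);
            break;
        }

        /* "filter" the frame */
        printf("filter thread: processing frame %d\n", mb->frame);

        mb->has_frame = 0;
        mb->done      = 1;                 /* signal completion */
        pthread_cond_broadcast(&mb->cond);
        pthread_mutex_unlock(&mb->lock);
    }
    return NULL;
}

int main(void)
{
    Mailbox mb = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
    pthread_t th;

    pthread_create(&th, NULL, filter_worker, &mb);

    for (int i = 0; i < 3; i++) {
        pthread_mutex_lock(&mb.lock);
        mb.frame     = i;
        mb.has_frame = 1;
        mb.done      = 0;
        pthread_cond_broadcast(&mb.cond);
        /* main thread waits for the filtering thread to finish this frame */
        while (!mb.done)
            pthread_cond_wait(&mb.cond, &mb.lock);
        pthread_mutex_unlock(&mb.lock);
    }

    pthread_mutex_lock(&mb.lock);
    mb.eof = 1;
    pthread_cond_broadcast(&mb.cond);
    pthread_mutex_unlock(&mb.lock);
    pthread_join(th, NULL);
    return 0;
}

[Replacing that blocking wait with a scheduler that keeps both sides busy is
what the later commits in this series do.]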
--- fftools/ffmpeg.h | 9 +- fftools/ffmpeg_dec.c | 39 +- fftools/ffmpeg_filter.c | 825 ++++++++++++++++++++++++++++++++++------ 3 files changed, 730 insertions(+), 143 deletions(-) diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 41935d39d5..9852df8320 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -80,6 +80,14 @@ enum HWAccelID { HWACCEL_GENERIC, }; +enum FrameOpaque { + FRAME_OPAQUE_REAP_FILTERS = 1, + FRAME_OPAQUE_CHOOSE_INPUT, + FRAME_OPAQUE_SUB_HEARTBEAT, + FRAME_OPAQUE_EOF, + FRAME_OPAQUE_SEND_COMMAND, +}; + typedef struct HWDevice { const char *name; enum AVHWDeviceType type; @@ -728,7 +736,6 @@ FrameData *frame_data(AVFrame *frame); int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference); int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); -int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame); void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb); /** diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 517d6b3ced..b60bad1220 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -147,11 +147,12 @@ fail: static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) { - int i, ret; + int i, ret = 0; - av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, i < ist->nb_filters - 1); + ret = ifilter_send_frame(ist->filters[i], decoded_frame, + i < ist->nb_filters - 1 || + ist->dec->type == AVMEDIA_TYPE_SUBTITLE); if (ret == AVERROR_EOF) ret = 0; /* ignore */ if (ret < 0) { @@ -380,15 +381,6 @@ static int video_frame_process(InputStream *ist, AVFrame *frame) return 0; } -static void sub2video_flush(InputStream *ist) -{ - for (int i = 0; i < ist->nb_filters; i++) { - int ret = ifilter_sub2video(ist->filters[i], NULL); - if (ret != AVERROR_EOF && ret < 0) - av_log(NULL, AV_LOG_WARNING, "Flush the frame error.\n"); - } -} - static int process_subtitle(InputStream *ist, AVFrame *frame) { Decoder *d = ist->decoder; @@ -426,14 +418,9 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - for (int i = 0; i < ist->nb_filters; i++) { - ret = ifilter_sub2video(ist->filters[i], frame); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error sending a subtitle for filtering: %s\n", - av_err2str(ret)); - return ret; - } - } + ret = send_frame_to_filters(ist, frame); + if (ret < 0) + return ret; subtitle = (AVSubtitle*)frame->buf[0]->data; if (!subtitle->num_rects) @@ -824,14 +811,10 @@ finish: return ret; // signal EOF to our downstreams - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) - sub2video_flush(ist); - else { - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } + ret = send_filter_eof(ist); + if (ret < 0) { + av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); + return ret; } return AVERROR_EOF; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index f0b8e01f5c..9030cc656e 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -21,6 +21,8 @@ #include #include "ffmpeg.h" +#include "ffmpeg_utils.h" +#include "thread_queue.h" #include "libavfilter/avfilter.h" #include "libavfilter/buffersink.h" @@ -53,12 +55,50 @@ typedef struct FilterGraphPriv { int is_meta; int disable_conversions; + int nb_inputs_bound; + int nb_outputs_bound; + const char *graph_desc; // frame for temporarily holding output from the 
filtergraph AVFrame *frame; // frame for sending output to the encoder AVFrame *frame_enc; + + pthread_t thread; + /** + * Queue for sending frames from the main thread to the filtergraph. Has + * nb_inputs+1 streams - the first nb_inputs stream correspond to + * filtergraph inputs. Frames on those streams may have their opaque set to + * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the + * EOF event for the correspondint stream. Will be immediately followed by + * this stream being send-closed. + * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of + * a subtitle heartbeat event. Will only be sent for sub2video streams. + * + * The last stream is "control" - the main thread sends empty AVFrames with + * opaque set to + * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available + * from filtergraph outputs. These frames are sent to corresponding + * streams in queue_out. Finally an empty frame is sent to the control + * stream in queue_out. + * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are + * available the terminating empty frame's opaque will contain the index+1 + * of the filtergraph input to which more input frames should be supplied. + */ + ThreadQueue *queue_in; + /** + * Queue for sending frames from the filtergraph back to the main thread. + * Has nb_outputs+1 streams - the first nb_outputs stream correspond to + * filtergraph outputs. + * + * The last stream is "control" - see documentation for queue_in for more + * details. + */ + ThreadQueue *queue_out; + // submitting frames to filter thread returned EOF + // this only happens on thread exit, so is not per-input + int eof_in; } FilterGraphPriv; static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) @@ -71,6 +111,22 @@ static const FilterGraphPriv *cfgp_from_cfg(const FilterGraph *fg) return (const FilterGraphPriv*)fg; } +// data that is local to the filter thread and not visible outside of it +typedef struct FilterGraphThread { + AVFrame *frame; + + // Temporary buffer for output frames, since on filtergraph reset + // we cannot send them to encoders immediately. + // The output index is stored in frame opaque. 
+ AVFifo *frame_queue_out; + + int got_frame; + + // EOF status of each input/output, as received by the thread + uint8_t *eof_in; + uint8_t *eof_out; +} FilterGraphThread; + typedef struct InputFilterPriv { InputFilter ifilter; @@ -204,7 +260,25 @@ static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) return (OutputFilterPriv*)ofilter; } -static int configure_filtergraph(FilterGraph *fg); +typedef struct FilterCommand { + char *target; + char *command; + char *arg; + + double time; + int all_filters; +} FilterCommand; + +static void filter_command_free(void *opaque, uint8_t *data) +{ + FilterCommand *fc = (FilterCommand*)data; + + av_freep(&fc->target); + av_freep(&fc->command); + av_freep(&fc->arg); + + av_free(data); +} static int sub2video_get_blank_frame(InputFilterPriv *ifp) { @@ -574,6 +648,59 @@ static int ifilter_has_all_input_formats(FilterGraph *fg) return 1; } +static void *filter_thread(void *arg); + +// start the filtering thread once all inputs and outputs are bound +static int fg_thread_try_start(FilterGraphPriv *fgp) +{ + FilterGraph *fg = &fgp->fg; + ObjPool *op; + int ret = 0; + + if (fgp->nb_inputs_bound < fg->nb_inputs || + fgp->nb_outputs_bound < fg->nb_outputs) + return 0; + + op = objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); + if (!fgp->queue_in) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + // at least one output is mandatory + op = objpool_alloc_frames(); + if (!op) + goto fail; + + fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); + if (!fgp->queue_out) { + objpool_free(&op); + goto fail; + } + + ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); + if (ret) { + ret = AVERROR(ret); + av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n", + fg->index, av_err2str(ret)); + goto fail; + } + + return 0; +fail: + if (ret >= 0) + ret = AVERROR(ENOMEM); + + tq_free(&fgp->queue_in); + tq_free(&fgp->queue_out); + + return ret; +} + static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in) { AVFilterContext *ctx = inout->filter_ctx; @@ -607,6 +734,7 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); int ret; av_assert0(!ifp->ist); @@ -624,7 +752,10 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) return AVERROR(ENOMEM); } - return 0; + fgp->nb_inputs_bound++; + av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); + + return fg_thread_try_start(fgp); } static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) @@ -756,24 +887,10 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) break; } - // if we have all input parameters and all outputs are bound, - // the graph can now be configured - if (ifilter_has_all_input_formats(fg)) { - int ret; + fgp->nb_outputs_bound++; + av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); - for (int i = 0; i < fg->nb_outputs; i++) - if (!fg->outputs[i]->ost) - return 0; - - ret = configure_filtergraph(fg); - if (ret < 0) { - av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n", - av_err2str(ret)); - return ret; - } - } - - return 0; + return fg_thread_try_start(fgp); } static InputFilter *ifilter_alloc(FilterGraph *fg) @@ -803,6 +920,34 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) return ifilter; } +static int 
fg_thread_stop(FilterGraphPriv *fgp) +{ + void *ret; + + if (!fgp->queue_in) + return 0; + + for (int i = 0; i <= fgp->fg.nb_inputs; i++) { + InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? + ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; + + if (ifp) + ifp->eof = 1; + + tq_send_finish(fgp->queue_in, i); + } + + for (int i = 0; i <= fgp->fg.nb_outputs; i++) + tq_receive_finish(fgp->queue_out, i); + + pthread_join(fgp->thread, &ret); + + tq_free(&fgp->queue_in); + tq_free(&fgp->queue_out); + + return (int)(intptr_t)ret; +} + void fg_free(FilterGraph **pfg) { FilterGraph *fg = *pfg; @@ -812,6 +957,8 @@ void fg_free(FilterGraph **pfg) return; fgp = fgp_from_fg(fg); + fg_thread_stop(fgp); + avfilter_graph_free(&fg->graph); for (int j = 0; j < fg->nb_inputs; j++) { InputFilter *ifilter = fg->inputs[j]; @@ -1622,7 +1769,7 @@ static int graph_is_meta(AVFilterGraph *graph) return 1; } -static int configure_filtergraph(FilterGraph *fg) +static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) { FilterGraphPriv *fgp = fgp_from_fg(fg); AVBufferRef *hw_device; @@ -1746,7 +1893,7 @@ static int configure_filtergraph(FilterGraph *fg) /* send the EOFs for the finished inputs */ for (i = 0; i < fg->nb_inputs; i++) { InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); - if (ifp->eof) { + if (fgt->eof_in[i]) { ret = av_buffersrc_add_frame(ifp->filter, NULL); if (ret < 0) goto fail; @@ -1829,8 +1976,8 @@ int filtergraph_is_simple(const FilterGraph *fg) return fgp->is_simple; } -void fg_send_command(FilterGraph *fg, double time, const char *target, - const char *command, const char *arg, int all_filters) +static void send_command(FilterGraph *fg, double time, const char *target, + const char *command, const char *arg, int all_filters) { int ret; @@ -1853,6 +2000,29 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, } } +static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) +{ + int nb_requests, nb_requests_max = 0; + int best_input = -1; + + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilter *ifilter = fg->inputs[i]; + InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + InputStream *ist = ifp->ist; + + if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + continue; + + nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); + if (nb_requests > nb_requests_max) { + nb_requests_max = nb_requests; + best_input = i; + } + } + + return best_input; +} + static int choose_out_timebase(OutputFilterPriv *ofp, AVFrame *frame) { OutputFilter *ofilter = &ofp->ofilter; @@ -2088,16 +2258,16 @@ finish: fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY); } -static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) +static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); - OutputStream *ost = ofp->ofilter.ost; AVFrame *frame_prev = ofp->fps.last_frame; enum AVMediaType type = ofp->ofilter.type; - int64_t nb_frames = 1, nb_frames_prev = 0; + int64_t nb_frames = !!frame, nb_frames_prev = 0; - if (type == AVMEDIA_TYPE_VIDEO) + if (type == AVMEDIA_TYPE_VIDEO && (frame || fgt->got_frame)) video_sync_process(ofp, frame, &nb_frames, &nb_frames_prev); for (int64_t i = 0; i < nb_frames; i++) { @@ -2136,10 +2306,31 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) frame_out = frame; } - ret = enc_frame(ost, frame_out); - av_frame_unref(frame_out); - if (ret < 0) - return ret; + if 
(buffer) { + AVFrame *f = av_frame_alloc(); + + if (!f) { + av_frame_unref(frame_out); + return AVERROR(ENOMEM); + } + + av_frame_move_ref(f, frame_out); + f->opaque = (void*)(intptr_t)ofp->index; + + ret = av_fifo_write(fgt->frame_queue_out, &f, 1); + if (ret < 0) { + av_frame_free(&f); + return AVERROR(ENOMEM); + } + } else { + // return the frame to the main thread + ret = tq_send(fgp->queue_out, ofp->index, frame_out); + if (ret < 0) { + av_frame_unref(frame_out); + fgt->eof_out[ofp->index] = 1; + return ret == AVERROR_EOF ? 0 : ret; + } + } if (type == AVMEDIA_TYPE_VIDEO) { ofp->fps.frame_number++; @@ -2149,7 +2340,7 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) frame->flags &= ~AV_FRAME_FLAG_KEY; } - ofp->got_frame = 1; + fgt->got_frame = 1; } if (frame && frame_prev) { @@ -2157,23 +2348,27 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) av_frame_move_ref(frame_prev, frame); } + if (!frame) { + tq_send_finish(fgp->queue_out, ofp->index); + fgt->eof_out[ofp->index] = 1; + } + return 0; } -static int fg_output_step(OutputFilterPriv *ofp, int flush) +static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); OutputStream *ost = ofp->ofilter.ost; - AVFrame *frame = fgp->frame; AVFilterContext *filter = ofp->filter; FrameData *fd; int ret; ret = av_buffersink_get_frame_flags(filter, frame, AV_BUFFERSINK_FLAG_NO_REQUEST); - if (flush && ret == AVERROR_EOF && ofp->got_frame && - ost->type == AVMEDIA_TYPE_VIDEO) { - ret = fg_output_frame(ofp, NULL); + if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { + ret = fg_output_frame(ofp, fgt, NULL, buffer); return (ret < 0) ? ret : 1; } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { return 1; @@ -2183,22 +2378,18 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) av_err2str(ret)); return ret; } - if (ost->finished) { + + if (fgt->eof_out[ofp->index]) { av_frame_unref(frame); return 0; } frame->time_base = av_buffersink_get_time_base(filter); - if (frame->pts != AV_NOPTS_VALUE) { - ost->filter->last_pts = av_rescale_q(frame->pts, frame->time_base, - AV_TIME_BASE_Q); - - if (debug_ts) - av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n", - av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base), - frame->time_base.num, frame->time_base.den); - } + if (debug_ts) + av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n", + av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base), + frame->time_base.num, frame->time_base.den); // Choose the output timebase the first time we get a frame. 
if (!ofp->tb_out_locked) { @@ -2231,7 +2422,7 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) fd->frame_rate_filter = ofp->fps.framerate; } - ret = fg_output_frame(ofp, frame); + ret = fg_output_frame(ofp, fgt, frame, buffer); av_frame_unref(frame); if (ret < 0) return ret; @@ -2239,18 +2430,38 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) return 0; } -int reap_filters(FilterGraph *fg, int flush) +/* retrieve all frames available at filtergraph outputs and either send them to + * the main thread (buffer=0) or buffer them for later (buffer=1) */ +static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { + FilterGraphPriv *fgp = fgp_from_fg(fg); + int ret = 0; + if (!fg->graph) return 0; + // process buffered frames + if (!buffer) { + AVFrame *f; + + while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { + int out_idx = (intptr_t)f->opaque; + f->opaque = NULL; + ret = tq_send(fgp->queue_out, out_idx, f); + av_frame_free(&f); + if (ret < 0 && ret != AVERROR_EOF) + return ret; + } + } + /* Reap all buffers present in the buffer sinks */ for (int i = 0; i < fg->nb_outputs; i++) { OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); int ret = 0; while (!ret) { - ret = fg_output_step(ofp, flush); + ret = fg_output_step(ofp, fgt, frame, buffer); if (ret < 0) return ret; } @@ -2259,7 +2470,7 @@ int reap_filters(FilterGraph *fg, int flush) return 0; } -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) +static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int64_t pts2; @@ -2282,11 +2493,17 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t sub2video_update(ifp, pts2 + 1, NULL); } -int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame) +static int sub2video_frame(InputFilter *ifilter, const AVFrame *frame) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; + // heartbeat frame + if (frame && !frame->buf[0]) { + sub2video_heartbeat(ifilter, frame->pts, frame->time_base); + return 0; + } + if (ifilter->graph->graph) { if (!frame) { if (ifp->sub2video.end_pts < INT64_MAX) @@ -2315,12 +2532,13 @@ int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame) return 0; } -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) +static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter, + int64_t pts, AVRational tb) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; - ifp->eof = 1; + fgt->eof_in[ifp->index] = 1; if (ifp->filter) { pts = av_rescale_q_rnd(pts, tb, ifp->time_base, @@ -2344,7 +2562,7 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) return ret; if (ifilter_has_all_input_formats(ifilter->graph)) { - ret = configure_filtergraph(ifilter->graph); + ret = configure_filtergraph(ifilter->graph, fgt); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error initializing filters!\n"); return ret; @@ -2363,10 +2581,10 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) return 0; } -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) +static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, + InputFilter *ifilter, AVFrame *frame) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - FilterGraph *fg = ifilter->graph; AVFrameSideData *sd; int need_reinit, ret; @@ -2406,10 +2624,13 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int 
keep_reference) /* (re)init the graph if possible, otherwise buffer the frame and return */ if (need_reinit || !fg->graph) { + AVFrame *tmp = av_frame_alloc(); + + if (!tmp) + return AVERROR(ENOMEM); + if (!ifilter_has_all_input_formats(fg)) { - AVFrame *tmp = av_frame_clone(frame); - if (!tmp) - return AVERROR(ENOMEM); + av_frame_move_ref(tmp, frame); ret = av_fifo_write(ifp->frame_queue, &tmp, 1); if (ret < 0) @@ -2418,27 +2639,18 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) return ret; } - ret = reap_filters(fg, 0); - if (ret < 0 && ret != AVERROR_EOF) { - av_log(fg, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); + ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; + av_frame_free(&tmp); + if (ret < 0) return ret; - } - ret = configure_filtergraph(fg); + ret = configure_filtergraph(fg, fgt); if (ret < 0) { av_log(fg, AV_LOG_ERROR, "Error reinitializing filters!\n"); return ret; } } - if (keep_reference) { - ret = av_frame_ref(ifp->frame, frame); - if (ret < 0) - return ret; - } else - av_frame_move_ref(ifp->frame, frame); - frame = ifp->frame; - frame->pts = av_rescale_q(frame->pts, frame->time_base, ifp->time_base); frame->duration = av_rescale_q(frame->duration, frame->time_base, ifp->time_base); frame->time_base = ifp->time_base; @@ -2460,20 +2672,30 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) return 0; } -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) +static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, + AVFrame *frame) { - FilterGraphPriv *fgp = fgp_from_fg(graph); - int i, ret; - int nb_requests, nb_requests_max = 0; - InputStream *ist; + const enum FrameOpaque msg = (intptr_t)frame->opaque; + FilterGraph *fg = &fgp->fg; + int graph_eof = 0; + int ret; - if (!graph->graph) { - for (int i = 0; i < graph->nb_inputs; i++) { - InputFilter *ifilter = graph->inputs[i]; + frame->opaque = NULL; + av_assert0(msg > 0); + av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); + + if (!fg->graph) { + // graph not configured yet, ignore all messages other than choosing + // the input to read from + if (msg != FRAME_OPAQUE_CHOOSE_INPUT) + goto done; + + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - if (ifp->format < 0 && !ifp->eof) { - *best_ist = ifp->ist; - return 0; + if (ifp->format < 0 && !fgt->eof_in[i]) { + frame->opaque = (void*)(intptr_t)(i + 1); + goto done; } } @@ -2484,16 +2706,310 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) return AVERROR_BUG; } - *best_ist = NULL; - ret = avfilter_graph_request_oldest(graph->graph); - if (ret >= 0) - return reap_filters(graph, 0); + if (msg == FRAME_OPAQUE_SEND_COMMAND) { + FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; + send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters); + av_frame_unref(frame); + goto done; + } - if (ret == AVERROR_EOF) { - reap_filters(graph, 1); - for (int i = 0; i < graph->nb_outputs; i++) { - OutputFilter *ofilter = graph->outputs[i]; - OutputFilterPriv *ofp = ofp_from_ofilter(ofilter); + if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { + ret = avfilter_graph_request_oldest(fg->graph); + + graph_eof = ret == AVERROR_EOF; + + if (ret == AVERROR(EAGAIN)) { + frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); + goto done; + } else if (ret < 0 && !graph_eof) + return ret; + } + + ret = read_frames(fg, fgt, frame, 0); + if (ret < 0) { + 
av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n"); + return ret; + } + + if (graph_eof) + return AVERROR_EOF; + + // signal to the main thread that we are done processing the message +done: + ret = tq_send(fgp->queue_out, fg->nb_outputs, frame); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + return ret; + } + + return 0; +} + +static void fg_thread_set_name(const FilterGraph *fg) +{ + char name[16]; + if (filtergraph_is_simple(fg)) { + OutputStream *ost = fg->outputs[0]->ost; + snprintf(name, sizeof(name), "%cf#%d:%d", + av_get_media_type_string(ost->type)[0], + ost->file_index, ost->index); + } else { + snprintf(name, sizeof(name), "fc%d", fg->index); + } + + ff_thread_setname(name); +} + +static void fg_thread_uninit(FilterGraphThread *fgt) +{ + if (fgt->frame_queue_out) { + AVFrame *frame; + while (av_fifo_read(fgt->frame_queue_out, &frame, 1) >= 0) + av_frame_free(&frame); + av_fifo_freep2(&fgt->frame_queue_out); + } + + av_frame_free(&fgt->frame); + av_freep(&fgt->eof_in); + av_freep(&fgt->eof_out); + + memset(fgt, 0, sizeof(*fgt)); +} + +static int fg_thread_init(FilterGraphThread *fgt, const FilterGraph *fg) +{ + memset(fgt, 0, sizeof(*fgt)); + + fgt->frame = av_frame_alloc(); + if (!fgt->frame) + goto fail; + + fgt->eof_in = av_calloc(fg->nb_inputs, sizeof(*fgt->eof_in)); + if (!fgt->eof_in) + goto fail; + + fgt->eof_out = av_calloc(fg->nb_outputs, sizeof(*fgt->eof_out)); + if (!fgt->eof_out) + goto fail; + + fgt->frame_queue_out = av_fifo_alloc2(1, sizeof(AVFrame*), AV_FIFO_FLAG_AUTO_GROW); + if (!fgt->frame_queue_out) + goto fail; + + return 0; + +fail: + fg_thread_uninit(fgt); + return AVERROR(ENOMEM); +} + +static void *filter_thread(void *arg) +{ + FilterGraphPriv *fgp = arg; + FilterGraph *fg = &fgp->fg; + + FilterGraphThread fgt; + int ret = 0, input_status = 0; + + ret = fg_thread_init(&fgt, fg); + if (ret < 0) + goto finish; + + fg_thread_set_name(fg); + + // if we have all input parameters the graph can now be configured + if (ifilter_has_all_input_formats(fg)) { + ret = configure_filtergraph(fg, &fgt); + if (ret < 0) { + av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n", + av_err2str(ret)); + goto finish; + } + } + + while (1) { + InputFilter *ifilter; + InputFilterPriv *ifp; + enum FrameOpaque o; + int input_idx, eof_frame; + + input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); + if (input_idx < 0 || + (input_idx == fg->nb_inputs && input_status < 0)) { + av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); + break; + } + + o = (intptr_t)fgt.frame->opaque; + + // message on the control stream + if (input_idx == fg->nb_inputs) { + ret = msg_process(fgp, &fgt, fgt.frame); + if (ret < 0) + goto finish; + + continue; + } + + // we received an input frame or EOF + ifilter = fg->inputs[input_idx]; + ifp = ifp_from_ifilter(ifilter); + eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; + if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { + int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT; + ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL); + } else if (input_status >= 0 && fgt.frame->buf[0]) { + ret = send_frame(fg, &fgt, ifilter, fgt.frame); + } else { + int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE; + AVRational tb = input_status >= 0 ? 
fgt.frame->time_base : (AVRational){ 1, 1 }; + ret = send_eof(&fgt, ifilter, pts, tb); + } + av_frame_unref(fgt.frame); + if (ret < 0) + break; + + if (eof_frame) { + // an EOF frame is immediately followed by sender closing + // the corresponding stream, so retrieve that event + input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); + av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index); + } + + // signal to the main thread that we are done + ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); + if (ret < 0) { + if (ret == AVERROR_EOF) + break; + + av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + goto finish; + } + } + +finish: + // EOF is normal termination + if (ret == AVERROR_EOF) + ret = 0; + + for (int i = 0; i <= fg->nb_inputs; i++) + tq_receive_finish(fgp->queue_in, i); + for (int i = 0; i <= fg->nb_outputs; i++) + tq_send_finish(fgp->queue_out, i); + + fg_thread_uninit(&fgt); + + av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); + + return (void*)(intptr_t)ret; +} + +static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, + AVFrame *frame, enum FrameOpaque type) +{ + InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + int output_idx, ret; + + if (ifp->eof) { + av_frame_unref(frame); + return AVERROR_EOF; + } + + frame->opaque = (void*)(intptr_t)type; + + ret = tq_send(fgp->queue_in, ifp->index, frame); + if (ret < 0) { + ifp->eof = 1; + av_frame_unref(frame); + return ret; + } + + if (type == FRAME_OPAQUE_EOF) + tq_send_finish(fgp->queue_in, ifp->index); + + // wait for the frame to be processed + ret = tq_receive(fgp->queue_out, &output_idx, frame); + av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); + + return ret; +} + +int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + int ret; + + if (keep_reference) { + ret = av_frame_ref(fgp->frame, frame); + if (ret < 0) + return ret; + } else + av_frame_move_ref(fgp->frame, frame); + + return thread_send_frame(fgp, ifilter, fgp->frame, 0); +} + +int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + int ret; + + fgp->frame->pts = pts; + fgp->frame->time_base = tb; + + ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); + + return ret == AVERROR_EOF ? 0 : ret; +} + +void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + + fgp->frame->pts = pts; + fgp->frame->time_base = tb; + + thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT); +} + +int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) +{ + FilterGraphPriv *fgp = fgp_from_fg(graph); + int ret, got_frames = 0; + + if (fgp->eof_in) + return AVERROR_EOF; + + // signal to the filtering thread to return all frames it can + av_assert0(!fgp->frame->buf[0]); + fgp->frame->opaque = (void*)(intptr_t)(best_ist ? 
+ FRAME_OPAQUE_CHOOSE_INPUT : + FRAME_OPAQUE_REAP_FILTERS); + + ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); + if (ret < 0) { + fgp->eof_in = 1; + goto finish; + } + + while (1) { + OutputFilter *ofilter; + OutputFilterPriv *ofp; + OutputStream *ost; + int output_idx; + + ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); + + // EOF on the whole queue or the control stream + if (output_idx < 0 || + (ret < 0 && output_idx == graph->nb_outputs)) + goto finish; + + // EOF for a specific stream + if (ret < 0) { + ofilter = graph->outputs[output_idx]; + ofp = ofp_from_ofilter(ofilter); // we are finished and no frames were ever seen at this output, // at least initialize the encoder with a dummy frame @@ -2531,30 +3047,111 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) av_frame_unref(frame); } - close_output_stream(ofilter->ost); - } - return 0; - } - if (ret != AVERROR(EAGAIN)) - return ret; - - for (i = 0; i < graph->nb_inputs; i++) { - InputFilter *ifilter = graph->inputs[i]; - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - - ist = ifp->ist; - if (input_files[ist->file_index]->eagain || ifp->eof) + close_output_stream(graph->outputs[output_idx]->ost); continue; - nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); - if (nb_requests > nb_requests_max) { - nb_requests_max = nb_requests; - *best_ist = ist; } + + // request was fully processed by the filtering thread, + // return the input stream to read from, if needed + if (output_idx == graph->nb_outputs) { + int input_idx = (intptr_t)fgp->frame->opaque - 1; + av_assert0(input_idx <= graph->nb_inputs); + + if (best_ist) { + *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ? + ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; + + if (input_idx < 0 && !got_frames) { + for (int i = 0; i < graph->nb_outputs; i++) + graph->outputs[i]->ost->unavailable = 1; + } + } + break; + } + + // got a frame from the filtering thread, send it for encoding + ofilter = graph->outputs[output_idx]; + ost = ofilter->ost; + ofp = ofp_from_ofilter(ofilter); + + if (ost->finished) { + av_frame_unref(fgp->frame); + tq_receive_finish(fgp->queue_out, output_idx); + continue; + } + + if (fgp->frame->pts != AV_NOPTS_VALUE) { + ofilter->last_pts = av_rescale_q(fgp->frame->pts, + fgp->frame->time_base, + AV_TIME_BASE_Q); + } + + ret = enc_frame(ost, fgp->frame); + av_frame_unref(fgp->frame); + if (ret < 0) + goto finish; + + ofp->got_frame = 1; + got_frames = 1; } - if (!*best_ist) - for (i = 0; i < graph->nb_outputs; i++) - graph->outputs[i]->ost->unavailable = 1; +finish: + if (ret < 0) { + fgp->eof_in = 1; + for (int i = 0; i < graph->nb_outputs; i++) + close_output_stream(graph->outputs[i]->ost); + } - return 0; + return ret; +} + +int reap_filters(FilterGraph *fg, int flush) +{ + return fg_transcode_step(fg, NULL); +} + +void fg_send_command(FilterGraph *fg, double time, const char *target, + const char *command, const char *arg, int all_filters) +{ + FilterGraphPriv *fgp = fgp_from_fg(fg); + AVBufferRef *buf; + FilterCommand *fc; + int output_idx, ret; + + if (!fgp->queue_in) + return; + + fc = av_mallocz(sizeof(*fc)); + if (!fc) + return; + + buf = av_buffer_create((uint8_t*)fc, sizeof(*fc), filter_command_free, NULL, 0); + if (!buf) { + av_freep(&fc); + return; + } + + fc->target = av_strdup(target); + fc->command = av_strdup(command); + fc->arg = av_strdup(arg); + if (!fc->target || !fc->command || !fc->arg) { + av_buffer_unref(&buf); + return; + } + + fc->time = time; + fc->all_filters 
= all_filters; + + fgp->frame->buf[0] = buf; + fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; + + ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); + if (ret < 0) { + av_frame_unref(fgp->frame); + return; + } + + // wait for the frame to be processed + ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); + av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); }

From patchwork Thu Nov 9 11:36:26 2023
X-Patchwork-Submitter: Anton Khirnov
X-Patchwork-Id: 44581
From: Anton Khirnov
To: ffmpeg-devel@ffmpeg.org
Date: Thu, 9 Nov 2023 12:36:26 +0100
Message-ID: <20231109113647.16209-1-anton@khirnov.net>
X-Mailer: git-send-email 2.42.0
In-Reply-To: <20231104184409.GI3543730@pb2>
References: <20231104184409.GI3543730@pb2>
Subject: [FFmpeg-devel] [PATCH v2 18/24] fftools/ffmpeg: add thread-aware transcode scheduling infrastructure

See the comment block at the top of fftools/ffmpeg_sched.h for more details
on what this scheduler is for.

This commit adds the scheduling code itself, along with minimal integration
with the rest of the program:
* allocating and freeing the scheduler
* passing it throughout the call stack in order to register the individual
  components (demuxers/decoders/filtergraphs/encoders/muxers) with the
  scheduler

The scheduler is not actually used as of this commit, so it should not
result in any change in behavior. That will change in future commits.
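[Editor's note: as a rough illustration of the registration flow described
above, the toy model below (invented names and types, not the real API from
fftools/ffmpeg_sched.h) shows components registering with a central object,
getting back an index, and having their connections declared before any
thread is started.]

#include <stdio.h>
#include <stdlib.h>

enum NodeType { NODE_DEMUX, NODE_DEC, NODE_FILTER, NODE_ENC, NODE_MUX };

typedef struct Node { enum NodeType type; } Node;
typedef struct Edge { int src, dst; } Edge;

typedef struct Sched {
    Node *nodes; int nb_nodes;
    Edge *edges; int nb_edges;
} Sched;

/* register a component; the returned index is what the component keeps */
static int sched_add(Sched *s, enum NodeType type)
{
    Node *tmp = realloc(s->nodes, (s->nb_nodes + 1) * sizeof(*tmp));
    if (!tmp) return -1;
    s->nodes = tmp;
    s->nodes[s->nb_nodes] = (Node){ type };
    return s->nb_nodes++;
}

/* declare a directed connection between two registered components */
static int sched_connect(Sched *s, int src, int dst)
{
    Edge *tmp = realloc(s->edges, (s->nb_edges + 1) * sizeof(*tmp));
    if (!tmp) return -1;
    s->edges = tmp;
    s->edges[s->nb_edges++] = (Edge){ src, dst };
    return 0;
}

int main(void)
{
    Sched s = { 0 };

    /* register one instance of each component, as option parsing would */
    int demux  = sched_add(&s, NODE_DEMUX);
    int dec    = sched_add(&s, NODE_DEC);
    int filter = sched_add(&s, NODE_FILTER);
    int enc    = sched_add(&s, NODE_ENC);
    int mux    = sched_add(&s, NODE_MUX);

    /* describe the data flow; nothing runs yet, mirroring this commit */
    sched_connect(&s, demux, dec);
    sched_connect(&s, dec, filter);
    sched_connect(&s, filter, enc);
    sched_connect(&s, enc, mux);

    printf("registered %d components, %d connections\n", s.nb_nodes, s.nb_edges);

    free(s.nodes);
    free(s.edges);
    return 0;
}

[The diff below follows the same shape: sch_add_demux()/sch_add_dec()/
sch_add_enc()/sch_add_mux_stream()/sch_add_filtergraph() hand back indices,
and sch_connect() wires them together via the SCH_* macros.]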
--- fftools/Makefile | 1 + fftools/ffmpeg.c | 18 +- fftools/ffmpeg.h | 24 +- fftools/ffmpeg_dec.c | 10 +- fftools/ffmpeg_demux.c | 46 +- fftools/ffmpeg_enc.c | 13 +- fftools/ffmpeg_filter.c | 37 +- fftools/ffmpeg_mux.c | 17 +- fftools/ffmpeg_mux.h | 11 + fftools/ffmpeg_mux_init.c | 85 +- fftools/ffmpeg_opt.c | 22 +- fftools/ffmpeg_sched.c | 2072 +++++++++++++++++++++++++++++++++++++ fftools/ffmpeg_sched.h | 461 +++++++++ 13 files changed, 2761 insertions(+), 56 deletions(-) create mode 100644 fftools/ffmpeg_sched.c create mode 100644 fftools/ffmpeg_sched.h diff --git a/fftools/Makefile b/fftools/Makefile index 56820e6bc8..d6a8913a7f 100644 --- a/fftools/Makefile +++ b/fftools/Makefile @@ -18,6 +18,7 @@ OBJS-ffmpeg += \ fftools/ffmpeg_mux.o \ fftools/ffmpeg_mux_init.o \ fftools/ffmpeg_opt.o \ + fftools/ffmpeg_sched.o \ fftools/objpool.o \ fftools/sync_queue.o \ fftools/thread_queue.o \ diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index f2293e0250..1a58bf98cf 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -99,6 +99,7 @@ #include "cmdutils.h" #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "ffmpeg_utils.h" #include "sync_queue.h" @@ -1123,7 +1124,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) /* * The following code is the main loop of the file converter */ -static int transcode(int *err_rate_exceeded) +static int transcode(Scheduler *sch, int *err_rate_exceeded) { int ret = 0, i; InputStream *ist; @@ -1261,6 +1262,8 @@ static int64_t getmaxrss(void) int main(int argc, char **argv) { + Scheduler *sch = NULL; + int ret, err_rate_exceeded; BenchmarkTimeStamps ti; @@ -1278,8 +1281,14 @@ int main(int argc, char **argv) show_banner(argc, argv, options); + sch = sch_alloc(); + if (!sch) { + ret = AVERROR(ENOMEM); + goto finish; + } + /* parse options and open all input/output files */ - ret = ffmpeg_parse_options(argc, argv); + ret = ffmpeg_parse_options(argc, argv, sch); if (ret < 0) goto finish; @@ -1297,7 +1306,7 @@ int main(int argc, char **argv) } current_time = ti = get_benchmark_time_stamps(); - ret = transcode(&err_rate_exceeded); + ret = transcode(sch, &err_rate_exceeded); if (ret >= 0 && do_benchmark) { int64_t utime, stime, rtime; current_time = get_benchmark_time_stamps(); @@ -1317,5 +1326,8 @@ finish: ret = 0; ffmpeg_cleanup(ret); + + sch_free(&sch); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index c954ed5ebf..5833f85ab5 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -27,6 +27,7 @@ #include #include "cmdutils.h" +#include "ffmpeg_sched.h" #include "sync_queue.h" #include "libavformat/avformat.h" @@ -713,7 +714,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id int check_filter_outputs(void); int filtergraph_is_simple(const FilterGraph *fg); int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc); + char *graph_desc, + Scheduler *sch, unsigned sch_idx_enc); int init_complex_filtergraph(FilterGraph *fg); int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src); @@ -736,7 +738,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t */ int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec); -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc); /** * Create a new filtergraph in the global filtergraph list. 
@@ -744,7 +747,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost); * @param graph_desc Graph description; an av_malloc()ed string, filtergraph * takes ownership of it. */ -int fg_create(FilterGraph **pfg, char *graph_desc); +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); @@ -768,7 +771,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, */ int reap_filters(FilterGraph *fg, int flush); -int ffmpeg_parse_options(int argc, char **argv); +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, const AVFrame *frame, const AVPacket *pkt, @@ -791,7 +794,7 @@ AVBufferRef *hw_device_for_filter(void); int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); -int dec_open(InputStream *ist); +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); /** @@ -805,7 +808,8 @@ void dec_free(Decoder **pdec); */ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); -int enc_alloc(Encoder **penc, const AVCodec *codec); +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); int enc_open(OutputStream *ost, const AVFrame *frame); @@ -821,7 +825,7 @@ int enc_flush(void); */ int of_stream_init(OutputFile *of, OutputStream *ost); int of_write_trailer(OutputFile *of); -int of_open(const OptionsContext *o, const char *filename); +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch); void of_free(OutputFile **pof); void of_enc_stats_close(void); @@ -835,7 +839,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); int64_t of_filesize(OutputFile *of); -int ifile_open(const OptionsContext *o, const char *filename); +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); /** @@ -920,4 +924,8 @@ extern const char * const opt_name_frame_rates[]; extern const char * const opt_name_top_field_first[]; #endif +void *muxer_thread(void *arg); +void *decoder_thread(void *arg); +void *encoder_thread(void *arg); + #endif /* FFTOOLS_FFMPEG_H */ diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 798ddc25b3..53e14f061e 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -52,6 +52,9 @@ struct Decoder { AVFrame *sub_prev[2]; AVFrame *sub_heartbeat; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending coded packets from the main thread to @@ -650,7 +653,7 @@ fail: return AVERROR(ENOMEM); } -static void *decoder_thread(void *arg) +void *decoder_thread(void *arg) { InputStream *ist = arg; InputFile *ifile = input_files[ist->file_index]; @@ -1022,7 +1025,7 @@ static int hw_device_setup_for_decode(InputStream *ist) return 0; } -int dec_open(InputStream *ist) +int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) { Decoder *d; const AVCodec *codec = ist->dec; @@ -1040,6 +1043,9 @@ int dec_open(InputStream *ist) return ret; d = ist->decoder; + d->sch = sch; + d->sch_idx = sch_idx; + if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) { for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) { d->sub_prev[i] = av_frame_alloc(); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 65a5e08ca5..2234dbe076 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "ffmpeg_utils.h" #include 
"objpool.h" #include "thread_queue.h" @@ -60,6 +61,9 @@ typedef struct DemuxStream { // name used for logging char log_name[32]; + int sch_idx_stream; + int sch_idx_dec; + double ts_scale; int streamcopy_needed; @@ -108,6 +112,7 @@ typedef struct Demuxer { double readrate_initial_burst; + Scheduler *sch; ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; @@ -780,7 +785,9 @@ void ifile_close(InputFile **pf) static int ist_use(InputStream *ist, int decoding_needed) { + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); DemuxStream *ds = ds_from_ist(ist); + int ret; if (ist->user_set_discard == AVDISCARD_ALL) { av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n", @@ -788,13 +795,32 @@ static int ist_use(InputStream *ist, int decoding_needed) return AVERROR(EINVAL); } + if (ds->sch_idx_stream < 0) { + ret = sch_add_demux_stream(d->sch, d->f.index); + if (ret < 0) + return ret; + ds->sch_idx_stream = ret; + } + ist->discard = 0; ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; - if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) { - int ret = dec_open(ist); + if (decoding_needed && ds->sch_idx_dec < 0) { + int is_audio = ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO; + + ret = sch_add_dec(d->sch, decoder_thread, ist, d->loop && is_audio); + if (ret < 0) + return ret; + ds->sch_idx_dec = ret; + + ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream), + SCH_DEC(ds->sch_idx_dec)); + if (ret < 0) + return ret; + + ret = dec_open(ist, d->sch, ds->sch_idx_dec); if (ret < 0) return ret; } @@ -804,6 +830,7 @@ static int ist_use(InputStream *ist, int decoding_needed) int ist_output_add(InputStream *ist, OutputStream *ost) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0); @@ -816,11 +843,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost) ist->outputs[ist->nb_outputs - 1] = ost; - return 0; + return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream; } int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) { + DemuxStream *ds = ds_from_ist(ist); int ret; ret = ist_use(ist, is_simple ? 
DECODING_FOR_OST : DECODING_FOR_FILTER); @@ -838,7 +866,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) if (ret < 0) return ret; - return 0; + return ds->sch_idx_dec; } static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st, @@ -970,6 +998,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st) if (!ds) return NULL; + ds->sch_idx_stream = -1; + ds->sch_idx_dec = -1; + ds->ist.st = st; ds->ist.file_index = f->index; ds->ist.index = st->index; @@ -1295,7 +1326,7 @@ static Demuxer *demux_alloc(void) return d; } -int ifile_open(const OptionsContext *o, const char *filename) +int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Demuxer *d; InputFile *f; @@ -1322,6 +1353,11 @@ int ifile_open(const OptionsContext *o, const char *filename) f = &d->f; + ret = sch_add_demux(sch, input_thread, d); + if (ret < 0) + return ret; + d->sch = sch; + if (stop_time != INT64_MAX && recording_time != INT64_MAX) { stop_time = INT64_MAX; av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n"); diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index f1c41272b0..fbfe592f20 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -56,6 +56,9 @@ struct Encoder { int opened; int finished; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to @@ -113,7 +116,8 @@ void enc_free(Encoder **penc) av_freep(penc); } -int enc_alloc(Encoder **penc, const AVCodec *codec) +int enc_alloc(Encoder **penc, const AVCodec *codec, + Scheduler *sch, unsigned sch_idx) { Encoder *enc; @@ -133,6 +137,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec) if (!enc->pkt) goto fail; + enc->sch = sch; + enc->sch_idx = sch_idx; + *penc = enc; return 0; @@ -217,8 +224,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } -static void *encoder_thread(void *arg); - static int enc_thread_start(OutputStream *ost) { Encoder *e = ost->enc; @@ -994,7 +999,7 @@ fail: return AVERROR(ENOMEM); } -static void *encoder_thread(void *arg) +void *encoder_thread(void *arg) { OutputStream *ost = arg; OutputFile *of = output_files[ost->file_index]; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index c3463253f5..466a0642be 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -65,6 +65,9 @@ typedef struct FilterGraphPriv { // frame for sending output to the encoder AVFrame *frame_enc; + Scheduler *sch; + unsigned sch_idx; + pthread_t thread; /** * Queue for sending frames from the main thread to the filtergraph. 
Has @@ -735,14 +738,19 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; + int ret, dec_idx; av_assert0(!ifp->ist); ifp->ist = ist; ifp->type_src = ist->st->codecpar->codec_type; - ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph)); + if (dec_idx < 0) + return dec_idx; + + ret = sch_connect(fgp->sch, SCH_DEC(dec_idx), + SCH_FILTER_IN(fgp->sch_idx, ifp->index)); if (ret < 0) return ret; @@ -798,13 +806,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) return 0; } -int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) +int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, + unsigned sched_idx_enc) { const OutputFile *of = output_files[ost->file_index]; OutputFilterPriv *ofp = ofp_from_ofilter(ofilter); FilterGraph *fg = ofilter->graph; FilterGraphPriv *fgp = fgp_from_fg(fg); const AVCodec *c = ost->enc_ctx->codec; + int ret; av_assert0(!ofilter->ost); @@ -887,6 +897,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) break; } + ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index), + SCH_ENC(sched_idx_enc)); + if (ret < 0) + return ret; + fgp->nb_outputs_bound++; av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); @@ -1016,7 +1031,7 @@ static const AVClass fg_class = { .category = AV_CLASS_CATEGORY_FILTER, }; -int fg_create(FilterGraph **pfg, char *graph_desc) +int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch) { FilterGraphPriv *fgp; FilterGraph *fg; @@ -1037,6 +1052,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc) fg->index = nb_filtergraphs - 1; fgp->graph_desc = graph_desc; fgp->disable_conversions = !auto_conversion_filters; + fgp->sch = sch; snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index); @@ -1104,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc) goto fail; } + ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs, + filter_thread, fgp); + if (ret < 0) + goto fail; + fgp->sch_idx = ret; + fail: avfilter_inout_free(&inputs); avfilter_inout_free(&outputs); @@ -1116,13 +1138,14 @@ fail: } int init_simple_filtergraph(InputStream *ist, OutputStream *ost, - char *graph_desc) + char *graph_desc, + Scheduler *sch, unsigned sched_idx_enc) { FilterGraph *fg; FilterGraphPriv *fgp; int ret; - ret = fg_create(&fg, graph_desc); + ret = fg_create(&fg, graph_desc, sch); if (ret < 0) return ret; fgp = fgp_from_fg(fg); @@ -1148,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost, if (ret < 0) return ret; - ret = ofilter_bind_ost(fg->outputs[0], ost); + ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc); if (ret < 0) return ret; diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index bc6ce33483..7dd8e8c848 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -297,7 +297,7 @@ fail: return AVERROR(ENOMEM); } -static void *muxer_thread(void *arg) +void *muxer_thread(void *arg) { Muxer *mux = arg; OutputFile *of = &mux->of; @@ -570,7 +570,9 @@ static int thread_start(Muxer *mux) return 0; } -static int print_sdp(void) +int print_sdp(const char *filename); + +int print_sdp(const char *filename) { char sdp[16384]; int i; @@ -603,19 +605,18 @@ static int print_sdp(void) if (ret < 0) goto fail; - if (!sdp_filename) { + if (!filename) { printf("SDP:\n%s\n", sdp); fflush(stdout); } else { 
- ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL); + ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL); if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename); + av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename); goto fail; } avio_print(sdp_pb, sdp); avio_closep(&sdp_pb); - av_freep(&sdp_filename); } // SDP successfully written, allow muxer threads to start @@ -651,7 +652,7 @@ int mux_check_init(Muxer *mux) nb_output_dumped++; if (sdp_filename || want_sdp) { - ret = print_sdp(); + ret = print_sdp(sdp_filename); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); return ret; @@ -974,6 +975,8 @@ void of_free(OutputFile **pof) ost_free(&of->streams[i]); av_freep(&of->streams); + av_freep(&mux->sch_stream_idx); + av_dict_free(&mux->opts); av_packet_free(&mux->sq_pkt); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index a2bb4dfc7d..aaf81eaa8d 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -24,6 +24,7 @@ #include #include +#include "ffmpeg_sched.h" #include "thread_queue.h" #include "libavformat/avformat.h" @@ -50,6 +51,9 @@ typedef struct MuxStream { EncStats stats; + int sch_idx; + int sch_idx_enc; + int64_t max_frames; /* @@ -94,6 +98,13 @@ typedef struct Muxer { AVFormatContext *fc; + Scheduler *sch; + unsigned sch_idx; + + // OutputStream indices indexed by scheduler stream indices + int *sch_stream_idx; + int nb_sch_stream_idx; + pthread_t thread; ThreadQueue *tq; diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index d5a10e92bd..922e5f85f2 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -23,6 +23,7 @@ #include "cmdutils.h" #include "ffmpeg.h" #include "ffmpeg_mux.h" +#include "ffmpeg_sched.h" #include "fopen_utf8.h" #include "libavformat/avformat.h" @@ -435,6 +436,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type) ms->ost.class = &output_stream_class; + ms->sch_idx = -1; + ms->sch_idx_enc = -1; + snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d", type_str ? *type_str : '?', mux->of.index, ms->ost.index); @@ -1126,6 +1130,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ms) return AVERROR(ENOMEM); + // only streams with sources (i.e. 
not attachments) + // are handled by the scheduler + if (ist || ofilter) { + ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx); + if (ret < 0) + return ret; + + ret = sch_add_mux_stream(mux->sch, mux->sch_idx); + if (ret < 0) + return ret; + + av_assert0(ret == mux->nb_sch_stream_idx - 1); + mux->sch_stream_idx[ret] = ms->ost.index; + ms->sch_idx = ret; + } + ost = &ms->ost; if (o->streamid) { @@ -1169,7 +1189,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->enc_ctx) return AVERROR(ENOMEM); - ret = enc_alloc(&ost->enc, enc); + ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); + if (ret < 0) + return ret; + ms->sch_idx_enc = ret; + + ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sch_idx_enc); if (ret < 0) return ret; @@ -1379,11 +1404,19 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, ost->enc_ctx->global_quality = FF_QP2LAMBDA * qscale; } - ms->max_muxing_queue_size = 128; - MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, ms->max_muxing_queue_size, oc, st); + if (ms->sch_idx >= 0) { + int max_muxing_queue_size = 128; + int muxing_queue_data_threshold = 50 * 1024 * 1024; - ms->muxing_queue_data_threshold = 50*1024*1024; - MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, ms->muxing_queue_data_threshold, oc, st); + MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, max_muxing_queue_size, oc, st); + MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, muxing_queue_data_threshold, oc, st); + + sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, + max_muxing_queue_size, muxing_queue_data_threshold); + + ms->max_muxing_queue_size = max_muxing_queue_size; + ms->muxing_queue_data_threshold = muxing_queue_data_threshold; + } MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, oc, st); @@ -1421,23 +1454,46 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) { if (ofilter) { ost->filter = ofilter; - ret = ofilter_bind_ost(ofilter, ost); + ret = ofilter_bind_ost(ofilter, ost, ms->sch_idx_enc); if (ret < 0) return ret; } else { - ret = init_simple_filtergraph(ost->ist, ost, filters); + ret = init_simple_filtergraph(ost->ist, ost, filters, + mux->sch, ms->sch_idx_enc); if (ret < 0) { av_log(ost, AV_LOG_ERROR, "Error initializing a simple filtergraph\n"); return ret; } } + + ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc), + SCH_MSTREAM(mux->sch_idx, ms->sch_idx)); + if (ret < 0) + return ret; } else if (ost->ist) { - ret = ist_output_add(ost->ist, ost); - if (ret < 0) { + int sched_idx = ist_output_add(ost->ist, ost); + if (sched_idx < 0) { av_log(ost, AV_LOG_ERROR, "Error binding an input stream\n"); - return ret; + return sched_idx; + } + + if (ost->enc) { + ret = sch_connect(mux->sch, SCH_DEC(sched_idx), + SCH_ENC(ms->sch_idx_enc)); + if (ret < 0) + return ret; + + ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc), + SCH_MSTREAM(mux->sch_idx, ms->sch_idx)); + if (ret < 0) + return ret; + } else { + ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx), + SCH_MSTREAM(ost->file_index, ms->sch_idx)); + if (ret < 0) + return ret; } } @@ -2617,7 +2673,7 @@ static Muxer *mux_alloc(void) return mux; } -int of_open(const OptionsContext *o, const char *filename) +int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) { Muxer *mux; AVFormatContext *oc; @@ -2687,6 +2743,13 @@ int of_open(const OptionsContext *o, const char *filename) 
AVFMT_FLAG_BITEXACT); } + err = sch_add_mux(sch, muxer_thread, NULL, mux, + !strcmp(oc->oformat->name, "rtp")); + if (err < 0) + return err; + mux->sch = sch; + mux->sch_idx = err; + /* create all output streams for this file */ err = create_streams(mux, o); if (err < 0) diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index cd1aaabccc..e1680ebe0e 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -28,6 +28,7 @@ #endif #include "ffmpeg.h" +#include "ffmpeg_sched.h" #include "cmdutils.h" #include "opt_common.h" #include "sync_queue.h" @@ -1163,20 +1164,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg) static int opt_filter_complex(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = av_strdup(arg); if (!graph_desc) return AVERROR(ENOMEM); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg) { + Scheduler *sch = optctx; char *graph_desc = file_read(arg); if (!graph_desc) return AVERROR(EINVAL); - return fg_create(NULL, graph_desc); + return fg_create(NULL, graph_desc, sch); } void show_help_default(const char *opt, const char *arg) @@ -1268,8 +1271,9 @@ static const OptionGroupDef groups[] = { [GROUP_INFILE] = { "input url", "i", OPT_INPUT }, }; -static int open_files(OptionGroupList *l, const char *inout, - int (*open_file)(const OptionsContext*, const char*)) +static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch, + int (*open_file)(const OptionsContext*, const char*, + Scheduler*)) { int i, ret; @@ -1289,7 +1293,7 @@ static int open_files(OptionGroupList *l, const char *inout, } av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg); - ret = open_file(&o, g->arg); + ret = open_file(&o, g->arg, sch); uninit_options(&o); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n", @@ -1302,7 +1306,7 @@ static int open_files(OptionGroupList *l, const char *inout, return 0; } -int ffmpeg_parse_options(int argc, char **argv) +int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch) { OptionParseContext octx; const char *errmsg = NULL; @@ -1319,7 +1323,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* apply global options */ - ret = parse_optgroup(NULL, &octx.global_opts); + ret = parse_optgroup(sch, &octx.global_opts); if (ret < 0) { errmsg = "parsing global options"; goto fail; @@ -1329,7 +1333,7 @@ int ffmpeg_parse_options(int argc, char **argv) term_init(); /* open input files */ - ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open); + ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open); if (ret < 0) { errmsg = "opening input files"; goto fail; @@ -1343,7 +1347,7 @@ int ffmpeg_parse_options(int argc, char **argv) } /* open output files */ - ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open); + ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open); if (ret < 0) { errmsg = "opening output files"; goto fail; diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c new file mode 100644 index 0000000000..c2beded986 --- /dev/null +++ b/fftools/ffmpeg_sched.c @@ -0,0 +1,2072 @@ +/* + * Inter-thread scheduling/synchronization. + * Copyright (c) 2023 Anton Khirnov + * + * This file is part of FFmpeg. 
+ * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include + +#include "cmdutils.h" +#include "ffmpeg_sched.h" +#include "ffmpeg_utils.h" +#include "sync_queue.h" +#include "thread_queue.h" + +#include "libavcodec/packet.h" + +#include "libavutil/avassert.h" +#include "libavutil/error.h" +#include "libavutil/fifo.h" +#include "libavutil/frame.h" +#include "libavutil/mem.h" +#include "libavutil/thread.h" +#include "libavutil/threadmessage.h" +#include "libavutil/time.h" + +// 100 ms +// FIXME: some other value? make this dynamic? +#define SCHEDULE_TOLERANCE (100 * 1000) + +enum QueueType { + QUEUE_PACKETS, + QUEUE_FRAMES, +}; + +typedef struct SchWaiter { + pthread_mutex_t lock; + pthread_cond_t cond; + atomic_int choked; + + // the following are internal state of schedule_update_locked() and must not + // be accessed outside of it + int choked_prev; + int choked_next; +} SchWaiter; + +typedef struct SchTask { + Scheduler *parent; + SchedulerNode node; + + SchThreadFunc func; + void *func_arg; + + pthread_t thread; + int thread_running; +} SchTask; + +typedef struct SchDec { + const AVClass *class; + + SchedulerNode src; + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; + + SchTask task; + // Queue for receiving input packets, one stream. + ThreadQueue *queue; + + // Queue for sending post-flush end timestamps back to the source + AVThreadMessageQueue *queue_end_ts; + int expect_end_ts; + + // temporary storage used by sch_dec_send() + AVFrame *send_frame; +} SchDec; + +typedef struct SchSyncQueue { + SyncQueue *sq; + AVFrame *frame; + pthread_mutex_t lock; + + unsigned *enc_idx; + unsigned nb_enc_idx; +} SchSyncQueue; + +typedef struct SchEnc { + const AVClass *class; + + SchedulerNode src; + SchedulerNode dst; + + // [0] - index of the sync queue in Scheduler.sq_enc, + // [1] - index of this encoder in the sq + int sq_idx[2]; + + /* Opening encoders is somewhat nontrivial due to their interaction with + * sync queues, which are (among other things) responsible for maintaining + * constant audio frame size, when it is required by the encoder. + * + * Opening the encoder requires stream parameters, obtained from the first + * frame. However, that frame cannot be properly chunked by the sync queue + * without knowing the required frame size, which is only available after + * opening the encoder. 
+ * + * This apparent circular dependency is resolved in the following way: + * - the caller creating the encoder gives us a callback which opens the + * encoder and returns the required frame size (if any) + * - when the first frame is sent to the encoder, the sending thread + * - calls this callback, opening the encoder + * - passes the returned frame size to the sync queue + */ + int (*open_cb)(void *opaque, const AVFrame *frame); + int opened; + + SchTask task; + // Queue for receiving input frames, one stream. + ThreadQueue *queue; + // tq_send() to queue returned EOF + int in_finished; +} SchEnc; + +typedef struct SchDemuxStream { + SchedulerNode *dst; + uint8_t *dst_finished; + unsigned nb_dst; +} SchDemuxStream; + +typedef struct SchDemux { + const AVClass *class; + + SchDemuxStream *streams; + unsigned nb_streams; + + SchTask task; + SchWaiter waiter; + + // temporary storage used by sch_demux_send() + AVPacket *send_pkt; +} SchDemux; + +typedef struct PreMuxQueue { + /** + * Queue for buffering the packets before the muxer task can be started. + */ + AVFifo *fifo; + /** + * Maximum number of packets in fifo. + */ + int max_packets; + /* + * The size of the AVPackets' buffers in queue. + * Updated when a packet is either pushed or pulled from the queue. + */ + size_t data_size; + /* Threshold after which max_packets will be in effect */ + size_t data_threshold; +} PreMuxQueue; + +typedef struct SchMuxStream { + SchedulerNode src; + SchedulerNode src_sched; + + PreMuxQueue pre_mux_queue; + + //////////////////////////////////////////////////////////// + // The following are protected by Scheduler.schedule_lock // + + /* dts of the last packet sent to this stream + in AV_TIME_BASE_Q */ + int64_t last_dts; + // this stream no longer accepts input + int finished; + //////////////////////////////////////////////////////////// +} SchMuxStream; + +typedef struct SchMux { + const AVClass *class; + + SchMuxStream *streams; + unsigned nb_streams; + unsigned nb_streams_ready; + + int (*init)(void *arg); + + SchTask task; + /** + * Set to 1 after starting the muxer task and flushing the + * pre-muxing queues. + * Set either before any tasks have started, or with + * Scheduler.mux_ready_lock held. 
+ */ + atomic_int mux_started; + ThreadQueue *queue; +} SchMux; + +typedef struct SchFilterIn { + SchedulerNode src; + SchedulerNode src_sched; + int send_finished; +} SchFilterIn; + +typedef struct SchFilterOut { + SchedulerNode dst; +} SchFilterOut; + +typedef struct SchFilterGraph { + const AVClass *class; + + SchFilterIn *inputs; + unsigned nb_inputs; + unsigned nb_inputs_finished; + + SchFilterOut *outputs; + unsigned nb_outputs; + + SchTask task; + // input queue, nb_inputs+1 streams + // last stream is control + ThreadQueue *queue; + SchWaiter waiter; + + // protected by schedule_lock + unsigned best_input; +} SchFilterGraph; + +struct Scheduler { + const AVClass *class; + + SchDemux *demux; + unsigned nb_demux; + + SchMux *mux; + unsigned nb_mux; + + unsigned nb_mux_ready; + pthread_mutex_t mux_ready_lock; + + unsigned nb_mux_done; + pthread_mutex_t mux_done_lock; + pthread_cond_t mux_done_cond; + + + SchDec *dec; + unsigned nb_dec; + + SchEnc *enc; + unsigned nb_enc; + + SchSyncQueue *sq_enc; + unsigned nb_sq_enc; + + SchFilterGraph *filters; + unsigned nb_filters; + + char *sdp_filename; + int sdp_auto; + + int transcode_started; + atomic_int terminate; + atomic_int task_failed; + + pthread_mutex_t schedule_lock; + + atomic_int_least64_t last_dts; +}; + +/** + * Wait until this task is allowed to proceed. + * + * @retval 0 the caller should proceed + * @retval 1 the caller should terminate + */ +static int waiter_wait(Scheduler *sch, SchWaiter *w) +{ + int terminate; + + if (!atomic_load(&w->choked)) + return 0; + + pthread_mutex_lock(&w->lock); + + while (atomic_load(&w->choked) && !atomic_load(&sch->terminate)) + pthread_cond_wait(&w->cond, &w->lock); + + terminate = atomic_load(&sch->terminate); + + pthread_mutex_unlock(&w->lock); + + return terminate; +} + +static void waiter_set(SchWaiter *w, int choked) +{ + pthread_mutex_lock(&w->lock); + + atomic_store(&w->choked, choked); + pthread_cond_signal(&w->cond); + + pthread_mutex_unlock(&w->lock); +} + +static int waiter_init(SchWaiter *w) +{ + int ret; + + atomic_init(&w->choked, 0); + + ret = pthread_mutex_init(&w->lock, NULL); + if (ret) + return AVERROR(errno); + + ret = pthread_cond_init(&w->cond, NULL); + if (ret) + return AVERROR(errno); + + return 0; +} + +static void waiter_uninit(SchWaiter *w) +{ + pthread_mutex_destroy(&w->lock); + pthread_cond_destroy(&w->cond); +} + +static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, + enum QueueType type) +{ + ThreadQueue *tq; + ObjPool *op; + + op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : + objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + tq = tq_alloc(nb_streams, queue_size, op, + (type == QUEUE_PACKETS) ? 
pkt_move : frame_move); + if (!tq) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + *ptq = tq; + return 0; +} + +static void *task_wrapper(void *arg); + +static int task_stop(SchTask *task) +{ + int ret; + void *thread_ret; + + if (!task->thread_running) + return 0; + + ret = pthread_join(task->thread, &thread_ret); + av_assert0(ret == 0); + + task->thread_running = 0; + + return (intptr_t)thread_ret; +} + +static int task_start(SchTask *task) +{ + int ret; + + av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n"); + + av_assert0(!task->thread_running); + + ret = pthread_create(&task->thread, NULL, task_wrapper, task); + if (ret) { + av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n", + strerror(ret)); + return AVERROR(ret); + } + + task->thread_running = 1; + return 0; +} + +static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, + SchThreadFunc func, void *func_arg) +{ + task->parent = sch; + + task->node.type = type; + task->node.idx = idx; + + task->func = func; + task->func_arg = func_arg; +} + +int sch_stop(Scheduler *sch) +{ + int ret = 0, err; + + atomic_store(&sch->terminate, 1); + + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; + waiter_set(w, 1); + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + err = task_stop(&d->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + err = task_stop(&dec->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + err = task_stop(&fg->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + err = task_stop(&enc->task); + ret = err_merge(ret, err); + } + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + err = task_stop(&mux->task); + ret = err_merge(ret, err); + } + + return ret; +} + +void sch_free(Scheduler **psch) +{ + Scheduler *sch = *psch; + + if (!sch) + return; + + sch_stop(sch); + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + av_freep(&ds->dst); + av_freep(&ds->dst_finished); + } + av_freep(&d->streams); + + av_packet_free(&d->send_pkt); + + waiter_uninit(&d->waiter); + } + av_freep(&sch->demux); + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + if (ms->pre_mux_queue.fifo) { + AVPacket *pkt; + while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) + av_packet_free(&pkt); + av_fifo_freep2(&ms->pre_mux_queue.fifo); + } + } + + av_freep(&mux->streams); + + tq_free(&mux->queue); + } + av_freep(&sch->mux); + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + tq_free(&dec->queue); + + av_thread_message_queue_free(&dec->queue_end_ts); + + av_freep(&dec->dst); + av_freep(&dec->dst_finished); + + av_frame_free(&dec->send_frame); + } + av_freep(&sch->dec); + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + tq_free(&enc->queue); + } + av_freep(&sch->enc); + + for (unsigned i = 0; i < sch->nb_sq_enc; i++) { + SchSyncQueue *sq = 
&sch->sq_enc[i]; + sq_free(&sq->sq); + av_frame_free(&sq->frame); + pthread_mutex_destroy(&sq->lock); + av_freep(&sq->enc_idx); + } + av_freep(&sch->sq_enc); + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + tq_free(&fg->queue); + + av_freep(&fg->inputs); + av_freep(&fg->outputs); + + waiter_uninit(&fg->waiter); + } + av_freep(&sch->filters); + + av_freep(&sch->sdp_filename); + + pthread_mutex_destroy(&sch->mux_ready_lock); + + pthread_mutex_destroy(&sch->mux_done_lock); + pthread_cond_destroy(&sch->mux_done_cond); + + av_freep(psch); +} + +static const AVClass scheduler_class = { + .class_name = "Scheduler", + .version = LIBAVUTIL_VERSION_INT, + .item_name = av_default_item_name, +}; + +Scheduler *sch_alloc(void) +{ + Scheduler *sch; + int ret; + + sch = av_mallocz(sizeof(*sch)); + if (!sch) + return NULL; + + sch->class = &scheduler_class; + sch->sdp_auto = 1; + + ret = pthread_mutex_init(&sch->mux_ready_lock, NULL); + if (ret) + goto fail; + + ret = pthread_mutex_init(&sch->mux_done_lock, NULL); + if (ret) + goto fail; + + ret = pthread_cond_init(&sch->mux_done_cond, NULL); + if (ret) + goto fail; + + return sch; +fail: + sch_free(&sch); + return NULL; +} + +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename) +{ + av_freep(&sch->sdp_filename); + sch->sdp_filename = av_strdup(sdp_filename); + return sch->sdp_filename ? 0 : AVERROR(ENOMEM); +} + +static const AVClass sch_mux_class = { + .class_name = "SchMux", + .version = LIBAVUTIL_VERSION_INT, + .item_name = av_default_item_name, + .parent_log_context_offset = offsetof(SchMux, task.func_arg), +}; + +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *arg, int sdp_auto) +{ + const unsigned idx = sch->nb_mux; + + SchMux *mux; + int ret; + + ret = GROW_ARRAY(sch->mux, sch->nb_mux); + if (ret < 0) + return ret; + + mux = &sch->mux[idx]; + mux->class = &sch_mux_class; + mux->init = init; + + task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); + + sch->sdp_auto &= sdp_auto; + + return idx; +} + +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx) +{ + SchMux *mux; + SchMuxStream *ms; + unsigned stream_idx; + int ret; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = GROW_ARRAY(mux->streams, mux->nb_streams); + if (ret < 0) + return ret; + stream_idx = mux->nb_streams - 1; + + ms = &mux->streams[stream_idx]; + + ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0); + if (!ms->pre_mux_queue.fifo) + return AVERROR(ENOMEM); + + ms->last_dts = AV_NOPTS_VALUE; + + return stream_idx; +} + +static const AVClass sch_demux_class = { + .class_name = "SchDemux", + .version = LIBAVUTIL_VERSION_INT, + .item_name = av_default_item_name, + .parent_log_context_offset = offsetof(SchDemux, task.func_arg), +}; + +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx) +{ + const unsigned idx = sch->nb_demux; + + SchDemux *d; + int ret; + + ret = GROW_ARRAY(sch->demux, sch->nb_demux); + if (ret < 0) + return ret; + + d = &sch->demux[idx]; + + task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx); + + d->class = &sch_demux_class; + d->send_pkt = av_packet_alloc(); + if (!d->send_pkt) + return AVERROR(ENOMEM); + + ret = waiter_init(&d->waiter); + if (ret < 0) + return ret; + + return idx; +} + +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx) +{ + SchDemux *d; + int ret; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + ret = GROW_ARRAY(d->streams, d->nb_streams); + 
return ret < 0 ? ret : d->nb_streams - 1; +} + +static const AVClass sch_dec_class = { + .class_name = "SchDec", + .version = LIBAVUTIL_VERSION_INT, + .item_name = av_default_item_name, + .parent_log_context_offset = offsetof(SchDec, task.func_arg), +}; + +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, + int send_end_ts) +{ + const unsigned idx = sch->nb_dec; + + SchDec *dec; + int ret; + + ret = GROW_ARRAY(sch->dec, sch->nb_dec); + if (ret < 0) + return ret; + + dec = &sch->dec[idx]; + + task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx); + + dec->class = &sch_dec_class; + dec->send_frame = av_frame_alloc(); + if (!dec->send_frame) + return AVERROR(ENOMEM); + + ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + if (send_end_ts) { + ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp)); + if (ret < 0) + return ret; + } + + return idx; +} + +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, + int (*open_cb)(void *opaque, const AVFrame *frame)) +{ + const unsigned idx = sch->nb_enc; + + SchEnc *enc; + int ret; + + ret = GROW_ARRAY(sch->enc, sch->nb_enc); + if (ret < 0) + return ret; + + enc = &sch->enc[idx]; + + enc->open_cb = open_cb; + enc->sq_idx[0] = -1; + enc->sq_idx[1] = -1; + + task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); + + ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return idx; +} + +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *ctx) +{ + const unsigned idx = sch->nb_filters; + + SchFilterGraph *fg; + int ret; + + ret = GROW_ARRAY(sch->filters, sch->nb_filters); + if (ret < 0) + return ret; + fg = &sch->filters[idx]; + + task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx); + + if (nb_inputs) { + fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs)); + if (!fg->inputs) + return AVERROR(ENOMEM); + fg->nb_inputs = nb_inputs; + } + + if (nb_outputs) { + fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs)); + if (!fg->outputs) + return AVERROR(ENOMEM); + fg->nb_outputs = nb_outputs; + } + + ret = waiter_init(&fg->waiter); + if (ret < 0) + return ret; + + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES); + if (ret < 0) + return ret; + + return idx; +} + +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx) +{ + SchSyncQueue *sq; + int ret; + + ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc); + if (ret < 0) + return ret; + sq = &sch->sq_enc[sch->nb_sq_enc - 1]; + + sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx); + if (!sq->sq) + return AVERROR(ENOMEM); + + sq->frame = av_frame_alloc(); + if (!sq->frame) + return AVERROR(ENOMEM); + + ret = pthread_mutex_init(&sq->lock, NULL); + if (ret) + return AVERROR(ret); + + return sq - sch->sq_enc; +} + +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames) +{ + SchSyncQueue *sq; + SchEnc *enc; + int ret; + + av_assert0(sq_idx < sch->nb_sq_enc); + sq = &sch->sq_enc[sq_idx]; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx); + if (ret < 0) + return ret; + sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx; + + ret = sq_add_stream(sq->sq, limiting); + if (ret < 0) + return ret; + + enc->sq_idx[0] = sq_idx; + enc->sq_idx[1] = ret; + + if (max_frames != INT64_MAX) + sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames); + + return 0; +} + +int sch_connect(Scheduler 
*sch, SchedulerNode src, SchedulerNode dst) +{ + int ret; + + switch (src.type) { + case SCH_NODE_TYPE_DEMUX: { + SchDemuxStream *ds; + + av_assert0(src.idx < sch->nb_demux && + src.idx_stream < sch->demux[src.idx].nb_streams); + ds = &sch->demux[src.idx].streams[src.idx_stream]; + + ret = GROW_ARRAY(ds->dst, ds->nb_dst); + if (ret < 0) + return ret; + + ds->dst[ds->nb_dst - 1] = dst; + + // demuxed packets go to decoding or streamcopy + switch (dst.type) { + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(dst.idx < sch->nb_dec); + dec = &sch->dec[dst.idx]; + + av_assert0(!dec->src.type); + dec->src = src; + break; + } + case SCH_NODE_TYPE_MUX: { + SchMuxStream *ms; + + av_assert0(dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!ms->src.type); + ms->src = src; + + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(src.idx < sch->nb_dec); + dec = &sch->dec[src.idx]; + + ret = GROW_ARRAY(dec->dst, dec->nb_dst); + if (ret < 0) + return ret; + + dec->dst[dec->nb_dst - 1] = dst; + + // decoded frames go to filters or encoding + switch (dst.type) { + case SCH_NODE_TYPE_FILTER_IN: { + SchFilterIn *fi; + + av_assert0(dst.idx < sch->nb_filters && + dst.idx_stream < sch->filters[dst.idx].nb_inputs); + fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; + + av_assert0(!fi->src.type); + fi->src = src; + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + + av_assert0(dst.idx < sch->nb_enc); + enc = &sch->enc[dst.idx]; + + av_assert0(!enc->src.type); + enc->src = src; + break; + } + default: av_assert0(0); + } + + break; + } + case SCH_NODE_TYPE_FILTER_OUT: { + SchFilterOut *fo; + SchEnc *enc; + + av_assert0(src.idx < sch->nb_filters && + src.idx_stream < sch->filters[src.idx].nb_outputs); + // filtered frames go to encoding + av_assert0(dst.type == SCH_NODE_TYPE_ENC && + dst.idx < sch->nb_enc); + + fo = &sch->filters[src.idx].outputs[src.idx_stream]; + enc = &sch->enc[dst.idx]; + + av_assert0(!fo->dst.type && !enc->src.type); + fo->dst = dst; + enc->src = src; + + break; + } + case SCH_NODE_TYPE_ENC: { + SchEnc *enc; + SchMuxStream *ms; + + av_assert0(src.idx < sch->nb_enc); + // encoding packets go to muxing + av_assert0(dst.type == SCH_NODE_TYPE_MUX && + dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + enc = &sch->enc[src.idx]; + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!enc->dst.type && !ms->src.type); + enc->dst = dst; + ms->src = src; + + break; + } + default: av_assert0(0); + } + + return 0; +} + +static int mux_task_start(SchMux *mux) +{ + int ret = 0; + + ret = task_start(&mux->task); + if (ret < 0) + return ret; + + /* flush the pre-muxing queues */ + for (unsigned i = 0; i < mux->nb_streams; i++) { + SchMuxStream *ms = &mux->streams[i]; + AVPacket *pkt; + int finished = 0; + + while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) { + if (pkt) { + if (!finished) + ret = tq_send(mux->queue, i, pkt); + av_packet_free(&pkt); + if (ret == AVERROR_EOF) + finished = 1; + else if (ret < 0) + return ret; + } else + tq_send_finish(mux->queue, i); + } + } + + atomic_store(&mux->mux_started, 1); + + return 0; +} + +int print_sdp(const char *filename); + +static int mux_init(Scheduler *sch, SchMux *mux) +{ + int ret; + + ret = mux->init(mux->task.func_arg); + if (ret < 0) + return ret; + + sch->nb_mux_ready++; + + if (sch->sdp_filename || sch->sdp_auto) { + if (sch->nb_mux_ready < 
sch->nb_mux) + return 0; + + ret = print_sdp(sch->sdp_filename); + if (ret < 0) { + av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n"); + return ret; + } + + /* SDP is written only after all the muxers are ready, so now we + * start ALL the threads */ + for (unsigned i = 0; i < sch->nb_mux; i++) { + ret = mux_task_start(&sch->mux[i]); + if (ret < 0) + return ret; + } + } else { + ret = mux_task_start(mux); + if (ret < 0) + return ret; + } + + return 0; +} + +void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + size_t data_threshold, int max_packets) +{ + SchMux *mux; + SchMuxStream *ms; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + ms = &mux->streams[stream_idx]; + + ms->pre_mux_queue.max_packets = max_packets; + ms->pre_mux_queue.data_threshold = data_threshold; +} + +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) +{ + SchMux *mux; + int ret = 0; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < mux->nb_streams); + + pthread_mutex_lock(&sch->mux_ready_lock); + + av_assert0(mux->nb_streams_ready < mux->nb_streams); + + // this may be called during initialization - do not start + // threads before sch_start() is called + if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started) + ret = mux_init(sch, mux); + + pthread_mutex_unlock(&sch->mux_ready_lock); + + return ret; +} + +static int64_t trailing_dts(const Scheduler *sch) +{ + int64_t min_dts = INT64_MAX; + + for (unsigned i = 0; i < sch->nb_mux; i++) { + const SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + const SchMuxStream *ms = &mux->streams[j]; + + if (ms->finished) + continue; + if (ms->last_dts == AV_NOPTS_VALUE) + return AV_NOPTS_VALUE; + + min_dts = FFMIN(min_dts, ms->last_dts); + } + } + + return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts; +} + +static void schedule_update_locked(Scheduler *sch) +{ + int64_t dts; + + // on termination request all waiters are choked, + // we are not to unchoke them + if (atomic_load(&sch->terminate)) + return; + + dts = trailing_dts(sch); + + atomic_store(&sch->last_dts, dts); + + // initialize our internal state + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? 
&sch->demux[i].waiter : &sch->filters[i].waiter; + w->choked_prev = atomic_load(&w->choked); + w->choked_next = 1; + } + + // figure out the sources that are allowed to proceed + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + SchDemux *d; + + // unblock sources for output streams that are not finished + // and not too far ahead of the trailing stream + if (ms->finished) + continue; + if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) + continue; + if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) + continue; + + // for outputs fed from filtergraphs, consider that filtergraph's + // best_input information, in other cases there is a well-defined + // source demuxer + if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) { + SchFilterGraph *fg = &sch->filters[ms->src_sched.idx]; + SchFilterIn *fi; + + // the filtergraph contains internal sources and + // requested to be scheduled directly + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + continue; + } + + fi = &fg->inputs[fg->best_input]; + d = &sch->demux[fi->src_sched.idx]; + } else + d = &sch->demux[ms->src_sched.idx]; + + d->waiter.choked_next = 0; + } + } + + for (unsigned type = 0; type < 2; type++) + for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { + SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; + if (w->choked_prev != w->choked_next) + waiter_set(w, w->choked_next); + } + +} + +int sch_start(Scheduler *sch) +{ + int ret; + + sch->transcode_started = 1; + + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + + switch (ms->src.type) { + case SCH_NODE_TYPE_ENC: { + SchEnc *enc = &sch->enc[ms->src.idx]; + if (enc->src.type == SCH_NODE_TYPE_DEC) { + ms->src_sched = sch->dec[enc->src.idx].src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); + } else { + ms->src_sched = enc->src; + av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + } + break; + } + case SCH_NODE_TYPE_DEMUX: + ms->src_sched = ms->src; + break; + default: + av_log(mux, AV_LOG_ERROR, + "Muxer stream #%u not connected to a source\n", j); + return AVERROR(EINVAL); + } + } + + ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); + if (ret < 0) + return ret; + + if (mux->nb_streams_ready == mux->nb_streams) { + ret = mux_init(sch, mux); + if (ret < 0) + return ret; + } + } + + for (unsigned i = 0; i < sch->nb_enc; i++) { + SchEnc *enc = &sch->enc[i]; + + if (!enc->src.type) { + av_log(enc, AV_LOG_ERROR, + "Encoder not connected to a source\n"); + return AVERROR(EINVAL); + } + if (!enc->dst.type) { + av_log(enc, AV_LOG_ERROR, + "Encoder not connected to a sink\n"); + return AVERROR(EINVAL); + } + + ret = task_start(&enc->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + for (unsigned j = 0; j < fg->nb_inputs; j++) { + SchFilterIn *fi = &fg->inputs[j]; + + if (!fi->src.type) { + av_log(fg, AV_LOG_ERROR, + "Filtergraph input %u not connected to a source\n", j); + return AVERROR(EINVAL); + } + + fi->src_sched = sch->dec[fi->src.idx].src; + } + + for (unsigned j = 0; j < fg->nb_outputs; j++) { + SchFilterOut *fo = &fg->outputs[j]; + + if (!fo->dst.type) { + av_log(fg, AV_LOG_ERROR, + "Filtergraph %u output %u not connected to a 
sink\n", i, j); + return AVERROR(EINVAL); + } + } + + ret = task_start(&fg->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_dec; i++) { + SchDec *dec = &sch->dec[i]; + + if (!dec->src.type) { + av_log(dec, AV_LOG_ERROR, + "Decoder not connected to a source\n"); + return AVERROR(EINVAL); + } + if (!dec->nb_dst) { + av_log(dec, AV_LOG_ERROR, + "Decoder not connected to any sink\n"); + return AVERROR(EINVAL); + } + + dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished)); + if (!dec->dst_finished) + return AVERROR(ENOMEM); + + ret = task_start(&dec->task); + if (ret < 0) + return ret; + } + + for (unsigned i = 0; i < sch->nb_demux; i++) { + SchDemux *d = &sch->demux[i]; + + if (!d->nb_streams) + continue; + + for (unsigned j = 0; j < d->nb_streams; j++) { + SchDemuxStream *ds = &d->streams[j]; + + if (!ds->nb_dst) { + av_log(d, AV_LOG_ERROR, + "Demuxer stream %u not connected to any sink\n", j); + return AVERROR(EINVAL); + } + + ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished)); + if (!ds->dst_finished) + return AVERROR(ENOMEM); + } + + ret = task_start(&d->task); + if (ret < 0) + return ret; + } + + pthread_mutex_lock(&sch->schedule_lock); + schedule_update_locked(sch); + pthread_mutex_unlock(&sch->schedule_lock); + + return 0; +} + +int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) +{ + int ret; + + // convert delay to absolute timestamp + timeout_us += av_gettime(); + + pthread_mutex_lock(&sch->mux_done_lock); + + if (sch->nb_mux_done < sch->nb_mux) { + struct timespec tv = { .tv_sec = timeout_us / 1000000, + .tv_nsec = (timeout_us % 1000000) * 1000 }; + pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv); + } + + ret = sch->nb_mux_done == sch->nb_mux; + + pthread_mutex_unlock(&sch->mux_done_lock); + + *transcode_ts = atomic_load(&sch->last_dts); + + // abort transcoding if any task failed + ret |= atomic_load(&sch->task_failed); + + return ret; +} + +static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame) +{ + int ret; + + ret = enc->open_cb(enc->task.func_arg, frame); + if (ret < 0) + return ret; + + // ret>0 signals audio frame size, which means sync queue should + // have been enabled during encoder creation + if (ret > 0) { + av_assert0(enc->sq_idx[0] >= 0); + sq_frame_samples(sch->sq_enc[enc->sq_idx[0]].sq, enc->sq_idx[1], ret); + } + + return 0; +} + +static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + int ret; + + if (!frame) { + tq_send_finish(enc->queue, 0); + return 0; + } + + if (enc->in_finished) + return AVERROR_EOF; + + ret = tq_send(enc->queue, 0, frame); + if (ret < 0) + enc->in_finished = 1; + + return ret; +} + +static int send_to_sq(Scheduler *sch, SchSyncQueue *sq, + AVFrame *frame, unsigned stream_idx) +{ + int ret = 0; + + pthread_mutex_lock(&sq->lock); + + ret = sq_send(sq->sq, stream_idx, SQFRAME(frame)); + if (ret < 0) + goto finish; + + while (1) { + SchEnc *enc; + + // TODO: the SQ API should be extended to allow returning EOF + // for individual streams + ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame)); + if (ret == AVERROR(EAGAIN)) { + ret = 0; + goto finish; + } else if (ret < 0) { + // close all encoders fed from this sync queue + for (unsigned i = 0; i < sq->nb_enc_idx; i++) { + int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL); + + // if the sync queue error is EOF and closing the encoder + // produces a more serious error, make sure to pick the latter + ret = err_merge((ret == AVERROR_EOF && 
err < 0) ? 0 : ret, err); + } + goto finish; + } + + enc = &sch->enc[sq->enc_idx[ret]]; + ret = send_to_enc_thread(sch, enc, sq->frame); + av_frame_unref(sq->frame); + if (ret < 0) { + sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL)); + goto finish; + } + } + +finish: + pthread_mutex_unlock(&sq->lock); + + return ret; +} + +static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame) +{ + if (enc->open_cb && frame && !enc->opened) { + int ret = enc_open(sch, enc, frame); + if (ret < 0) + return ret; + enc->opened = 1; + + // discard empty frames that only carry encoder init parameters + if (!frame->buf[0]) { + av_frame_unref(frame); + return 0; + } + } + + return (enc->sq_idx[0] >= 0) ? + send_to_sq(sch, &sch->sq_enc[enc->sq_idx[0]], frame, enc->sq_idx[1]) : + send_to_enc_thread(sch, enc, frame); +} + +static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt) +{ + PreMuxQueue *q = &ms->pre_mux_queue; + AVPacket *tmp_pkt = NULL; + int ret; + + if (!av_fifo_can_write(q->fifo)) { + size_t packets = av_fifo_can_read(q->fifo); + size_t pkt_size = pkt ? pkt->size : 0; + int thresh_reached = (q->data_size + pkt_size) > q->data_threshold; + size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX; + size_t new_size = FFMIN(2 * packets, max_packets); + + if (new_size <= packets) { + av_log(mux, AV_LOG_ERROR, + "Too many packets buffered for output stream.\n"); + return AVERROR(ENOSPC); + } + ret = av_fifo_grow2(q->fifo, new_size - packets); + if (ret < 0) + return ret; + } + + if (pkt) { + tmp_pkt = av_packet_alloc(); + if (!tmp_pkt) + return AVERROR(ENOMEM); + + av_packet_move_ref(tmp_pkt, pkt); + q->data_size += tmp_pkt->size; + } + av_fifo_write(q->fifo, &tmp_pkt, 1); + + return 0; +} + +static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, + AVPacket *pkt) +{ + SchMuxStream *ms = &mux->streams[stream_idx]; + int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ? + av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q) : + AV_NOPTS_VALUE; + + // queue the packet if the muxer cannot be started yet + if (!atomic_load(&mux->mux_started)) { + int queued = 0; + + // the muxer could have started between the above atomic check and + // locking the mutex, in which case this block falls through to the normal send path + pthread_mutex_lock(&sch->mux_ready_lock); + + if (!atomic_load(&mux->mux_started)) { + int ret = mux_queue_packet(mux, ms, pkt); + queued = ret < 0 ? ret : 1; + } + + pthread_mutex_unlock(&sch->mux_ready_lock); + + if (queued < 0) + return queued; + else if (queued) + goto update_schedule; + } + + if (pkt) { + int ret = tq_send(mux->queue, stream_idx, pkt); + if (ret < 0) + return ret; + } else + tq_send_finish(mux->queue, stream_idx); + +update_schedule: + // TODO: use atomics to check whether this changes trailing dts + // to avoid locking unnecessarily + if (dts != AV_NOPTS_VALUE || !pkt) { + pthread_mutex_lock(&sch->schedule_lock); + + if (pkt) ms->last_dts = dts; + else ms->finished = 1; + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + return 0; +} + +static int +demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVPacket *pkt, unsigned flags) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (pkt && dst.type == SCH_NODE_TYPE_MUX && + (flags & DEMUX_SEND_STREAMCOPY_EOF)) { + av_packet_unref(pkt); + pkt = NULL; + } + + if (!pkt) + goto finish; + + ret = (dst.type == SCH_NODE_TYPE_MUX) ?
+ send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : + tq_send(sch->dec[dst.idx].queue, 0, pkt); + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_MUX) + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + else + tq_send_finish(sch->dec[dst.idx].queue, 0); + + *dst_finished = 1; + return AVERROR_EOF; +} + +static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, + AVPacket *pkt, unsigned flags) +{ + unsigned nb_done = 0; + + for (unsigned i = 0; i < ds->nb_dst; i++) { + AVPacket *to_send = pkt; + uint8_t *finished = &ds->dst_finished[i]; + + int ret; + + // sending a packet consumes it, so make a temporary reference if needed + if (pkt && i < ds->nb_dst - 1) { + to_send = d->send_pkt; + + ret = av_packet_ref(to_send, pkt); + if (ret < 0) + return ret; + } + + ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags); + if (to_send) + av_packet_unref(to_send); + if (ret == AVERROR_EOF) + nb_done++; + else if (ret < 0) + return ret; + } + + return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0; +} + +static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt) +{ + Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE }; + + av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems); + + for (unsigned i = 0; i < d->nb_streams; i++) { + SchDemuxStream *ds = &d->streams[i]; + + for (unsigned j = 0; j < ds->nb_dst; j++) { + const SchedulerNode *dst = &ds->dst[j]; + SchDec *dec; + int ret; + + if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC) + continue; + + dec = &sch->dec[dst->idx]; + + ret = tq_send(dec->queue, 0, pkt); + if (ret < 0) + return ret; + + if (dec->queue_end_ts) { + Timestamp ts; + ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0); + if (ret < 0) + return ret; + + if (max_end_ts.ts == AV_NOPTS_VALUE || + (ts.ts != AV_NOPTS_VALUE && + av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0)) + max_end_ts = ts; + + } + } + } + + pkt->pts = max_end_ts.ts; + pkt->time_base = max_end_ts.tb; + + return 0; +} + +int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, + unsigned flags) +{ + SchDemux *d; + int terminate; + + av_assert0(demux_idx < sch->nb_demux); + d = &sch->demux[demux_idx]; + + terminate = waiter_wait(sch, &d->waiter); + if (terminate) + return AVERROR_EXIT; + + // flush the downstreams after seek + if (pkt->stream_index == -1) + return demux_flush(sch, d, pkt); + + av_assert0(pkt->stream_index < d->nb_streams); + + return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags); +} + +static int demux_done(Scheduler *sch, unsigned demux_idx) +{ + SchDemux *d = &sch->demux[demux_idx]; + int ret = 0; + + for (unsigned i = 0; i < d->nb_streams; i++) { + int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0); + if (err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + pthread_mutex_lock(&sch->schedule_lock); + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + + return ret; +} + +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt) +{ + SchMux *mux; + int ret, stream_idx; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + ret = tq_receive(mux->queue, &stream_idx, pkt); + pkt->stream_index = stream_idx; + return ret; +} + +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) +{ + SchMux *mux; + + av_assert0(mux_idx < sch->nb_mux); + mux = &sch->mux[mux_idx]; + + av_assert0(stream_idx < 
mux->nb_streams); + tq_receive_finish(mux->queue, stream_idx); + + pthread_mutex_lock(&sch->schedule_lock); + mux->streams[stream_idx].finished = 1; + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); +} + +static int mux_done(Scheduler *sch, unsigned mux_idx) +{ + SchMux *mux = &sch->mux[mux_idx]; + + pthread_mutex_lock(&sch->schedule_lock); + + for (unsigned i = 0; i < mux->nb_streams; i++) { + tq_receive_finish(mux->queue, i); + mux->streams[i].finished = 1; + } + + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + + pthread_mutex_lock(&sch->mux_done_lock); + + av_assert0(sch->nb_mux_done < sch->nb_mux); + sch->nb_mux_done++; + + pthread_cond_signal(&sch->mux_done_cond); + + pthread_mutex_unlock(&sch->mux_done_lock); + + return 0; +} + +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) +{ + SchDec *dec; + int ret, dummy; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + // the decoder should have given us post-flush end timestamp in pkt + if (dec->expect_end_ts) { + Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; + ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0); + if (ret < 0) + return ret; + + dec->expect_end_ts = 0; + } + + ret = tq_receive(dec->queue, &dummy, pkt); + av_assert0(dummy <= 0); + + // got a flush packet, on the next call to this function the decoder + // will give us post-flush end timestamp + if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) + dec->expect_end_ts = 1; + + return ret; +} + +static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, + unsigned in_idx, AVFrame *frame) +{ + if (frame) + return tq_send(fg->queue, in_idx, frame); + + if (!fg->inputs[in_idx].send_finished) { + fg->inputs[in_idx].send_finished = 1; + tq_send_finish(fg->queue, in_idx); + + // close the control stream when all actual inputs are done + if (++fg->nb_inputs_finished == fg->nb_inputs) + tq_send_finish(fg->queue, fg->nb_inputs); + } + return 0; +} + +static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, + uint8_t *dst_finished, AVFrame *frame) +{ + int ret; + + if (*dst_finished) + return AVERROR_EOF; + + if (!frame) + goto finish; + + ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ? + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) : + send_to_enc(sch, &sch->enc[dst.idx], frame); + if (ret == AVERROR_EOF) + goto finish; + + return ret; + +finish: + if (dst.type == SCH_NODE_TYPE_FILTER_IN) + send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); + else + send_to_enc(sch, &sch->enc[dst.idx], NULL); + + *dst_finished = 1; + + return AVERROR_EOF; +} + +int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame) +{ + SchDec *dec; + int ret = 0; + unsigned nb_done = 0; + + av_assert0(dec_idx < sch->nb_dec); + dec = &sch->dec[dec_idx]; + + for (unsigned i = 0; i < dec->nb_dst; i++) { + uint8_t *finished = &dec->dst_finished[i]; + AVFrame *to_send = frame; + + // sending a frame consumes it, so make a temporary reference if needed + if (i < dec->nb_dst - 1) { + to_send = dec->send_frame; + + // frame may sometimes contain props only, + // e.g. to signal EOF timestamp + ret = frame->buf[0] ? 
av_frame_ref(to_send, frame) : + av_frame_copy_props(to_send, frame); + if (ret < 0) + return ret; + } + + ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send); + if (ret < 0) { + av_frame_unref(to_send); + if (ret == AVERROR_EOF) { + nb_done++; + ret = 0; + continue; + } + goto finish; + } + } + +finish: + return ret < 0 ? ret : + (nb_done == dec->nb_dst) ? AVERROR_EOF : 0; +} + +static int dec_done(Scheduler *sch, unsigned dec_idx) +{ + SchDec *dec = &sch->dec[dec_idx]; + int ret = 0; + + tq_receive_finish(dec->queue, 0); + + // make sure our source does not get stuck waiting for end timestamps + // that will never arrive + if (dec->queue_end_ts) + av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF); + + for (unsigned i = 0; i < dec->nb_dst; i++) { + int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL); + if (err < 0 && err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + return ret; +} + +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame) +{ + SchEnc *enc; + int ret, dummy; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + ret = tq_receive(enc->queue, &dummy, frame); + av_assert0(dummy <= 0); + + return ret; +} + +int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt) +{ + SchEnc *enc; + + av_assert0(enc_idx < sch->nb_enc); + enc = &sch->enc[enc_idx]; + + return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt); +} + +static int enc_done(Scheduler *sch, unsigned enc_idx) +{ + SchEnc *enc = &sch->enc[enc_idx]; + + tq_receive_finish(enc->queue, 0); + + return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, NULL); +} + +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + unsigned *in_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + av_assert0(*in_idx <= fg->nb_inputs); + + // update scheduling to account for desired input stream, if it changed + // + // this check needs no locking because only the filtering thread + // updates this value + if (*in_idx != fg->best_input) { + pthread_mutex_lock(&sch->schedule_lock); + + fg->best_input = *in_idx; + schedule_update_locked(sch); + + pthread_mutex_unlock(&sch->schedule_lock); + } + + if (*in_idx == fg->nb_inputs) { + int terminate = waiter_wait(sch, &fg->waiter); + return terminate ? 
AVERROR_EOF : AVERROR(EAGAIN); + } + + while (1) { + int ret, idx; + + ret = tq_receive(fg->queue, &idx, frame); + if (idx < 0) + return AVERROR_EOF; + else if (ret >= 0) { + *in_idx = idx; + return 0; + } + + // disregard EOFs for specific streams - they should always be + // preceded by an EOF frame + } +} + +int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + av_assert0(out_idx < fg->nb_outputs); + return send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame); +} + +static int filter_done(Scheduler *sch, unsigned fg_idx) +{ + SchFilterGraph *fg = &sch->filters[fg_idx]; + int ret = 0; + + for (unsigned i = 0; i <= fg->nb_inputs; i++) + tq_receive_finish(fg->queue, i); + + for (unsigned i = 0; i < fg->nb_outputs; i++) { + SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx]; + int err = send_to_enc(sch, enc, NULL); + if (err < 0 && err != AVERROR_EOF) + ret = err_merge(ret, err); + } + + return ret; +} + +int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame) +{ + SchFilterGraph *fg; + + av_assert0(fg_idx < sch->nb_filters); + fg = &sch->filters[fg_idx]; + + return send_to_filter(sch, fg, fg->nb_inputs, frame); +} + +static void *task_wrapper(void *arg) +{ + SchTask *task = arg; + Scheduler *sch = task->parent; + int ret; + int err = 0; + + ret = (intptr_t)task->func(task->func_arg); + if (ret < 0) + av_log(task->func_arg, AV_LOG_ERROR, + "Task finished with error code: %d (%s)\n", ret, av_err2str(ret)); + + switch (task->node.type) { + case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break; + case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break; + default: av_assert0(0); + } + + ret = err_merge(ret, err); + + // EOF is considered normal termination + if (ret == AVERROR_EOF) + ret = 0; + if (ret < 0) + atomic_store(&sch->task_failed, 1); + + av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE, + "Terminating thread with return code %d (%s)\n", ret, + ret < 0 ? av_err2str(ret) : "success"); + + return (void*)(intptr_t)ret; +} diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h new file mode 100644 index 0000000000..bba1f07b7b --- /dev/null +++ b/fftools/ffmpeg_sched.h @@ -0,0 +1,461 @@ +/* + * Inter-thread scheduling/synchronization. + * Copyright (c) 2023 Anton Khirnov + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef FFTOOLS_FFMPEG_SCHED_H +#define FFTOOLS_FFMPEG_SCHED_H + +#include <stddef.h> +#include <stdint.h> + +/* + * This file contains the API for the transcode scheduler. + * + * The overall architecture of the transcoding process involves instances of the + * following components: + * - demuxers, each containing any number of demuxed streams; demuxed packets + * belonging to some stream are sent to any number of decoders (transcoding) + * and/or muxers (streamcopy); + * - decoders, which receive encoded packets from some demuxed stream, decode + * them, and send decoded frames to any number of filtergraph inputs + * (audio/video) or encoders (subtitles); + * - filtergraphs, each containing zero or more inputs (0 in case the + * filtergraph contains a lavfi source filter), and one or more outputs; the + * inputs and outputs need not have matching media types; + * each filtergraph input receives decoded frames from some decoder; + * filtered frames from each output are sent to some encoder; + * - encoders, which receive decoded frames from some decoder (subtitles) or + * some filtergraph output (audio/video), encode them, and send encoded + * packets to some muxed stream; + * - muxers, each containing any number of muxed streams; each muxed stream + * receives encoded packets from some demuxed stream (streamcopy) or some + * encoder (transcoding); those packets are interleaved and written out by the + * muxer. + * + * There must be at least one muxer instance, otherwise the transcode produces + * no output and is meaningless. Other than that, in a generic transcoding scenario + * there may be an arbitrary number of instances of any of the above components, + * interconnected in various ways. + * + * The code tries to keep all the output streams across all the muxers in sync + * (i.e. at the same DTS), which is accomplished by varying the rates at which + * packets are read from different demuxers and lavfi sources. Note that the + * degree of control we have over synchronization is fundamentally limited - if + * some demuxed streams in the same input are interleaved at different rates + * than that at which they are to be muxed (e.g. because an input file is badly + * interleaved, or the user changed their speed by mismatching amounts), then + * there will be increasing amounts of buffering followed by eventual + * transcoding failure. + * + * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g. + * - encoding and muxing output from filtergraph(s) that have no inputs; + * - creating a file that contains nothing but attachments and/or metadata. + * + * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but + * this is unnecessary because the (a)split filter provides the same + * functionality. + * + * The scheduler, in the above model, is the master object that oversees and + * facilitates the transcoding process. The basic idea is that all instances + * of the abovementioned components communicate only with the scheduler and not + * with each other. The scheduler is then the single place containing the + * knowledge about the whole transcoding pipeline.
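+ *
+ * As a rough illustration only (a sketch, not code taken verbatim from this
+ * patch), a simple one-stream transcode would be wired up via sch_connect()
+ * and the SCH_* node macros declared below roughly as follows; all component
+ * indices here are hypothetical:
+ *
+ *   sch_connect(sch, SCH_DSTREAM(0, 0),    SCH_DEC(0));          // demuxed packets -> decoder
+ *   sch_connect(sch, SCH_DEC(0),           SCH_FILTER_IN(0, 0)); // decoded frames  -> filtergraph input
+ *   sch_connect(sch, SCH_FILTER_OUT(0, 0), SCH_ENC(0));          // filtered frames -> encoder
+ *   sch_connect(sch, SCH_ENC(0),           SCH_MSTREAM(0, 0));   // encoded packets -> muxed stream
+ *
+ * A streamcopied stream skips the decoder/filter/encoder chain and would be
+ * connected directly:
+ *
+ *   sch_connect(sch, SCH_DSTREAM(0, 1), SCH_MSTREAM(0, 1));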
+ */ + +struct AVFrame; +struct AVPacket; + +typedef struct Scheduler Scheduler; + +enum SchedulerNodeType { + SCH_NODE_TYPE_NONE = 0, + SCH_NODE_TYPE_DEMUX, + SCH_NODE_TYPE_MUX, + SCH_NODE_TYPE_DEC, + SCH_NODE_TYPE_ENC, + SCH_NODE_TYPE_FILTER_IN, + SCH_NODE_TYPE_FILTER_OUT, +}; + +typedef struct SchedulerNode { + enum SchedulerNodeType type; + unsigned idx; + unsigned idx_stream; +} SchedulerNode; + +typedef void* (*SchThreadFunc)(void *arg); + +#define SCH_DSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, \ + .idx = file, .idx_stream = stream } +#define SCH_MSTREAM(file, stream) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_MUX, \ + .idx = file, .idx_stream = stream } +#define SCH_DEC(decoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \ + .idx = decoder } +#define SCH_ENC(encoder) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_ENC, \ + .idx = encoder } +#define SCH_FILTER_IN(filter, input) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, \ + .idx = filter, .idx_stream = input } +#define SCH_FILTER_OUT(filter, output) \ + (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, \ + .idx = filter, .idx_stream = output } + +Scheduler *sch_alloc(void); +void sch_free(Scheduler **sch); + +int sch_start(Scheduler *sch); +int sch_stop(Scheduler *sch); + +/** + * Wait until transcoding terminates or the specified timeout elapses. + * + * @param timeout_us Amount of time in microseconds after which this function + * will timeout. + * @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for + * informational purposes only. + * + * @retval 0 waiting timed out, transcoding is not finished + * @retval 1 transcoding is finished + */ +int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts); + +/** + * Add a demuxer to the scheduler. + * + * @param func Function executed as the demuxer task. + * @param ctx Demuxer state; will be passed to func and used for logging. + * + * @retval ">=0" Index of the newly-created demuxer. + * @retval "<0" Error code. + */ +int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx); +/** + * Add a demuxed stream for a previously added demuxer. + * + * @param demux_idx index previously returned by sch_add_demux() + * + * @retval ">=0" Index of the newly-created demuxed stream. + * @retval "<0" Error code. + */ +int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx); + +/** + * Add a decoder to the scheduler. + * + * @param func Function executed as the decoder task. + * @param ctx Decoder state; will be passed to func and used for logging. + * @param send_end_ts The decoder will return an end timestamp after flush packets + * are delivered to it. See documentation for + * sch_dec_receive() for more details. + * + * @retval ">=0" Index of the newly-created decoder. + * @retval "<0" Error code. + */ +int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, + int send_end_ts); + +/** + * Add a filtergraph to the scheduler. + * + * @param nb_inputs Number of filtergraph inputs. + * @param nb_outputs number of filtergraph outputs + * @param func Function executed as the filtering task. + * @param ctx Filter state; will be passed to func and used for logging. + * + * @retval ">=0" Index of the newly-created filtergraph. + * @retval "<0" Error code. + */ +int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, + SchThreadFunc func, void *ctx); + +/** + * Add a muxer to the scheduler. 
+ * + * Note that muxer thread startup is more complicated than for other components, + * because + * - muxer streams fed by audio/video encoders become initialized dynamically at + * runtime, after those encoders receive their first frame and initialize + * themselves, followed by calling sch_mux_stream_ready() + * - the header can be written only after all the streams for a muxer are initialized + * - we may need to write an SDP, which must happen + * - AFTER all the headers are written + * - BEFORE any packets are written by any muxer + * - with all the muxers quiescent + * To avoid complicated muxer-thread synchronization dances, we postpone + * starting the muxer threads until after the SDP is written. The sequence of + * events is then as follows: + * - After sch_mux_stream_ready() is called for all the streams in a given muxer, + * the header for that muxer is written (care is taken that headers for + * different muxers are not written concurrently, since they write file + * information to stderr). If SDP is not wanted, the muxer thread then starts + * and muxing begins. + * - When SDP _is_ wanted, no muxer threads start until the header for the last + * muxer is written. After that, the SDP is written, after which all the muxer + * threads are started at once. + * + * In order for the above to work, the scheduler needs to be able to invoke + * just writing the header, which is the reason the init parameter exists. + * + * @param func Function executed as the muxing task. + * @param init Callback that is called to initialize the muxer and write the + * header. Called after sch_mux_stream_ready() is called for all the + * streams in the muxer. + * @param ctx Muxer state; will be passed to func/init and used for logging. + * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). + * + * @retval ">=0" Index of the newly-created muxer. + * @retval "<0" Error code. + */ +int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), + void *ctx, int sdp_auto); +/** + * Add a muxed stream for a previously added muxer. + * + * @param mux_idx index previously returned by sch_add_mux() + * + * @retval ">=0" Index of the newly-created muxed stream. + * @retval "<0" Error code. + */ +int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx); + +/** + * Configure limits on packet buffering performed before the muxer task is + * started. + * + * @param mux_idx index previously returned by sch_add_mux() + * @param stream_idx index previously returned by sch_add_mux_stream() + * @param data_threshold Total size of the buffered packets' data after which + * max_packets applies. + * @param max_packets Maximum number of buffered packets after + * data_threshold is reached. + */ +void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, + size_t data_threshold, int max_packets); + +/** + * Signal to the scheduler that the specified muxed stream is initialized and + * ready. Muxing is started once all the streams are ready. + */ +int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx); + +/** + * Set the file path for the SDP. + * + * The SDP is written when either of the following is true: + * - this function is called at least once + * - sdp_auto=1 is passed to EVERY call of sch_add_mux() + */ +int sch_sdp_filename(Scheduler *sch, const char *sdp_filename); + +/** + * Add an encoder to the scheduler. + * + * @param func Function executed as the encoding task. 
+ * @param ctx Encoder state; will be passed to func and used for logging. + * @param open_cb This callback, if specified, will be called when the first + * frame is obtained for this encoder. For audio encoders with a + * fixed frame size (which use a sync queue in the scheduler to + * rechunk frames), it must return that frame size on success. + * Otherwise (non-audio, variable frame size) it should return 0. + * + * @retval ">=0" Index of the newly-created encoder. + * @retval "<0" Error code. + */ +int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, + int (*open_cb)(void *func_arg, const struct AVFrame *frame)); + +/** + * Add a pre-encoding sync queue to the scheduler. + * + * @param buf_size_us Sync queue buffering size, passed to sq_alloc(). + * @param logctx Logging context for the sync queue; passed to sq_alloc(). + * + * @retval ">=0" Index of the newly-created sync queue. + * @retval "<0" Error code. + */ +int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx); +int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, + int limiting, uint64_t max_frames); + +int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst); + +enum DemuxSendFlags { + /** + * Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations, + * send normally to other types. + */ + DEMUX_SEND_STREAMCOPY_EOF = (1 << 0), +}; + +/** + * Called by demuxer tasks to communicate with their downstreams. The following + * may be sent: + * - a demuxed packet for the stream identified by pkt->stream_index; + * - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an + * empty packet with stream_index=-1. + * + * @param demux_idx demuxer index + * @param pkt A demuxed packet to send. + * When flushing (i.e. pkt->stream_index=-1 on entry to this + * function), on successful return pkt->pts/pkt->time_base will be + * set to the maximum end timestamp of any decoded audio stream, or + * AV_NOPTS_VALUE if no decoded audio streams are present. + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers for the stream are done + * @retval AVERROR_EXIT all consumers are done, should terminate demuxing + * @retval "another negative error code" other failure + */ +int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt, + unsigned flags); + +/** + * Called by decoder tasks to receive a packet for decoding. + * + * @param dec_idx decoder index + * @param pkt Input packet will be written here on success. + * + * An empty packet signals that the decoder should be flushed, but + * more packets will follow (e.g. after seeking). When a decoder + * created with send_end_ts=1 receives a flush packet, it must write + * the end timestamp of the stream after flushing to + * pkt->pts/time_base on the next call to this function (if any). + * + * @retval "non-negative value" success + * @retval AVERROR_EOF no more packets will arrive, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt); + +/** + * Called by decoder tasks to send a decoded frame downstream. + * + * @param dec_idx Decoder index previously returned by sch_add_dec(). 
+ * @param frame Decoded frame; on success it is consumed and cleared by this + * function. + * + * @retval ">=0" success + * @retval AVERROR_EOF all consumers are done, should terminate decoding + * @retval "another negative error code" other failure + */ +int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame); + +/** + * Called by filtergraph tasks to obtain frames for filtering. Will wait for a + * frame to become available and return it in frame. + * + * Filtergraphs that contain lavfi sources and do not currently require new + * input frames should call this function as a means of rate control - then + * in_idx should be set equal to nb_inputs on entry to this function. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param[in,out] in_idx On input contains the index of the input on which a frame + * is most desired. May be set to nb_inputs to signal that + * the filtergraph does not need more input currently. + * + * On success, will be replaced with the input index of + * the actually returned frame or EOF timestamp. + * + * @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx + * contains the index of the input it belongs to. + * @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should + * resume filtering. May only be returned when + * in_idx=nb_inputs on entry to this function. + * @retval AVERROR_EOF No more frames will arrive, should terminate filtering. + */ +int sch_filter_receive(Scheduler *sch, unsigned fg_idx, + unsigned *in_idx, struct AVFrame *frame); + +/** + * Called by filtergraph tasks to send a filtered frame or EOF to consumers. + * + * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph(). + * @param out_idx Index of the output which produced the frame. + * @param frame The frame to send to consumers. When NULL, signals that no more + * frames will be produced for the specified output. When non-NULL, + * the frame is consumed and cleared by this function on success. + * + * @retval "non-negative value" success + * @retval AVERROR_EOF all consumers are done + * @retval "another negative error code" other failure + */ +int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, + struct AVFrame *frame); + +int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame); + +/** + * Called by encoder tasks to obtain frames for encoding. Will wait for a frame + * to become available and return it in frame. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param frame Newly-received frame will be stored here on success. Must be + * clean on entrance to this function. + * + * @retval 0 A frame was successfully delivered into frame. + * @retval AVERROR_EOF No more frames will be delivered, the encoder should + * flush everything and terminate. + * + */ +int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame); + +/** + * Called by encoder tasks to send encoded packets downstream. + * + * @param enc_idx Encoder index previously returned by sch_add_enc(). + * @param pkt An encoded packet; it will be consumed and cleared by this + * function on success. + * + * @retval 0 success + * @retval "<0" Error code. + */ +int sch_enc_send (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt); + +/** + * Called by muxer tasks to obtain packets for muxing. Will wait for a packet + * for any muxed stream to become available and return it in pkt. 
+ * + * @param mux_idx Muxer index previously returned by sch_add_mux(). + * @param pkt Newly-received packet will be stored here on success. Must be + * clean on entrance to this function. + * + * @retval 0 A packet was successfully delivered into pkt. Its stream_index + * corresponds to a stream index previously returned from + * sch_add_mux_stream(). + * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that + * no more packets will be delivered for this stream index. + * Otherwise this indicates that no more packets will be + * delivered for any stream and the muxer should therefore + * flush everything and terminate. + */ +int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt); + +/** + * Called by muxer tasks to signal that a stream will no longer accept input. + * + * @param stream_idx Stream index previously returned from sch_add_mux_stream(). + */ +void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx); + +#endif /* FFTOOLS_FFMPEG_SCHED_H */
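
For reviewers new to this API, here is a rough usage sketch of the filtergraph side of the contract documented above (sch_filter_receive()/sch_filter_send()). It is illustrative only and not part of the patch; filter_task(), its parameters and the "push into the lavfi graph" step are hypothetical stand-ins for what the filtering thread in fftools/ffmpeg_filter.c actually does.

#include "libavutil/error.h"
#include "libavutil/frame.h"

#include "ffmpeg_sched.h"

/* Hypothetical filtergraph task loop, sketching how a filtering thread is
 * expected to talk to the scheduler. Error handling is kept minimal. */
static int filter_task(Scheduler *sch, unsigned fg_idx, unsigned nb_outputs)
{
    AVFrame *frame = av_frame_alloc();
    int ret = 0;

    if (!frame)
        return AVERROR(ENOMEM);

    while (1) {
        /* Index of the input we want a frame for; a graph with only lavfi
         * sources would pass nb_inputs here, using the call purely for
         * rate control. */
        unsigned in_idx = 0;

        ret = sch_filter_receive(sch, fg_idx, &in_idx, frame);
        if (ret == AVERROR_EOF) {
            ret = 0;                /* no more input, terminate normally */
            break;
        } else if (ret == AVERROR(EAGAIN)) {
            continue;               /* resume filtering without new input */
        } else if (ret < 0) {
            break;
        }

        /* Here the real code would feed the frame (or per-input EOF) into
         * input in_idx of the lavfi graph and, for every filtered frame
         * obtained on some output out_idx, call
         *     sch_filter_send(sch, fg_idx, out_idx, filtered);
         * which consumes and clears the frame on success. */
        av_frame_unref(frame);
    }

    /* Signal EOF on every output so downstream encoders can terminate. */
    for (unsigned i = 0; i < nb_outputs; i++)
        sch_filter_send(sch, fg_idx, i, NULL);

    av_frame_free(&frame);
    return ret;
}

The task itself would be registered via sch_add_filtergraph() and run through the SchThreadFunc mechanism; the sketch only shows the receive/send loop, not graph construction or command handling.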