From patchwork Wed Mar 6 11:03:16 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 46846 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:d90e:b0:19e:cdac:8cce with SMTP id jd14csp313946pzb; Wed, 6 Mar 2024 03:07:31 -0800 (PST) X-Forwarded-Encrypted: i=2; AJvYcCUo/2fAHCgtecneh1WlL/xbbJngYd9OejLUsqZ4ZRjGOTPAAVZ1s1orhJ/u3YGQojwAFWux+zB2UqIEK4/YmmMNqRUC86zKRFF7bg== X-Google-Smtp-Source: AGHT+IHD+bbhHDy5Uho8oSjPcE2KBMQfHARkwT0x0wkaTpS+mdWH4Sc8li3rr21MAD+Rh+Hasd5Z X-Received: by 2002:a05:6402:511:b0:566:abbd:c390 with SMTP id m17-20020a056402051100b00566abbdc390mr5470350edv.6.1709723251489; Wed, 06 Mar 2024 03:07:31 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1709723251; cv=none; d=google.com; s=arc-20160816; b=uKdXyJV7bLAEy5Vynjqai5ERIab7emUirexUFp3HE5bpQ9zswHoN2TfTxrpXiuZLl7 k7iLuPV0b9I+CSeUqfh27mlBmRUDkvGNPYFgi2TV7/frrab7MuU25YCfUot01yF9AFl0 jkLHiYW8mq5IDQV1XnGpHWTgGGfJNQDjRJ1re/yti7otJp/XTx2FGHmnVKPx/k44RCPk 6vv6AhSRJUTphimJ5MtmFCoSn/94UQkhsoy0qik6ljLWTl26C4bC97GdzXms21F+6uHg XYBVmjEspjE89qRPgKFPVJMb2X6rfYkCcAPu7HRzQPcjj8gfhlePelJFF3jn/LVDgVk5 kaww== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:dkim-signature:delivered-to; bh=GYdxFZxCH65m/+GqYRz2El1l7c5TtL8RLPnPFVAmU8o=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=uB1m4HuzvZKgjmijZaxrCbc0hUm4vtngJOTZGEqlDrsi4BLJA1bXjRtFsL8sGonVhS JNa21ueg0QXF2yupN8byxqevTDHN5qjaouNPuxWMkkn4V4H/AHx0bqMjukBvBgnSKVZn 1fLj8OSqWAd+DpKdIfE1WL506N3WXs12nxBGmZH0WRW+FUbhOS3eSAQdpYbaJ/PpzFgt e5v/lBY/KuEvAZyiDhnEm3TyIgjZG3T630xZbGfCHPtH9Q5yqSpqSab3DV6yEutAkxdw WfmCQ/XD/BOpbr4nbKRt8TJFQ0CAAUr6Jf2S4e+kD3gaydqn+sYk637SGiC40Dxdl4l5 1t+w==; dara=google.com ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@khirnov.net header.s=mail header.b=GsAafDAa; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id y2-20020a056402440200b00565d035f48esi6118360eda.158.2024.03.06.03.07.31; Wed, 06 Mar 2024 03:07:31 -0800 (PST) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@khirnov.net header.s=mail header.b=GsAafDAa; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 7BD9C68CEC6; Wed, 6 Mar 2024 13:05:43 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id D625268C97D for ; Wed, 6 Mar 2024 13:05:30 +0200 (EET) Authentication-Results: mail1.khirnov.net; dkim=pass (2048-bit key; unprotected) header.d=khirnov.net header.i=@khirnov.net header.a=rsa-sha256 header.s=mail header.b=GsAafDAa; dkim-atps=neutral Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 3C7A84D5E for ; Wed, 6 Mar 2024 12:05:28 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id WLj93cWDY4JK for ; Wed, 6 Mar 2024 12:05:27 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=khirnov.net; s=mail; t=1709723123; bh=4awKoN74/by19YpqsLKevaNe3F+jj2LiOrlfYYGZjCM=; h=From:To:Subject:Date:In-Reply-To:References:From; b=GsAafDAa03U+JHrMFcriLAvpMB7Xp2i96rVFAHv4xJ8dt7pZn0HoM4uFqfDFUXuKE KRWuwqNYf0voPEcaioQ84oEY2nFp0VOhFk8G6Nt7rrXMAKnvt8nO48i5zquIlFT7ri AQMWZMQs9gGfSBZJC+neTkM4i+f9edwtvm5hp0ypZ4eGzfIiivD59yqCIUiD1zPv7u DFI509wnVThD2bl3hs3AUymoY5Zha/vrQhUcyPUyNI3SVGe2wTaljr41jL4TqAq8JZ jlmj+DniAoW2dpyvqMtb4ajyo+/80fsxCyyvLyf7t2Q9/ZJKSX8JEz6M9GrZUA6Ry8 QvTsHF29D1S6Q== Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 3DCFD4D61 for ; Wed, 6 Mar 2024 12:05:23 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 304873A0357 for ; Wed, 6 Mar 2024 12:05:23 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Wed, 6 Mar 2024 12:03:16 +0100 Message-ID: <20240306110319.17339-15-anton@khirnov.net> X-Mailer: git-send-email 2.43.0 In-Reply-To: <20240306110319.17339-1-anton@khirnov.net> References: <20240306110319.17339-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 15/18] fftools/ffmpeg_sched: allow connecting encoder output to decoders X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: S+iFYlBjFykM --- fftools/ffmpeg_sched.c | 212 ++++++++++++++++++++++++++++++++++------- fftools/ffmpeg_sched.h | 8 +- 2 files changed, 181 insertions(+), 39 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 5f8ef04680..d1fb942c34 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1051,24 +1051,43 @@ int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst) } case SCH_NODE_TYPE_ENC: { SchEnc *enc; - SchMuxStream *ms; av_assert0(src.idx < sch->nb_enc); - // encoding packets go to muxing - av_assert0(dst.type == SCH_NODE_TYPE_MUX && - dst.idx < sch->nb_mux && - dst.idx_stream < sch->mux[dst.idx].nb_streams); enc = &sch->enc[src.idx]; - ms = &sch->mux[dst.idx].streams[dst.idx_stream]; - - av_assert0(!ms->src.type); ret = GROW_ARRAY(enc->dst, enc->nb_dst); if (ret < 0) return ret; enc->dst[enc->nb_dst - 1] = dst; - ms->src = src; + + // encoding packets go to muxing or decoding + switch (dst.type) { + case SCH_NODE_TYPE_MUX: { + SchMuxStream *ms; + + av_assert0(dst.idx < sch->nb_mux && + dst.idx_stream < sch->mux[dst.idx].nb_streams); + ms = &sch->mux[dst.idx].streams[dst.idx_stream]; + + av_assert0(!ms->src.type); + ms->src = src; + + break; + } + case SCH_NODE_TYPE_DEC: { + SchDec *dec; + + av_assert0(dst.idx < sch->nb_dec); + dec = &sch->dec[dst.idx]; + + av_assert0(!dec->src.type); + dec->src = src; + + break; + } + default: av_assert0(0); + } break; } @@ -1217,6 +1236,31 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) +{ + while (1) { + SchFilterGraph *fg; + + // fed directly by a demuxer (i.e. not through a filtergraph) + if (src.type == SCH_NODE_TYPE_DEMUX) { + sch->demux[src.idx].waiter.choked_next = 0; + return; + } + + av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT); + fg = &sch->filters[src.idx]; + + // the filtergraph contains internal sources and + // requested to be scheduled directly + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + return; + } + + src = fg->inputs[fg->best_input].src_sched; + } +} + static void schedule_update_locked(Scheduler *sch) { int64_t dts; @@ -1245,7 +1289,6 @@ static void schedule_update_locked(Scheduler *sch) for (unsigned j = 0; j < mux->nb_streams; j++) { SchMuxStream *ms = &mux->streams[j]; - SchDemux *d; // unblock sources for output streams that are not finished // and not too far ahead of the trailing stream @@ -1256,28 +1299,9 @@ static void schedule_update_locked(Scheduler *sch) if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) continue; - // for outputs fed from filtergraphs, consider that filtergraph's - // best_input information, in other cases there is a well-defined - // source demuxer - if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) { - SchFilterGraph *fg = &sch->filters[ms->src_sched.idx]; - SchFilterIn *fi; - - // the filtergraph contains internal sources and - // requested to be scheduled directly - if (fg->best_input == fg->nb_inputs) { - fg->waiter.choked_next = 0; - have_unchoked = 1; - continue; - } - - fi = &fg->inputs[fg->best_input]; - d = &sch->demux[fi->src_sched.idx]; - } else - d = &sch->demux[ms->src_sched.idx]; - - d->waiter.choked_next = 0; - have_unchoked = 1; + // resolve the source to unchoke + unchoke_for_stream(sch, ms->src_sched); + have_unchoked = 1; } } @@ -1303,6 +1327,105 @@ static void schedule_update_locked(Scheduler *sch) } +enum { + CYCLE_NODE_NEW = 0, + CYCLE_NODE_STARTED, + CYCLE_NODE_DONE, +}; + +static int +check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, + uint8_t *filters_visited, SchedulerNode *filters_stack) +{ + unsigned nb_filters_stack = 0; + + memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited)); + + while (1) { + const SchFilterGraph *fg = &sch->filters[src.idx]; + + filters_visited[src.idx] = CYCLE_NODE_STARTED; + + // descend into every input, depth first + if (src.idx_stream < fg->nb_inputs) { + const SchFilterIn *fi = &fg->inputs[src.idx_stream++]; + + // connected to demuxer, no cycles possible + if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX) + continue; + + // otherwise connected to another filtergraph + av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + + // found a cycle + if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED) + return AVERROR(EINVAL); + + // place current position on stack and descend + av_assert0(nb_filters_stack < sch->nb_filters); + filters_stack[nb_filters_stack++] = src; + src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 }; + continue; + } + + filters_visited[src.idx] = CYCLE_NODE_DONE; + + // previous search finished, + if (nb_filters_stack) { + src = filters_stack[--nb_filters_stack]; + continue; + } + return 0; + } +} + +static int check_acyclic(Scheduler *sch) +{ + uint8_t *filters_visited = NULL; + SchedulerNode *filters_stack = NULL; + + int ret = 0; + + if (!sch->nb_filters) + return 0; + + filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited)); + if (!filters_visited) + return AVERROR(ENOMEM); + + filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack)); + if (!filters_stack) { + ret = AVERROR(ENOMEM); + goto fail; + } + + // trace the transcoding graph upstream from every output stream + // fed by a filtergraph + for (unsigned i = 0; i < sch->nb_mux; i++) { + SchMux *mux = &sch->mux[i]; + + for (unsigned j = 0; j < mux->nb_streams; j++) { + SchMuxStream *ms = &mux->streams[j]; + SchedulerNode src = ms->src_sched; + + if (src.type != SCH_NODE_TYPE_FILTER_OUT) + continue; + src.idx_stream = 0; + + ret = check_acyclic_for_output(sch, src, filters_visited, filters_stack); + if (ret < 0) { + av_log(mux, AV_LOG_ERROR, "Transcoding graph has a cycle\n"); + goto fail; + } + } + } + +fail: + av_freep(&filters_visited); + av_freep(&filters_stack); + return ret; +} + static int start_prepare(Scheduler *sch) { int ret; @@ -1402,14 +1525,21 @@ static int start_prepare(Scheduler *sch) for (unsigned j = 0; j < fg->nb_inputs; j++) { SchFilterIn *fi = &fg->inputs[j]; + SchDec *dec; if (!fi->src.type) { av_log(fg, AV_LOG_ERROR, "Filtergraph input %u not connected to a source\n", j); return AVERROR(EINVAL); } + av_assert0(fi->src.type == SCH_NODE_TYPE_DEC); + dec = &sch->dec[fi->src.idx]; - fi->src_sched = sch->dec[fi->src.idx].src; + switch (dec->src.type) { + case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break; + case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break; + default: av_assert0(0); + } } for (unsigned j = 0; j < fg->nb_outputs; j++) { @@ -1423,6 +1553,11 @@ static int start_prepare(Scheduler *sch) } } + // Check that the transcoding graph has no cycles. + ret = check_acyclic(sch); + if (ret < 0) + return ret; + return 0; } @@ -1575,6 +1710,8 @@ static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame) SchMux *mux; SchMuxStream *ms; + if (enc->dst[i].type != SCH_NODE_TYPE_MUX) + continue; mux = &sch->mux[enc->dst[i].idx]; ms = &mux->streams[enc->dst[i].idx_stream]; @@ -2150,14 +2287,19 @@ static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, if (!pkt) goto finish; - ret = send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt); + ret = (dst.type == SCH_NODE_TYPE_MUX) ? + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : + tq_send(sch->dec[dst.idx].queue, 0, pkt); if (ret == AVERROR_EOF) goto finish; return ret; finish: - send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + if (dst.type == SCH_NODE_TYPE_MUX) + send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); + else + tq_send_finish(sch->dec[dst.idx].queue, 0); *dst_finished = 1; diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h index fc6711f9c3..a9190bd3d1 100644 --- a/fftools/ffmpeg_sched.h +++ b/fftools/ffmpeg_sched.h @@ -35,9 +35,9 @@ * - demuxers, each containing any number of demuxed streams; demuxed packets * belonging to some stream are sent to any number of decoders (transcoding) * and/or muxers (streamcopy); - * - decoders, which receive encoded packets from some demuxed stream, decode - * them, and send decoded frames to any number of filtergraph inputs - * (audio/video) or encoders (subtitles); + * - decoders, which receive encoded packets from some demuxed stream or + * encoder, decode them, and send decoded frames to any number of filtergraph + * inputs (audio/video) or encoders (subtitles); * - filtergraphs, each containing zero or more inputs (0 in case the * filtergraph contains a lavfi source filter), and one or more outputs; the * inputs and outputs need not have matching media types; @@ -45,7 +45,7 @@ * filtered frames from each output are sent to some encoder; * - encoders, which receive decoded frames from some decoder (subtitles) or * some filtergraph output (audio/video), encode them, and send encoded - * packets to some muxed stream; + * packets to any number of muxed streams or decoders; * - muxers, each containing any number of muxed streams; each muxed stream * receives encoded packets from some demuxed stream (streamcopy) or some * encoder (transcoding); those packets are interleaved and written out by the