From patchwork Wed Dec 6 10:27:08 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 44959 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:9153:b0:181:818d:5e7f with SMTP id x19csp161085pzc; Wed, 6 Dec 2023 02:30:56 -0800 (PST) X-Google-Smtp-Source: AGHT+IFR8lKdhv+BxvUQtaMWvqHBYN4o5zKnZf95acIgG/o6t9YJZAORagMcp6FJmDe9ycwmzpba X-Received: by 2002:a17:907:1b20:b0:a01:b8e5:d4b7 with SMTP id mp32-20020a1709071b2000b00a01b8e5d4b7mr1206040ejc.1.1701858655842; Wed, 06 Dec 2023 02:30:55 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1701858655; cv=none; d=google.com; s=arc-20160816; b=Vg6483WqB6TyPTw4XLZtSGlZzIUSyXSkg/L7MqQOzE309sOb4pIVW1te1409+N/sPb wQwc/aZPtMdr1G3wJnsekjUVFPTEJ4tB2UQamMKIy1P1vsvhLK54FYdGaQAiQSOoqeBr Fo262uuSEfE5EimbZGyFhpHxxamWKxgoGRbULop1XApc7iqYnkrwk41KKgnXX8pPS4aN H6gqWar1NEG+bvDKXvl8iq4ZV1z3D4fASjv7HdEhBR9fEw2PovePvC3W26J7o9zNUs75 GQVjCo8tI1bdQxCNdj79GcUdCZxJ2bHvAOnHUAh6TvnftLu1b9BDjgoAMjKupbLjfgrW 0HMw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=YWV9Uq/cMKLrJUOnEgP7hCZlGS4YCshrZ3+02DPSX6k=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=GtTG33BcRAUKWLdhFnXNWqKoMvUzMDdSUSQNjzi2R1lOiKhcyF9F9SfqLt6Rm+LLuH mpP0W9mDIm1C36c4n6Itf9bBNZwcViQUmazX723rENFMowceBWVYv6Wks/b836KPpF1K Zg4aQynBVhDBq25mQBt6+MmbVN1JXSrUYtE2H+pLohCWiPHuwkdkL2B0bEr/tATDTSMJ 2zolBfkWtKXAG29QwtkUXRcRIeKVXNuvh0yjS8WSLfX3hIOXExQOmuQf550GuG+X32g8 5Oel6Owm55Aq40XnS68YuIk5Y/UIgdbY5eXBO3fSMOtNr6dfiystEXkZVyxUM9rXbSf5 d0IA== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id hd34-20020a17090796a200b00a1d82a7f8basi688657ejc.519.2023.12.06.02.30.55; Wed, 06 Dec 2023 02:30:55 -0800 (PST) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id BBABF68CF57; Wed, 6 Dec 2023 12:30:21 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 3FDD268C9D3 for ; Wed, 6 Dec 2023 12:30:12 +0200 (EET) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id C09911CB4 for ; Wed, 6 Dec 2023 11:30:11 +0100 (CET) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id FfTu4wAnkBr2 for ; Wed, 6 Dec 2023 11:30:10 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 6F17C1CBF for ; Wed, 6 Dec 2023 11:30:10 +0100 (CET) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 560373A05D0 for ; Wed, 6 Dec 2023 11:30:10 +0100 (CET) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Wed, 6 Dec 2023 11:27:08 +0100 Message-ID: <20231206103002.30084-3-anton@khirnov.net> X-Mailer: git-send-email 2.42.0 In-Reply-To: <20231206103002.30084-1-anton@khirnov.net> References: <20231206103002.30084-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 02/10] fftools/ffmpeg_filter: move filtering to a separate thread X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: X3AD/VZdrSdG As previously for decoding, this is merely "scaffolding" for moving to a fully threaded architecture and does not yet make filtering truly parallel - the main thread will currently wait for the filtering thread to finish its work before continuing. That will change in future commits after encoders are also moved to threads and a thread-aware scheduler is added. --- fftools/ffmpeg.h | 9 +- fftools/ffmpeg_dec.c | 39 +- fftools/ffmpeg_filter.c | 827 ++++++++++++++++++++++++++++++++++------ 3 files changed, 732 insertions(+), 143 deletions(-) diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 1f11a2f002..f50222472c 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -80,6 +80,14 @@ enum HWAccelID { HWACCEL_GENERIC, }; +enum FrameOpaque { + FRAME_OPAQUE_REAP_FILTERS = 1, + FRAME_OPAQUE_CHOOSE_INPUT, + FRAME_OPAQUE_SUB_HEARTBEAT, + FRAME_OPAQUE_EOF, + FRAME_OPAQUE_SEND_COMMAND, +}; + typedef struct HWDevice { const char *name; enum AVHWDeviceType type; @@ -730,7 +738,6 @@ const FrameData *frame_data_c(AVFrame *frame); int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference); int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); -int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame); void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb); /** diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 517d6b3ced..b60bad1220 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -147,11 +147,12 @@ fail: static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) { - int i, ret; + int i, ret = 0; - av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, i < ist->nb_filters - 1); + ret = ifilter_send_frame(ist->filters[i], decoded_frame, + i < ist->nb_filters - 1 || + ist->dec->type == AVMEDIA_TYPE_SUBTITLE); if (ret == AVERROR_EOF) ret = 0; /* ignore */ if (ret < 0) { @@ -380,15 +381,6 @@ static int video_frame_process(InputStream *ist, AVFrame *frame) return 0; } -static void sub2video_flush(InputStream *ist) -{ - for (int i = 0; i < ist->nb_filters; i++) { - int ret = ifilter_sub2video(ist->filters[i], NULL); - if (ret != AVERROR_EOF && ret < 0) - av_log(NULL, AV_LOG_WARNING, "Flush the frame error.\n"); - } -} - static int process_subtitle(InputStream *ist, AVFrame *frame) { Decoder *d = ist->decoder; @@ -426,14 +418,9 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - for (int i = 0; i < ist->nb_filters; i++) { - ret = ifilter_sub2video(ist->filters[i], frame); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error sending a subtitle for filtering: %s\n", - av_err2str(ret)); - return ret; - } - } + ret = send_frame_to_filters(ist, frame); + if (ret < 0) + return ret; subtitle = (AVSubtitle*)frame->buf[0]->data; if (!subtitle->num_rects) @@ -824,14 +811,10 @@ finish: return ret; // signal EOF to our downstreams - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) - sub2video_flush(ist); - else { - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } + ret = send_filter_eof(ist); + if (ret < 0) { + av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); + return ret; } return AVERROR_EOF; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 69c28a6b2b..d845448332 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -21,6 +21,8 @@ #include #include "ffmpeg.h" +#include "ffmpeg_utils.h" +#include "thread_queue.h" #include "libavfilter/avfilter.h" #include "libavfilter/buffersink.h" @@ -53,12 +55,50 @@ typedef struct FilterGraphPriv { int is_meta; int disable_conversions; + int nb_inputs_bound; + int nb_outputs_bound; + const char *graph_desc; // frame for temporarily holding output from the filtergraph AVFrame *frame; // frame for sending output to the encoder AVFrame *frame_enc; + + pthread_t thread; + /** + * Queue for sending frames from the main thread to the filtergraph. Has + * nb_inputs+1 streams - the first nb_inputs stream correspond to + * filtergraph inputs. Frames on those streams may have their opaque set to + * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the + * EOF event for the correspondint stream. Will be immediately followed by + * this stream being send-closed. + * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of + * a subtitle heartbeat event. Will only be sent for sub2video streams. + * + * The last stream is "control" - the main thread sends empty AVFrames with + * opaque set to + * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available + * from filtergraph outputs. These frames are sent to corresponding + * streams in queue_out. Finally an empty frame is sent to the control + * stream in queue_out. + * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are + * available the terminating empty frame's opaque will contain the index+1 + * of the filtergraph input to which more input frames should be supplied. + */ + ThreadQueue *queue_in; + /** + * Queue for sending frames from the filtergraph back to the main thread. + * Has nb_outputs+1 streams - the first nb_outputs stream correspond to + * filtergraph outputs. + * + * The last stream is "control" - see documentation for queue_in for more + * details. + */ + ThreadQueue *queue_out; + // submitting frames to filter thread returned EOF + // this only happens on thread exit, so is not per-input + int eof_in; } FilterGraphPriv; static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) @@ -71,6 +111,22 @@ static const FilterGraphPriv *cfgp_from_cfg(const FilterGraph *fg) return (const FilterGraphPriv*)fg; } +// data that is local to the filter thread and not visible outside of it +typedef struct FilterGraphThread { + AVFrame *frame; + + // Temporary buffer for output frames, since on filtergraph reset + // we cannot send them to encoders immediately. + // The output index is stored in frame opaque. + AVFifo *frame_queue_out; + + int got_frame; + + // EOF status of each input/output, as received by the thread + uint8_t *eof_in; + uint8_t *eof_out; +} FilterGraphThread; + typedef struct InputFilterPriv { InputFilter ifilter; @@ -204,7 +260,25 @@ static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) return (OutputFilterPriv*)ofilter; } -static int configure_filtergraph(FilterGraph *fg); +typedef struct FilterCommand { + char *target; + char *command; + char *arg; + + double time; + int all_filters; +} FilterCommand; + +static void filter_command_free(void *opaque, uint8_t *data) +{ + FilterCommand *fc = (FilterCommand*)data; + + av_freep(&fc->target); + av_freep(&fc->command); + av_freep(&fc->arg); + + av_free(data); +} static int sub2video_get_blank_frame(InputFilterPriv *ifp) { @@ -574,6 +648,59 @@ static int ifilter_has_all_input_formats(FilterGraph *fg) return 1; } +static void *filter_thread(void *arg); + +// start the filtering thread once all inputs and outputs are bound +static int fg_thread_try_start(FilterGraphPriv *fgp) +{ + FilterGraph *fg = &fgp->fg; + ObjPool *op; + int ret = 0; + + if (fgp->nb_inputs_bound < fg->nb_inputs || + fgp->nb_outputs_bound < fg->nb_outputs) + return 0; + + op = objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); + if (!fgp->queue_in) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + // at least one output is mandatory + op = objpool_alloc_frames(); + if (!op) + goto fail; + + fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); + if (!fgp->queue_out) { + objpool_free(&op); + goto fail; + } + + ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); + if (ret) { + ret = AVERROR(ret); + av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n", + fg->index, av_err2str(ret)); + goto fail; + } + + return 0; +fail: + if (ret >= 0) + ret = AVERROR(ENOMEM); + + tq_free(&fgp->queue_in); + tq_free(&fgp->queue_out); + + return ret; +} + static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in) { AVFilterContext *ctx = inout->filter_ctx; @@ -607,6 +734,7 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); int ret; av_assert0(!ifp->ist); @@ -624,7 +752,10 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) return AVERROR(ENOMEM); } - return 0; + fgp->nb_inputs_bound++; + av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); + + return fg_thread_try_start(fgp); } static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) @@ -756,24 +887,10 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost) break; } - // if we have all input parameters and all outputs are bound, - // the graph can now be configured - if (ifilter_has_all_input_formats(fg)) { - int ret; + fgp->nb_outputs_bound++; + av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); - for (int i = 0; i < fg->nb_outputs; i++) - if (!fg->outputs[i]->ost) - return 0; - - ret = configure_filtergraph(fg); - if (ret < 0) { - av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n", - av_err2str(ret)); - return ret; - } - } - - return 0; + return fg_thread_try_start(fgp); } static InputFilter *ifilter_alloc(FilterGraph *fg) @@ -803,6 +920,34 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) return ifilter; } +static int fg_thread_stop(FilterGraphPriv *fgp) +{ + void *ret; + + if (!fgp->queue_in) + return 0; + + for (int i = 0; i <= fgp->fg.nb_inputs; i++) { + InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? + ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; + + if (ifp) + ifp->eof = 1; + + tq_send_finish(fgp->queue_in, i); + } + + for (int i = 0; i <= fgp->fg.nb_outputs; i++) + tq_receive_finish(fgp->queue_out, i); + + pthread_join(fgp->thread, &ret); + + tq_free(&fgp->queue_in); + tq_free(&fgp->queue_out); + + return (int)(intptr_t)ret; +} + void fg_free(FilterGraph **pfg) { FilterGraph *fg = *pfg; @@ -812,6 +957,8 @@ void fg_free(FilterGraph **pfg) return; fgp = fgp_from_fg(fg); + fg_thread_stop(fgp); + avfilter_graph_free(&fg->graph); for (int j = 0; j < fg->nb_inputs; j++) { InputFilter *ifilter = fg->inputs[j]; @@ -1622,7 +1769,7 @@ static int graph_is_meta(AVFilterGraph *graph) return 1; } -static int configure_filtergraph(FilterGraph *fg) +static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) { FilterGraphPriv *fgp = fgp_from_fg(fg); AVBufferRef *hw_device; @@ -1746,7 +1893,7 @@ static int configure_filtergraph(FilterGraph *fg) /* send the EOFs for the finished inputs */ for (i = 0; i < fg->nb_inputs; i++) { InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); - if (ifp->eof) { + if (fgt->eof_in[i]) { ret = av_buffersrc_add_frame(ifp->filter, NULL); if (ret < 0) goto fail; @@ -1829,8 +1976,8 @@ int filtergraph_is_simple(const FilterGraph *fg) return fgp->is_simple; } -void fg_send_command(FilterGraph *fg, double time, const char *target, - const char *command, const char *arg, int all_filters) +static void send_command(FilterGraph *fg, double time, const char *target, + const char *command, const char *arg, int all_filters) { int ret; @@ -1853,6 +2000,29 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, } } +static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) +{ + int nb_requests, nb_requests_max = 0; + int best_input = -1; + + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilter *ifilter = fg->inputs[i]; + InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + InputStream *ist = ifp->ist; + + if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + continue; + + nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); + if (nb_requests > nb_requests_max) { + nb_requests_max = nb_requests; + best_input = i; + } + } + + return best_input; +} + static int choose_out_timebase(OutputFilterPriv *ofp, AVFrame *frame) { OutputFilter *ofilter = &ofp->ofilter; @@ -2088,16 +2258,16 @@ finish: fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY); } -static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) +static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); - OutputStream *ost = ofp->ofilter.ost; AVFrame *frame_prev = ofp->fps.last_frame; enum AVMediaType type = ofp->ofilter.type; - int64_t nb_frames = 1, nb_frames_prev = 0; + int64_t nb_frames = !!frame, nb_frames_prev = 0; - if (type == AVMEDIA_TYPE_VIDEO) + if (type == AVMEDIA_TYPE_VIDEO && (frame || fgt->got_frame)) video_sync_process(ofp, frame, &nb_frames, &nb_frames_prev); for (int64_t i = 0; i < nb_frames; i++) { @@ -2136,10 +2306,31 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) frame_out = frame; } - ret = enc_frame(ost, frame_out); - av_frame_unref(frame_out); - if (ret < 0) - return ret; + if (buffer) { + AVFrame *f = av_frame_alloc(); + + if (!f) { + av_frame_unref(frame_out); + return AVERROR(ENOMEM); + } + + av_frame_move_ref(f, frame_out); + f->opaque = (void*)(intptr_t)ofp->index; + + ret = av_fifo_write(fgt->frame_queue_out, &f, 1); + if (ret < 0) { + av_frame_free(&f); + return AVERROR(ENOMEM); + } + } else { + // return the frame to the main thread + ret = tq_send(fgp->queue_out, ofp->index, frame_out); + if (ret < 0) { + av_frame_unref(frame_out); + fgt->eof_out[ofp->index] = 1; + return ret == AVERROR_EOF ? 0 : ret; + } + } if (type == AVMEDIA_TYPE_VIDEO) { ofp->fps.frame_number++; @@ -2149,7 +2340,7 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) frame->flags &= ~AV_FRAME_FLAG_KEY; } - ofp->got_frame = 1; + fgt->got_frame = 1; } if (frame && frame_prev) { @@ -2157,23 +2348,27 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame) av_frame_move_ref(frame_prev, frame); } + if (!frame) { + tq_send_finish(fgp->queue_out, ofp->index); + fgt->eof_out[ofp->index] = 1; + } + return 0; } -static int fg_output_step(OutputFilterPriv *ofp, int flush) +static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); OutputStream *ost = ofp->ofilter.ost; - AVFrame *frame = fgp->frame; AVFilterContext *filter = ofp->filter; FrameData *fd; int ret; ret = av_buffersink_get_frame_flags(filter, frame, AV_BUFFERSINK_FLAG_NO_REQUEST); - if (flush && ret == AVERROR_EOF && ofp->got_frame && - ost->type == AVMEDIA_TYPE_VIDEO) { - ret = fg_output_frame(ofp, NULL); + if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { + ret = fg_output_frame(ofp, fgt, NULL, buffer); return (ret < 0) ? ret : 1; } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { return 1; @@ -2183,22 +2378,18 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) av_err2str(ret)); return ret; } - if (ost->finished) { + + if (fgt->eof_out[ofp->index]) { av_frame_unref(frame); return 0; } frame->time_base = av_buffersink_get_time_base(filter); - if (frame->pts != AV_NOPTS_VALUE) { - ost->filter->last_pts = av_rescale_q(frame->pts, frame->time_base, - AV_TIME_BASE_Q); - - if (debug_ts) - av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n", - av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base), - frame->time_base.num, frame->time_base.den); - } + if (debug_ts) + av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n", + av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base), + frame->time_base.num, frame->time_base.den); // Choose the output timebase the first time we get a frame. if (!ofp->tb_out_locked) { @@ -2231,7 +2422,7 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) fd->frame_rate_filter = ofp->fps.framerate; } - ret = fg_output_frame(ofp, frame); + ret = fg_output_frame(ofp, fgt, frame, buffer); av_frame_unref(frame); if (ret < 0) return ret; @@ -2239,18 +2430,38 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush) return 0; } -int reap_filters(FilterGraph *fg, int flush) +/* retrieve all frames available at filtergraph outputs and either send them to + * the main thread (buffer=0) or buffer them for later (buffer=1) */ +static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, + AVFrame *frame, int buffer) { + FilterGraphPriv *fgp = fgp_from_fg(fg); + int ret = 0; + if (!fg->graph) return 0; + // process buffered frames + if (!buffer) { + AVFrame *f; + + while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { + int out_idx = (intptr_t)f->opaque; + f->opaque = NULL; + ret = tq_send(fgp->queue_out, out_idx, f); + av_frame_free(&f); + if (ret < 0 && ret != AVERROR_EOF) + return ret; + } + } + /* Reap all buffers present in the buffer sinks */ for (int i = 0; i < fg->nb_outputs; i++) { OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); int ret = 0; while (!ret) { - ret = fg_output_step(ofp, flush); + ret = fg_output_step(ofp, fgt, frame, buffer); if (ret < 0) return ret; } @@ -2259,7 +2470,7 @@ int reap_filters(FilterGraph *fg, int flush) return 0; } -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) +static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int64_t pts2; @@ -2284,11 +2495,17 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t sub2video_push_ref(ifp, pts2); } -int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame) +static int sub2video_frame(InputFilter *ifilter, const AVFrame *frame) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; + // heartbeat frame + if (frame && !frame->buf[0]) { + sub2video_heartbeat(ifilter, frame->pts, frame->time_base); + return 0; + } + if (ifilter->graph->graph) { if (!frame) { if (ifp->sub2video.end_pts < INT64_MAX) @@ -2317,12 +2534,13 @@ int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame) return 0; } -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) +static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter, + int64_t pts, AVRational tb) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; - ifp->eof = 1; + fgt->eof_in[ifp->index] = 1; if (ifp->filter) { pts = av_rescale_q_rnd(pts, tb, ifp->time_base, @@ -2346,7 +2564,7 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) return ret; if (ifilter_has_all_input_formats(ifilter->graph)) { - ret = configure_filtergraph(ifilter->graph); + ret = configure_filtergraph(ifilter->graph, fgt); if (ret < 0) { av_log(NULL, AV_LOG_ERROR, "Error initializing filters!\n"); return ret; @@ -2365,10 +2583,10 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) return 0; } -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) +static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, + InputFilter *ifilter, AVFrame *frame) { InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - FilterGraph *fg = ifilter->graph; AVFrameSideData *sd; int need_reinit, ret; @@ -2408,10 +2626,13 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) /* (re)init the graph if possible, otherwise buffer the frame and return */ if (need_reinit || !fg->graph) { + AVFrame *tmp = av_frame_alloc(); + + if (!tmp) + return AVERROR(ENOMEM); + if (!ifilter_has_all_input_formats(fg)) { - AVFrame *tmp = av_frame_clone(frame); - if (!tmp) - return AVERROR(ENOMEM); + av_frame_move_ref(tmp, frame); ret = av_fifo_write(ifp->frame_queue, &tmp, 1); if (ret < 0) @@ -2420,27 +2641,18 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) return ret; } - ret = reap_filters(fg, 0); - if (ret < 0 && ret != AVERROR_EOF) { - av_log(fg, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); + ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; + av_frame_free(&tmp); + if (ret < 0) return ret; - } - ret = configure_filtergraph(fg); + ret = configure_filtergraph(fg, fgt); if (ret < 0) { av_log(fg, AV_LOG_ERROR, "Error reinitializing filters!\n"); return ret; } } - if (keep_reference) { - ret = av_frame_ref(ifp->frame, frame); - if (ret < 0) - return ret; - } else - av_frame_move_ref(ifp->frame, frame); - frame = ifp->frame; - frame->pts = av_rescale_q(frame->pts, frame->time_base, ifp->time_base); frame->duration = av_rescale_q(frame->duration, frame->time_base, ifp->time_base); frame->time_base = ifp->time_base; @@ -2462,20 +2674,32 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) return 0; } -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) +static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, + AVFrame *frame) { - FilterGraphPriv *fgp = fgp_from_fg(graph); - int i, ret; - int nb_requests, nb_requests_max = 0; - InputStream *ist; + const enum FrameOpaque msg = (intptr_t)frame->opaque; + FilterGraph *fg = &fgp->fg; + int graph_eof = 0; + int ret; - if (!graph->graph) { - for (int i = 0; i < graph->nb_inputs; i++) { - InputFilter *ifilter = graph->inputs[i]; + frame->opaque = NULL; + av_assert0(msg > 0); + av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); + + if (!fg->graph) { + // graph not configured yet, ignore all messages other than choosing + // the input to read from + if (msg != FRAME_OPAQUE_CHOOSE_INPUT) { + av_frame_unref(frame); + goto done; + } + + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilter *ifilter = fg->inputs[i]; InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - if (ifp->format < 0 && !ifp->eof) { - *best_ist = ifp->ist; - return 0; + if (ifp->format < 0 && !fgt->eof_in[i]) { + frame->opaque = (void*)(intptr_t)(i + 1); + goto done; } } @@ -2486,16 +2710,310 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) return AVERROR_BUG; } - *best_ist = NULL; - ret = avfilter_graph_request_oldest(graph->graph); - if (ret >= 0) - return reap_filters(graph, 0); + if (msg == FRAME_OPAQUE_SEND_COMMAND) { + FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; + send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters); + av_frame_unref(frame); + goto done; + } - if (ret == AVERROR_EOF) { - reap_filters(graph, 1); - for (int i = 0; i < graph->nb_outputs; i++) { - OutputFilter *ofilter = graph->outputs[i]; - OutputFilterPriv *ofp = ofp_from_ofilter(ofilter); + if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { + ret = avfilter_graph_request_oldest(fg->graph); + + graph_eof = ret == AVERROR_EOF; + + if (ret == AVERROR(EAGAIN)) { + frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); + goto done; + } else if (ret < 0 && !graph_eof) + return ret; + } + + ret = read_frames(fg, fgt, frame, 0); + if (ret < 0) { + av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n"); + return ret; + } + + if (graph_eof) + return AVERROR_EOF; + + // signal to the main thread that we are done processing the message +done: + ret = tq_send(fgp->queue_out, fg->nb_outputs, frame); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + return ret; + } + + return 0; +} + +static void fg_thread_set_name(const FilterGraph *fg) +{ + char name[16]; + if (filtergraph_is_simple(fg)) { + OutputStream *ost = fg->outputs[0]->ost; + snprintf(name, sizeof(name), "%cf#%d:%d", + av_get_media_type_string(ost->type)[0], + ost->file_index, ost->index); + } else { + snprintf(name, sizeof(name), "fc%d", fg->index); + } + + ff_thread_setname(name); +} + +static void fg_thread_uninit(FilterGraphThread *fgt) +{ + if (fgt->frame_queue_out) { + AVFrame *frame; + while (av_fifo_read(fgt->frame_queue_out, &frame, 1) >= 0) + av_frame_free(&frame); + av_fifo_freep2(&fgt->frame_queue_out); + } + + av_frame_free(&fgt->frame); + av_freep(&fgt->eof_in); + av_freep(&fgt->eof_out); + + memset(fgt, 0, sizeof(*fgt)); +} + +static int fg_thread_init(FilterGraphThread *fgt, const FilterGraph *fg) +{ + memset(fgt, 0, sizeof(*fgt)); + + fgt->frame = av_frame_alloc(); + if (!fgt->frame) + goto fail; + + fgt->eof_in = av_calloc(fg->nb_inputs, sizeof(*fgt->eof_in)); + if (!fgt->eof_in) + goto fail; + + fgt->eof_out = av_calloc(fg->nb_outputs, sizeof(*fgt->eof_out)); + if (!fgt->eof_out) + goto fail; + + fgt->frame_queue_out = av_fifo_alloc2(1, sizeof(AVFrame*), AV_FIFO_FLAG_AUTO_GROW); + if (!fgt->frame_queue_out) + goto fail; + + return 0; + +fail: + fg_thread_uninit(fgt); + return AVERROR(ENOMEM); +} + +static void *filter_thread(void *arg) +{ + FilterGraphPriv *fgp = arg; + FilterGraph *fg = &fgp->fg; + + FilterGraphThread fgt; + int ret = 0, input_status = 0; + + ret = fg_thread_init(&fgt, fg); + if (ret < 0) + goto finish; + + fg_thread_set_name(fg); + + // if we have all input parameters the graph can now be configured + if (ifilter_has_all_input_formats(fg)) { + ret = configure_filtergraph(fg, &fgt); + if (ret < 0) { + av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n", + av_err2str(ret)); + goto finish; + } + } + + while (1) { + InputFilter *ifilter; + InputFilterPriv *ifp; + enum FrameOpaque o; + int input_idx, eof_frame; + + input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); + if (input_idx < 0 || + (input_idx == fg->nb_inputs && input_status < 0)) { + av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); + break; + } + + o = (intptr_t)fgt.frame->opaque; + + // message on the control stream + if (input_idx == fg->nb_inputs) { + ret = msg_process(fgp, &fgt, fgt.frame); + if (ret < 0) + goto finish; + + continue; + } + + // we received an input frame or EOF + ifilter = fg->inputs[input_idx]; + ifp = ifp_from_ifilter(ifilter); + eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; + if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { + int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT; + ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL); + } else if (input_status >= 0 && fgt.frame->buf[0]) { + ret = send_frame(fg, &fgt, ifilter, fgt.frame); + } else { + int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE; + AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 }; + ret = send_eof(&fgt, ifilter, pts, tb); + } + av_frame_unref(fgt.frame); + if (ret < 0) + break; + + if (eof_frame) { + // an EOF frame is immediately followed by sender closing + // the corresponding stream, so retrieve that event + input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); + av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index); + } + + // signal to the main thread that we are done + ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); + if (ret < 0) { + if (ret == AVERROR_EOF) + break; + + av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + goto finish; + } + } + +finish: + // EOF is normal termination + if (ret == AVERROR_EOF) + ret = 0; + + for (int i = 0; i <= fg->nb_inputs; i++) + tq_receive_finish(fgp->queue_in, i); + for (int i = 0; i <= fg->nb_outputs; i++) + tq_send_finish(fgp->queue_out, i); + + fg_thread_uninit(&fgt); + + av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); + + return (void*)(intptr_t)ret; +} + +static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, + AVFrame *frame, enum FrameOpaque type) +{ + InputFilterPriv *ifp = ifp_from_ifilter(ifilter); + int output_idx, ret; + + if (ifp->eof) { + av_frame_unref(frame); + return AVERROR_EOF; + } + + frame->opaque = (void*)(intptr_t)type; + + ret = tq_send(fgp->queue_in, ifp->index, frame); + if (ret < 0) { + ifp->eof = 1; + av_frame_unref(frame); + return ret; + } + + if (type == FRAME_OPAQUE_EOF) + tq_send_finish(fgp->queue_in, ifp->index); + + // wait for the frame to be processed + ret = tq_receive(fgp->queue_out, &output_idx, frame); + av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); + + return ret; +} + +int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + int ret; + + if (keep_reference) { + ret = av_frame_ref(fgp->frame, frame); + if (ret < 0) + return ret; + } else + av_frame_move_ref(fgp->frame, frame); + + return thread_send_frame(fgp, ifilter, fgp->frame, 0); +} + +int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + int ret; + + fgp->frame->pts = pts; + fgp->frame->time_base = tb; + + ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); + + return ret == AVERROR_EOF ? 0 : ret; +} + +void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) +{ + FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); + + fgp->frame->pts = pts; + fgp->frame->time_base = tb; + + thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT); +} + +int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) +{ + FilterGraphPriv *fgp = fgp_from_fg(graph); + int ret, got_frames = 0; + + if (fgp->eof_in) + return AVERROR_EOF; + + // signal to the filtering thread to return all frames it can + av_assert0(!fgp->frame->buf[0]); + fgp->frame->opaque = (void*)(intptr_t)(best_ist ? + FRAME_OPAQUE_CHOOSE_INPUT : + FRAME_OPAQUE_REAP_FILTERS); + + ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); + if (ret < 0) { + fgp->eof_in = 1; + goto finish; + } + + while (1) { + OutputFilter *ofilter; + OutputFilterPriv *ofp; + OutputStream *ost; + int output_idx; + + ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); + + // EOF on the whole queue or the control stream + if (output_idx < 0 || + (ret < 0 && output_idx == graph->nb_outputs)) + goto finish; + + // EOF for a specific stream + if (ret < 0) { + ofilter = graph->outputs[output_idx]; + ofp = ofp_from_ofilter(ofilter); // we are finished and no frames were ever seen at this output, // at least initialize the encoder with a dummy frame @@ -2533,30 +3051,111 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) av_frame_unref(frame); } - close_output_stream(ofilter->ost); - } - return 0; - } - if (ret != AVERROR(EAGAIN)) - return ret; - - for (i = 0; i < graph->nb_inputs; i++) { - InputFilter *ifilter = graph->inputs[i]; - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - - ist = ifp->ist; - if (input_files[ist->file_index]->eagain || ifp->eof) + close_output_stream(graph->outputs[output_idx]->ost); continue; - nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); - if (nb_requests > nb_requests_max) { - nb_requests_max = nb_requests; - *best_ist = ist; } + + // request was fully processed by the filtering thread, + // return the input stream to read from, if needed + if (output_idx == graph->nb_outputs) { + int input_idx = (intptr_t)fgp->frame->opaque - 1; + av_assert0(input_idx <= graph->nb_inputs); + + if (best_ist) { + *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ? + ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; + + if (input_idx < 0 && !got_frames) { + for (int i = 0; i < graph->nb_outputs; i++) + graph->outputs[i]->ost->unavailable = 1; + } + } + break; + } + + // got a frame from the filtering thread, send it for encoding + ofilter = graph->outputs[output_idx]; + ost = ofilter->ost; + ofp = ofp_from_ofilter(ofilter); + + if (ost->finished) { + av_frame_unref(fgp->frame); + tq_receive_finish(fgp->queue_out, output_idx); + continue; + } + + if (fgp->frame->pts != AV_NOPTS_VALUE) { + ofilter->last_pts = av_rescale_q(fgp->frame->pts, + fgp->frame->time_base, + AV_TIME_BASE_Q); + } + + ret = enc_frame(ost, fgp->frame); + av_frame_unref(fgp->frame); + if (ret < 0) + goto finish; + + ofp->got_frame = 1; + got_frames = 1; } - if (!*best_ist) - for (i = 0; i < graph->nb_outputs; i++) - graph->outputs[i]->ost->unavailable = 1; +finish: + if (ret < 0) { + fgp->eof_in = 1; + for (int i = 0; i < graph->nb_outputs; i++) + close_output_stream(graph->outputs[i]->ost); + } - return 0; + return ret; +} + +int reap_filters(FilterGraph *fg, int flush) +{ + return fg_transcode_step(fg, NULL); +} + +void fg_send_command(FilterGraph *fg, double time, const char *target, + const char *command, const char *arg, int all_filters) +{ + FilterGraphPriv *fgp = fgp_from_fg(fg); + AVBufferRef *buf; + FilterCommand *fc; + int output_idx, ret; + + if (!fgp->queue_in) + return; + + fc = av_mallocz(sizeof(*fc)); + if (!fc) + return; + + buf = av_buffer_create((uint8_t*)fc, sizeof(*fc), filter_command_free, NULL, 0); + if (!buf) { + av_freep(&fc); + return; + } + + fc->target = av_strdup(target); + fc->command = av_strdup(command); + fc->arg = av_strdup(arg); + if (!fc->target || !fc->command || !fc->arg) { + av_buffer_unref(&buf); + return; + } + + fc->time = time; + fc->all_filters = all_filters; + + fgp->frame->buf[0] = buf; + fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; + + ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); + if (ret < 0) { + av_frame_unref(fgp->frame); + return; + } + + // wait for the frame to be processed + ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); + av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); }