From patchwork Tue Sep 19 19:10:47 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Khirnov X-Patchwork-Id: 43804 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a05:6a20:a886:b0:149:dfde:5c0a with SMTP id ca6csp170220pzb; Tue, 19 Sep 2023 12:24:15 -0700 (PDT) X-Google-Smtp-Source: AGHT+IFAZQ55doEJ0f2zczo6+mOqYpLNoil0X6CzFtr9RinVCQaqqOP6YLVYjE41tZ4rx1R2QIf6 X-Received: by 2002:a17:907:3e15:b0:9ad:e403:239f with SMTP id hp21-20020a1709073e1500b009ade403239fmr374450ejc.16.1695151455547; Tue, 19 Sep 2023 12:24:15 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1695151455; cv=none; d=google.com; s=arc-20160816; b=Wnn9xvpGJoE12xeWbcMUegXabHky3jByjGvnGCeN6snP7PsCiRcTYIZ0+lWhz3B5+d vsMKE73EvzPYGKztfAhvhOFmLvWFugjJ8Pg0e/emPXzOO5fsjtR4gW311FGxbZSBtUnW DzEDvS9Jd9Nh0unQSPwswom5yVxfWpGwmEt4cwhbbjzIrvKhhGs743GXmemkbL0xU8QN i0SIa+JwczZznWj8w9PtQsgxqjC7GsoLnDaEjmWB4IFw7jHCOGDGz3rcodEq0tPLJrF+ Dud1+2Ln8nxKAi5WnXJWnaZs2QurYRilsxCqD2jgHJxwMSX75AFVbJ+Aa6sO3rYF0h4T gjXw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:delivered-to; bh=Z0CnGdIRHUGx3kWASAuZehnlMvteLzKC8+M+7C44BbU=; fh=YOA8vD9MJZuwZ71F/05pj6KdCjf6jQRmzLS+CATXUQk=; b=Yz86A7mzOOXJxI58aLcUiGG/OKs6keFA1XH2j2MNZubVoEiFkg0b5C0F07HyCYZVIe BJj7pd4pjBNUBSrRIHh3tKnnMzcmb3GU5YKiXz2rJRgZMcp2Zn8CvM88BCF31yUXEDRp Q7GnLSzNNKpXwLbbKBP3rzsRoGw+kSzsDgZ4E5epRlhcQ26dlwbir+Q8cH7txgb70RaW +hYDFIRESCwKYN7wHLFibO33Ji4JaqXD1fRe80fizWMJoLkBdKR1jlB27qwBBEuiMARZ eDikNL82kWn4946yMSo9o531V0Fk172SMlWiBMbk5XM6Y2Tvj9Zo9FjGfJiNerlNZY14 enZg== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id q4-20020a170906940400b0099a1f78de48si10589250ejx.638.2023.09.19.12.24.15; Tue, 19 Sep 2023 12:24:15 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 8057A68C9AB; Tue, 19 Sep 2023 22:21:41 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 35B9768C992 for ; Tue, 19 Sep 2023 22:21:40 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id E115310 for ; Tue, 19 Sep 2023 21:21:39 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id 963CkFMvvLCv for ; Tue, 19 Sep 2023 21:21:34 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 043795202 for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id EC1123A150F for ; Tue, 19 Sep 2023 21:20:42 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:47 +0200 Message-Id: <20230919191044.18873-21-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: 1/Xg8uImBUtv As for the analogous decoding change, this is only a preparatory step to a fully threaded architecture and does not yet make encoding truly parallel. The main thread will currently submit a frame and wait until it has been fully processed by the encoder before moving on. That will change in future commits after filters are moved to threads and a thread-aware scheduler is added. WIP: resolve all // XXX left in the code Also, if an encoder with a sync queue receives EOF it will terminate after processing everything it currently has, even though the sync queue might still be triggered by other threads. --- fftools/ffmpeg_enc.c | 384 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 340 insertions(+), 44 deletions(-) diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index d8d7c3416d..ea542173c5 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -43,6 +44,7 @@ struct Encoder { // packet for receiving encoded output AVPacket *pkt; + AVFrame *sub_frame; // combined size of all the packets received from the encoder uint64_t data_size; @@ -51,8 +53,48 @@ struct Encoder { uint64_t packets_encoded; int opened; + int finished; + + pthread_t thread; + /** + * Queue for sending frames from the main thread to + * the encoder thread. + */ + ThreadQueue *queue_in; + /** + * Queue for sending encoded packets from the encoder thread + * to the main thread. + * + * An empty packet is sent to signal that a previously sent + * frame has been fully processed. + */ + ThreadQueue *queue_out; }; +// data that is local to the decoder thread and not visible outside of it +typedef struct EncoderThread { + AVFrame *frame; + AVPacket *pkt; +} EncoderThread; + +static int enc_thread_stop(Encoder *e) +{ + void *ret; + + if (!e->queue_in) + return 0; + + tq_send_finish(e->queue_in, 0); + tq_receive_finish(e->queue_out, 0); + + pthread_join(e->thread, &ret); + + tq_free(&e->queue_in); + tq_free(&e->queue_out); + + return (int)(intptr_t)ret; +} + void enc_free(Encoder **penc) { Encoder *enc = *penc; @@ -60,7 +102,10 @@ void enc_free(Encoder **penc) if (!enc) return; + enc_thread_stop(enc); + av_frame_free(&enc->sq_frame); + av_frame_free(&enc->sub_frame); av_packet_free(&enc->pkt); @@ -77,6 +122,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec) if (!enc) return AVERROR(ENOMEM); + if (codec->type == AVMEDIA_TYPE_SUBTITLE) { + enc->sub_frame = av_frame_alloc(); + if (!enc->sub_frame) + goto fail; + } + enc->pkt = av_packet_alloc(); if (!enc->pkt) goto fail; @@ -165,6 +216,52 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } +static void *encoder_thread(void *arg); + +static int enc_thread_start(OutputStream *ost) +{ + Encoder *e = ost->enc; + ObjPool *op; + int ret = 0; + + op = objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + e->queue_in = tq_alloc(1, 1, op, frame_move); + if (!e->queue_in) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + op = objpool_alloc_packets(); + if (!op) + goto fail; + + e->queue_out = tq_alloc(1, 4, op, pkt_move); + if (!e->queue_out) { + objpool_free(&op); + goto fail; + } + + ret = pthread_create(&e->thread, NULL, encoder_thread, ost); + if (ret) { + ret = AVERROR(ret); + av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", + av_err2str(ret)); + goto fail; + } + + return 0; +fail: + if (ret >= 0) + ret = AVERROR(ENOMEM); + + tq_free(&e->queue_in); + tq_free(&e->queue_out); + return ret; +} + int enc_open(OutputStream *ost, const AVFrame *frame) { InputStream *ist = ost->ist; @@ -387,6 +484,13 @@ int enc_open(OutputStream *ost, const AVFrame *frame) if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0) ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1}); + ret = enc_thread_start(ost); + if (ret < 0) { + av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", + av_err2str(ret)); + return ret; + } + ret = of_stream_init(of, ost); if (ret < 0) return ret; @@ -400,19 +504,18 @@ static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb) if (of->recording_time != INT64_MAX && av_compare_ts(ts, tb, of->recording_time, AV_TIME_BASE_Q) >= 0) { - close_output_stream(ost); return 0; } return 1; } -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) +static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *sub, + AVPacket *pkt) { Encoder *e = ost->enc; int subtitle_out_max_size = 1024 * 1024; int subtitle_out_size, nb, i, ret; AVCodecContext *enc; - AVPacket *pkt = e->pkt; int64_t pts; if (sub->pts == AV_NOPTS_VALUE) { @@ -442,8 +545,9 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) for (i = 0; i < nb; i++) { AVSubtitle local_sub = *sub; + // XXX if (!check_recording_time(ost, pts, AV_TIME_BASE_Q)) - return 0; + return AVERROR_EOF; ret = av_new_packet(pkt, subtitle_out_max_size); if (ret < 0) @@ -484,9 +588,11 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) } pkt->dts = pkt->pts; - ret = of_output_packet(of, ost, pkt); - if (ret < 0) + ret = tq_send(e->queue_out, 0, pkt); + if (ret < 0) { + av_packet_unref(pkt); return ret; + } } return 0; @@ -624,11 +730,11 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ return 0; } -static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) +static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, + AVPacket *pkt) { Encoder *e = ost->enc; AVCodecContext *enc = ost->enc_ctx; - AVPacket *pkt = e->pkt; const char *type_desc = av_get_media_type_string(enc->codec_type); const char *action = frame ? "encode" : "flush"; int ret; @@ -678,11 +784,9 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) if (ret == AVERROR(EAGAIN)) { av_assert0(frame); // should never happen during flushing return 0; - } else if (ret == AVERROR_EOF) { - ret = of_output_packet(of, ost, NULL); - return ret < 0 ? ret : AVERROR_EOF; } else if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc); + if (ret != AVERROR_EOF) + av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc); return ret; } @@ -706,33 +810,36 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base)); } - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! (%s)\n", - __func__, av_err2str(ret)); - return ret; - } + // XXX + //if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { + // av_log(NULL, AV_LOG_ERROR, + // "Subtitle heartbeat logic failed in %s! (%s)\n", + // __func__, av_err2str(ret)); + // return ret; + //} e->data_size += pkt->size; e->packets_encoded++; - ret = of_output_packet(of, ost, pkt); - if (ret < 0) + ret = tq_send(e->queue_out, 0, pkt); + if (ret < 0) { + av_packet_unref(pkt); return ret; + } } av_assert0(0); } static int submit_encode_frame(OutputFile *of, OutputStream *ost, - AVFrame *frame) + AVFrame *frame, AVPacket *pkt) { Encoder *e = ost->enc; int ret; if (ost->sq_idx_encode < 0) - return encode_frame(of, ost, frame); + return encode_frame(of, ost, frame, pkt); if (frame) { ret = av_frame_ref(e->sq_frame, frame); @@ -761,22 +868,22 @@ static int submit_encode_frame(OutputFile *of, OutputStream *ost, return (ret == AVERROR(EAGAIN)) ? 0 : ret; } - ret = encode_frame(of, ost, enc_frame); + ret = encode_frame(of, ost, enc_frame, pkt); if (enc_frame) av_frame_unref(enc_frame); if (ret < 0) { - if (ret == AVERROR_EOF) - close_output_stream(ost); + // XXX + //if (ret == AVERROR_EOF) + // close_output_stream(ost); return ret; } } } static int do_audio_out(OutputFile *of, OutputStream *ost, - AVFrame *frame) + AVFrame *frame, AVPacket *pkt) { AVCodecContext *enc = ost->enc_ctx; - int ret; if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) && enc->ch_layout.nb_channels != frame->ch_layout.nb_channels) { @@ -785,11 +892,12 @@ static int do_audio_out(OutputFile *of, OutputStream *ost, return 0; } + // XXX if (!check_recording_time(ost, frame->pts, frame->time_base)) - return 0; + return AVERROR_EOF; - ret = submit_encode_frame(of, ost, frame); - return (ret < 0 && ret != AVERROR_EOF) ? ret : 0; + // XXX check EOF handling + return submit_encode_frame(of, ost, frame, pkt); } static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf, @@ -839,13 +947,14 @@ force_keyframe: } /* May modify/reset frame */ -static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture) +static int do_video_out(OutputFile *of, OutputStream *ost, + AVFrame *in_picture, AVPacket *pkt) { - int ret; AVCodecContext *enc = ost->enc_ctx; + // XXX if (!check_recording_time(ost, in_picture->pts, ost->enc_ctx->time_base)) - return 0; + return AVERROR_EOF; in_picture->quality = enc->global_quality; in_picture->pict_type = forced_kf_apply(ost, &ost->kf, enc->time_base, in_picture); @@ -857,26 +966,210 @@ static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture) } #endif - ret = submit_encode_frame(of, ost, in_picture); - return (ret == AVERROR_EOF) ? 0 : ret; + // XXX check EOF handling + return submit_encode_frame(of, ost, in_picture, pkt); +} + +static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) +{ + OutputFile *of = output_files[ost->file_index]; + enum AVMediaType type = ost->type; + int ret; + + if (type == AVMEDIA_TYPE_SUBTITLE) { + // no flushing for subtitles + return frame ? + do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0; + } + + // XXX + if (frame) { + ret = (type == AVMEDIA_TYPE_VIDEO) ? do_video_out(of, ost, frame, pkt) : + do_audio_out(of, ost, frame, pkt); + if (ret < 0) + return ret; + } + + return frame ? 0 : submit_encode_frame(of, ost, NULL, pkt); +} + +static void enc_thread_set_name(const OutputStream *ost) +{ + char name[16]; + snprintf(name, sizeof(name), "enc%d:%d:%s", ost->file_index, ost->index, + ost->enc_ctx->codec->name); + ff_thread_setname(name); +} + +static void enc_thread_uninit(EncoderThread *et) +{ + av_packet_free(&et->pkt); + av_frame_free(&et->frame); + + memset(et, 0, sizeof(*et)); +} + +static int enc_thread_init(EncoderThread *et) +{ + memset(et, 0, sizeof(*et)); + + et->frame = av_frame_alloc(); + if (!et->frame) + goto fail; + + et->pkt = av_packet_alloc(); + if (!et->pkt) + goto fail; + + return 0; + +fail: + enc_thread_uninit(et); + return AVERROR(ENOMEM); +} + +static void *encoder_thread(void *arg) +{ + OutputStream *ost = arg; + OutputFile *of = output_files[ost->file_index]; + Encoder *e = ost->enc; + EncoderThread et; + int ret = 0, input_status = 0; + + ret = enc_thread_init(&et); + if (ret < 0) + goto finish; + + enc_thread_set_name(ost); + + while (!input_status) { + int dummy; + + input_status = tq_receive(e->queue_in, &dummy, et.frame); + if (input_status < 0) + av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); + + ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt); + + av_packet_unref(et.pkt); + av_frame_unref(et.frame); + + if (ret < 0) { + if (ret == AVERROR_EOF) + av_log(ost, AV_LOG_VERBOSE, "Encoder returned EOF, finishing\n"); + else + av_log(ost, AV_LOG_ERROR, "Error encoding a frame: %s\n", + av_err2str(ret)); + break; + } + + // signal to the consumer thread that the frame was encoded + ret = tq_send(e->queue_out, 0, et.pkt); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(ost, AV_LOG_ERROR, + "Error communicating with the main thread\n"); + break; + } + } + + // EOF is normal thread termination + if (ret == AVERROR_EOF) + ret = 0; + +finish: + if (ost->sq_idx_encode >= 0) + sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); + + tq_receive_finish(e->queue_in, 0); + tq_send_finish (e->queue_out, 0); + + enc_thread_uninit(&et); + + av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); + + return (void*)(intptr_t)ret; } int enc_frame(OutputStream *ost, AVFrame *frame) { OutputFile *of = output_files[ost->file_index]; - int ret; + Encoder *e = ost->enc; + int ret, thread_ret; ret = enc_open(ost, frame); if (ret < 0) return ret; - return ost->enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO ? - do_video_out(of, ost, frame) : do_audio_out(of, ost, frame); + // thread already joined + // XXX check EOF handling + if (!e->queue_in) + return AVERROR_EOF; + + // send the frame/EOF to the encoder thread + if (frame) { + ret = tq_send(e->queue_in, 0, frame); + if (ret < 0) + goto finish; + } else + tq_send_finish(e->queue_in, 0); + + // retrieve all encoded data for the frame + while (1) { + int dummy; + + ret = tq_receive(e->queue_out, &dummy, e->pkt); + if (ret < 0) + break; + + // frame fully encoded + if (!e->pkt->data && !e->pkt->side_data_elems) + return 0; + + // process the encoded packet + ret = of_output_packet(of, ost, e->pkt); + if (ret < 0) + goto finish; + } + +finish: + thread_ret = enc_thread_stop(e); + if (thread_ret < 0) { + av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", + av_err2str(thread_ret)); + ret = err_merge(ret, thread_ret); + } + + if (ret < 0 && ret != AVERROR_EOF) + return ret; + + // signal EOF to the muxer + return of_output_packet(of, ost, NULL); +} + +int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) +{ + Encoder *e = ost->enc; + AVFrame *f = e->sub_frame; + int ret; + + // XXX the queue for transferring data to the encoder thread runs + // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put + // that inside the frame + // eventually, subtitles should be switched to use AVFrames natively + ret = subtitle_wrap_frame(f, sub, 1); + if (ret < 0) + return ret; + + ret = enc_frame(ost, f); + av_frame_unref(f); + + return ret; } int enc_flush(void) { - int ret; + int ret = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { OutputFile *of = output_files[ost->file_index]; @@ -887,16 +1180,19 @@ int enc_flush(void) for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { Encoder *e = ost->enc; AVCodecContext *enc = ost->enc_ctx; - OutputFile *of = output_files[ost->file_index]; + int err; if (!enc || !e->opened || (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)) continue; - ret = submit_encode_frame(of, ost, NULL); - if (ret != AVERROR_EOF) - return ret; + err = enc_frame(ost, NULL); + // XXX check EOF handling + if (err != AVERROR_EOF && ret < 0) + ret = err_merge(ret, err); + + av_assert0(!e->queue_in); } - return 0; + return ret; }