diff mbox series

[FFmpeg-devel,14/24] fftools/ffmpeg_mux: move bitstream filtering to the muxer thread

Message ID 20231104092125.10213-15-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/24] lavf/mux: do not apply max_interleave_delta to subtitles | expand

Commit Message

Anton Khirnov Nov. 4, 2023, 7:56 a.m. UTC
This will be the appropriate place for it after the rest of transcoding
is switched to a threaded architecture.
---
 fftools/ffmpeg_mux.c | 112 ++++++++++++++++++++++++++-----------------
 1 file changed, 67 insertions(+), 45 deletions(-)

Comments

James Almer Nov. 4, 2023, 1:39 p.m. UTC | #1
On 11/4/2023 4:56 AM, Anton Khirnov wrote:
> This will be the appropriate place for it after the rest of transcoding
> is switched to a threaded architecture.
> ---
>   fftools/ffmpeg_mux.c | 112 ++++++++++++++++++++++++++-----------------
>   1 file changed, 67 insertions(+), 45 deletions(-)
> 
> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> index 82352b7981..57fb8a8413 100644
> --- a/fftools/ffmpeg_mux.c
> +++ b/fftools/ffmpeg_mux.c
> @@ -207,6 +207,67 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
>       return 0;
>   }
>   
> +/* apply the output bitstream filters */
> +static int mux_packet_filter(Muxer *mux, OutputStream *ost,
> +                             AVPacket *pkt, int *stream_eof)
> +{
> +    MuxStream *ms = ms_from_ost(ost);
> +    const char *err_msg;
> +    int ret = 0;
> +
> +    if (ms->bsf_ctx) {
> +        int bsf_eof = 0;
> +
> +        if (pkt)
> +            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
> +
> +        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
> +        if (ret < 0) {

Unrelated to this patch, but this should probably include a comment 
about the reason we're not checking for EAGAIN, like we do for 
avcodec_send_packet().

I'll send a patch for that after this is pushed, so you don't have to 
solve conflicts after a rebase.

> +            err_msg = "submitting a packet for bitstream filtering";
> +            goto fail;
> +        }
> +
> +        while (!bsf_eof) {
> +            ret = av_bsf_receive_packet(ms->bsf_ctx, ms->bsf_pkt);
> +            if (ret == AVERROR(EAGAIN))
> +                return 0;
> +            else if (ret == AVERROR_EOF)
> +                bsf_eof = 1;
> +            else if (ret < 0) {
> +                av_log(ost, AV_LOG_ERROR,
> +                       "Error applying bitstream filters to a packet: %s",
> +                       av_err2str(ret));
> +                if (exit_on_error)
> +                    return ret;
> +                continue;
> +            }
> +
> +            if (!bsf_eof)
> +                ms->bsf_pkt->time_base = ms->bsf_ctx->time_base_out;
> +
> +            ret = sync_queue_process(mux, ost, bsf_eof ? NULL : ms->bsf_pkt, stream_eof);
> +            if (ret < 0)
> +                goto mux_fail;
> +        }
> +        *stream_eof = 1;
> +        return AVERROR_EOF;
> +    } else {
> +        ret = sync_queue_process(mux, ost, pkt, stream_eof);
> +        if (ret < 0)
> +            goto mux_fail;
> +    }
> +
> +    return 0;
> +
> +mux_fail:
> +    err_msg = "submitting a packet to the muxer";
> +
> +fail:
> +    if (ret != AVERROR_EOF)
> +        av_log(ost, AV_LOG_ERROR, "Error %s: %s\n", err_msg, av_err2str(ret));
> +    return ret;
> +}
> +
>   static void thread_set_name(OutputFile *of)
>   {
>       char name[16];
> @@ -263,7 +324,7 @@ static void *muxer_thread(void *arg)
>           }
>   
>           ost = of->streams[stream_idx];
> -        ret = sync_queue_process(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
> +        ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
>           av_packet_unref(mt.pkt);
>           if (ret == AVERROR_EOF) {
>               if (stream_eof) {
> @@ -376,58 +437,19 @@ static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
>   int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
>   {
>       Muxer *mux = mux_from_of(of);
> -    MuxStream *ms = ms_from_ost(ost);
> -    const char *err_msg;
>       int ret = 0;
>   
>       if (pkt && pkt->dts != AV_NOPTS_VALUE)
>           ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
>   
> -    /* apply the output bitstream filters */
> -    if (ms->bsf_ctx) {
> -        int bsf_eof = 0;
> -
> -        if (pkt)
> -            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
> -
> -        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
> -        if (ret < 0) {
> -            err_msg = "submitting a packet for bitstream filtering";
> -            goto fail;
> -        }
> -
> -        while (!bsf_eof) {
> -            ret = av_bsf_receive_packet(ms->bsf_ctx, ms->bsf_pkt);
> -            if (ret == AVERROR(EAGAIN))
> -                return 0;
> -            else if (ret == AVERROR_EOF)
> -                bsf_eof = 1;
> -            else if (ret < 0) {
> -                err_msg = "applying bitstream filters to a packet";
> -                goto fail;
> -            }
> -
> -            if (!bsf_eof)
> -                ms->bsf_pkt->time_base = ms->bsf_ctx->time_base_out;
> -
> -            ret = submit_packet(mux, bsf_eof ? NULL : ms->bsf_pkt, ost);
> -            if (ret < 0)
> -                goto mux_fail;
> -        }
> -    } else {
> -        ret = submit_packet(mux, pkt, ost);
> -        if (ret < 0)
> -            goto mux_fail;
> +    ret = submit_packet(mux, pkt, ost);
> +    if (ret < 0) {
> +        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
> +               av_err2str(ret));
> +        return ret;
>       }
>   
>       return 0;
> -
> -mux_fail:
> -    err_msg = "submitting a packet to the muxer";
> -
> -fail:
> -    av_log(ost, AV_LOG_ERROR, "Error %s\n", err_msg);
> -    return exit_on_error ? ret : 0;
>   }
>   
>   int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
Anton Khirnov Nov. 9, 2023, 11:41 a.m. UTC | #2
Quoting James Almer (2023-11-04 14:39:44)
> On 11/4/2023 4:56 AM, Anton Khirnov wrote:
> > This will be the appropriate place for it after the rest of transcoding
> > is switched to a threaded architecture.
> > ---
> >   fftools/ffmpeg_mux.c | 112 ++++++++++++++++++++++++++-----------------
> >   1 file changed, 67 insertions(+), 45 deletions(-)
> > 
> > diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> > index 82352b7981..57fb8a8413 100644
> > --- a/fftools/ffmpeg_mux.c
> > +++ b/fftools/ffmpeg_mux.c
> > @@ -207,6 +207,67 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
> >       return 0;
> >   }
> >   
> > +/* apply the output bitstream filters */
> > +static int mux_packet_filter(Muxer *mux, OutputStream *ost,
> > +                             AVPacket *pkt, int *stream_eof)
> > +{
> > +    MuxStream *ms = ms_from_ost(ost);
> > +    const char *err_msg;
> > +    int ret = 0;
> > +
> > +    if (ms->bsf_ctx) {
> > +        int bsf_eof = 0;
> > +
> > +        if (pkt)
> > +            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
> > +
> > +        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
> > +        if (ret < 0) {
> 
> Unrelated to this patch, but this should probably include a comment 
> about the reason we're not checking for EAGAIN, like we do for 
> avcodec_send_packet().

Isn't the situation pretty much the same here - seeing EAGAIN would be a
bug?
James Almer Nov. 9, 2023, 11:47 a.m. UTC | #3
On 11/9/2023 8:41 AM, Anton Khirnov wrote:
> Quoting James Almer (2023-11-04 14:39:44)
>> On 11/4/2023 4:56 AM, Anton Khirnov wrote:
>>> This will be the appropriate place for it after the rest of transcoding
>>> is switched to a threaded architecture.
>>> ---
>>>    fftools/ffmpeg_mux.c | 112 ++++++++++++++++++++++++++-----------------
>>>    1 file changed, 67 insertions(+), 45 deletions(-)
>>>
>>> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
>>> index 82352b7981..57fb8a8413 100644
>>> --- a/fftools/ffmpeg_mux.c
>>> +++ b/fftools/ffmpeg_mux.c
>>> @@ -207,6 +207,67 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
>>>        return 0;
>>>    }
>>>    
>>> +/* apply the output bitstream filters */
>>> +static int mux_packet_filter(Muxer *mux, OutputStream *ost,
>>> +                             AVPacket *pkt, int *stream_eof)
>>> +{
>>> +    MuxStream *ms = ms_from_ost(ost);
>>> +    const char *err_msg;
>>> +    int ret = 0;
>>> +
>>> +    if (ms->bsf_ctx) {
>>> +        int bsf_eof = 0;
>>> +
>>> +        if (pkt)
>>> +            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
>>> +
>>> +        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
>>> +        if (ret < 0) {
>>
>> Unrelated to this patch, but this should probably include a comment
>> about the reason we're not checking for EAGAIN, like we do for
>> avcodec_send_packet().
> 
> Isn't the situation pretty much the same here - seeing EAGAIN would be a
> bug?

Yes, and a similar comment here is a good idea as people may use 
ffmpeg.c as a reference implementation for our APIs.
Anton Khirnov Nov. 9, 2023, noon UTC | #4
Quoting James Almer (2023-11-09 12:47:56)
> On 11/9/2023 8:41 AM, Anton Khirnov wrote:
> > Quoting James Almer (2023-11-04 14:39:44)
> >> On 11/4/2023 4:56 AM, Anton Khirnov wrote:
> >>> This will be the appropriate place for it after the rest of transcoding
> >>> is switched to a threaded architecture.
> >>> ---
> >>>    fftools/ffmpeg_mux.c | 112 ++++++++++++++++++++++++++-----------------
> >>>    1 file changed, 67 insertions(+), 45 deletions(-)
> >>>
> >>> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> >>> index 82352b7981..57fb8a8413 100644
> >>> --- a/fftools/ffmpeg_mux.c
> >>> +++ b/fftools/ffmpeg_mux.c
> >>> @@ -207,6 +207,67 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
> >>>        return 0;
> >>>    }
> >>>    
> >>> +/* apply the output bitstream filters */
> >>> +static int mux_packet_filter(Muxer *mux, OutputStream *ost,
> >>> +                             AVPacket *pkt, int *stream_eof)
> >>> +{
> >>> +    MuxStream *ms = ms_from_ost(ost);
> >>> +    const char *err_msg;
> >>> +    int ret = 0;
> >>> +
> >>> +    if (ms->bsf_ctx) {
> >>> +        int bsf_eof = 0;
> >>> +
> >>> +        if (pkt)
> >>> +            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
> >>> +
> >>> +        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
> >>> +        if (ret < 0) {
> >>
> >> Unrelated to this patch, but this should probably include a comment
> >> about the reason we're not checking for EAGAIN, like we do for
> >> avcodec_send_packet().
> > 
> > Isn't the situation pretty much the same here - seeing EAGAIN would be a
> > bug?
> 
> Yes, and a similar comment here is a good idea as people may use 
> ffmpeg.c as a reference implementation for our APIs.

Right, then certainly there should be a comment or maybe even a check
like for decoding.
diff mbox series

Patch

diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 82352b7981..57fb8a8413 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -207,6 +207,67 @@  static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
     return 0;
 }
 
+/* apply the output bitstream filters */
+static int mux_packet_filter(Muxer *mux, OutputStream *ost,
+                             AVPacket *pkt, int *stream_eof)
+{
+    MuxStream *ms = ms_from_ost(ost);
+    const char *err_msg;
+    int ret = 0;
+
+    if (ms->bsf_ctx) {
+        int bsf_eof = 0;
+
+        if (pkt)
+            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
+
+        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
+        if (ret < 0) {
+            err_msg = "submitting a packet for bitstream filtering";
+            goto fail;
+        }
+
+        while (!bsf_eof) {
+            ret = av_bsf_receive_packet(ms->bsf_ctx, ms->bsf_pkt);
+            if (ret == AVERROR(EAGAIN))
+                return 0;
+            else if (ret == AVERROR_EOF)
+                bsf_eof = 1;
+            else if (ret < 0) {
+                av_log(ost, AV_LOG_ERROR,
+                       "Error applying bitstream filters to a packet: %s",
+                       av_err2str(ret));
+                if (exit_on_error)
+                    return ret;
+                continue;
+            }
+
+            if (!bsf_eof)
+                ms->bsf_pkt->time_base = ms->bsf_ctx->time_base_out;
+
+            ret = sync_queue_process(mux, ost, bsf_eof ? NULL : ms->bsf_pkt, stream_eof);
+            if (ret < 0)
+                goto mux_fail;
+        }
+        *stream_eof = 1;
+        return AVERROR_EOF;
+    } else {
+        ret = sync_queue_process(mux, ost, pkt, stream_eof);
+        if (ret < 0)
+            goto mux_fail;
+    }
+
+    return 0;
+
+mux_fail:
+    err_msg = "submitting a packet to the muxer";
+
+fail:
+    if (ret != AVERROR_EOF)
+        av_log(ost, AV_LOG_ERROR, "Error %s: %s\n", err_msg, av_err2str(ret));
+    return ret;
+}
+
 static void thread_set_name(OutputFile *of)
 {
     char name[16];
@@ -263,7 +324,7 @@  static void *muxer_thread(void *arg)
         }
 
         ost = of->streams[stream_idx];
-        ret = sync_queue_process(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
+        ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
         av_packet_unref(mt.pkt);
         if (ret == AVERROR_EOF) {
             if (stream_eof) {
@@ -376,58 +437,19 @@  static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
 int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     Muxer *mux = mux_from_of(of);
-    MuxStream *ms = ms_from_ost(ost);
-    const char *err_msg;
     int ret = 0;
 
     if (pkt && pkt->dts != AV_NOPTS_VALUE)
         ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
 
-    /* apply the output bitstream filters */
-    if (ms->bsf_ctx) {
-        int bsf_eof = 0;
-
-        if (pkt)
-            av_packet_rescale_ts(pkt, pkt->time_base, ms->bsf_ctx->time_base_in);
-
-        ret = av_bsf_send_packet(ms->bsf_ctx, pkt);
-        if (ret < 0) {
-            err_msg = "submitting a packet for bitstream filtering";
-            goto fail;
-        }
-
-        while (!bsf_eof) {
-            ret = av_bsf_receive_packet(ms->bsf_ctx, ms->bsf_pkt);
-            if (ret == AVERROR(EAGAIN))
-                return 0;
-            else if (ret == AVERROR_EOF)
-                bsf_eof = 1;
-            else if (ret < 0) {
-                err_msg = "applying bitstream filters to a packet";
-                goto fail;
-            }
-
-            if (!bsf_eof)
-                ms->bsf_pkt->time_base = ms->bsf_ctx->time_base_out;
-
-            ret = submit_packet(mux, bsf_eof ? NULL : ms->bsf_pkt, ost);
-            if (ret < 0)
-                goto mux_fail;
-        }
-    } else {
-        ret = submit_packet(mux, pkt, ost);
-        if (ret < 0)
-            goto mux_fail;
+    ret = submit_packet(mux, pkt, ost);
+    if (ret < 0) {
+        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
+               av_err2str(ret));
+        return ret;
     }
 
     return 0;
-
-mux_fail:
-    err_msg = "submitting a packet to the muxer";
-
-fail:
-    av_log(ost, AV_LOG_ERROR, "Error %s\n", err_msg);
-    return exit_on_error ? ret : 0;
 }
 
 int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)