diff mbox

[FFmpeg-devel,v4] Improved the performance of 1 decode + N filter graphs and adaptive bitrate.

Message ID 1549924864-26466-1-git-send-email-shaofei.wang@intel.com
State Superseded
Headers show

Commit Message

Shaofei Wang Feb. 11, 2019, 10:41 p.m. UTC
It enabled multiple filter graph concurrency, which bring above about
4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration

Below are some test cases and comparison as reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)

For 1:N transcode by GPU acceleration with vaapi:
./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
    -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null

    test results:
                2 encoders 5 encoders 10 encoders
    Improved       6.1%    6.9%       5.5%

For 1:N transcode by GPU acceleration with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null

    test results:
                2 encoders  5 encoders 10 encoders
    Improved       6%       4%         15%

For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale   10 scale
    Improved       12%     21%        21%

For CPU only 1 decode to N scaling:
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale   10 scale
    Improved       25%    107%       148%

Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
---
 fftools/ffmpeg.c        | 112 +++++++++++++++++++++++++++++++++++++++++++++---
 fftools/ffmpeg.h        |  14 ++++++
 fftools/ffmpeg_filter.c |   4 ++
 3 files changed, 124 insertions(+), 6 deletions(-)

Comments

Shaofei Wang Feb. 11, 2019, 10:05 a.m. UTC | #1
Code clean and remove the "-abr_pipeline" option, use the perf improved code path by default only if HAVE_THREAD enabled.
Michael Niedermayer Feb. 11, 2019, 10:22 p.m. UTC | #2
On Mon, Feb 11, 2019 at 05:41:04PM -0500, Shaofei Wang wrote:
> It enabled multiple filter graph concurrency, which bring above about
> 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration
> 
> Below are some test cases and comparison as reference.
> (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
> (Software: Intel iHD driver - 16.9.00100, CentOS 7)
> 
> For 1:N transcode by GPU acceleration with vaapi:
> ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
>     -hwaccel_output_format vaapi \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
>     -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null
> 
>     test results:
>                 2 encoders 5 encoders 10 encoders
>     Improved       6.1%    6.9%       5.5%
> 
> For 1:N transcode by GPU acceleration with QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null
> 
>     test results:
>                 2 encoders  5 encoders 10 encoders
>     Improved       6%       4%         15%
> 
> For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       12%     21%        21%
> 
> For CPU only 1 decode to N scaling:
> ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       25%    107%       148%
> 
> Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
> Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
> ---
>  fftools/ffmpeg.c        | 112 +++++++++++++++++++++++++++++++++++++++++++++---
>  fftools/ffmpeg.h        |  14 ++++++
>  fftools/ffmpeg_filter.c |   4 ++
>  3 files changed, 124 insertions(+), 6 deletions(-)

breaks fate
make: *** [fate-lavf-mxf_d10] Error 1
make: *** [fate-filter-tremolo] Error 1
make: *** [fate-filter-chorus] Error 1
make: *** [tests/data/hls-list-append.m3u8] Error 1
make: *** [fate-filter-atrim-mixed] Error 1
make: *** [fate-filter-atrim-time] Error 1
make: *** [tests/data/live_last_endlist.m3u8] Error 1
make: *** [fate-filter-volume] Error 1
make: *** [fate-filter-join] Error 1
make: *** [fate-lavf-mxf] Error 1
make: *** [fate-swr-resample-s16p-44100-8000] Error 1
make: *** [fate-swr-resample-s16p-44100-2626] Error 1
...

[...]
Mark Thompson Feb. 12, 2019, 12:17 a.m. UTC | #3
On 11/02/2019 22:41, Shaofei Wang wrote:

Please avoid sending messages from the future - the list received this about thirteen hours before its supposed send time (received "Mon, 11 Feb 2019 11:42:09 +0200", sent "Mon, 11 Feb 2019 17:41:04 -0500").

Probably the sending machine or some intermediate has an incorrect time or time zone.

> It enabled multiple filter graph concurrency, which bring above about
> 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration
> 
> Below are some test cases and comparison as reference.
> (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
> (Software: Intel iHD driver - 16.9.00100, CentOS 7)
> 
> For 1:N transcode by GPU acceleration with vaapi:
> ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
>     -hwaccel_output_format vaapi \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
>     -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null
> 
>     test results:
>                 2 encoders 5 encoders 10 encoders
>     Improved       6.1%    6.9%       5.5%
> 
> For 1:N transcode by GPU acceleration with QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null
> 
>     test results:
>                 2 encoders  5 encoders 10 encoders
>     Improved       6%       4%         15%
> 
> For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       12%     21%        21%
> 
> For CPU only 1 decode to N scaling:
> ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       25%    107%       148%
> 

Some numbers for more use-cases and platforms (with different architectures and core counts) would be a good idea if you intend to enable this by default.

Presumably it's a bit slower on less powerful machines with fewer cores when it makes many threads, but by how much?  Is that acceptable?

> Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
> Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
> ---
>  fftools/ffmpeg.c        | 112 +++++++++++++++++++++++++++++++++++++++++++++---
>  fftools/ffmpeg.h        |  14 ++++++
>  fftools/ffmpeg_filter.c |   4 ++
>  3 files changed, 124 insertions(+), 6 deletions(-)
> 
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index 544f1a1..67b1a2a 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -1419,13 +1419,18 @@ static void finish_output_stream(OutputStream *ost)
>   *
>   * @return  0 for success, <0 for severe errors
>   */
> -static int reap_filters(int flush)
> +static int reap_filters(int flush, InputFilter * ifilter)
>  {
>      AVFrame *filtered_frame = NULL;
>      int i;
>  
> -    /* Reap all buffers present in the buffer sinks */
> +    /* Reap all buffers present in the buffer sinks or just reap specified
> +     * input filter buffer */
>      for (i = 0; i < nb_output_streams; i++) {
> +        if (ifilter) {
> +            if (ifilter != output_streams[i]->filter->graph->inputs[0])
> +                continue;
> +        }

No mixed declarations and code.

>          OutputStream *ost = output_streams[i];
>          OutputFile    *of = output_files[ost->file_index];
>          AVFilterContext *filter;

How carefully has this been audited to make sure that there are no data races?  The calls to init_output_stream() and do_video_out() both do /a lot/, and in particular they interact with the InputStream which might be shared with other threads (and indeed is in all your examples above).

> @@ -2179,7 +2184,8 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
>              }
>          }
>  
> -        ret = reap_filters(1);
> +        ret = HAVE_THREADS ? reap_filters(1, ifilter) : reap_filters(1, NULL);
> +
>          if (ret < 0 && ret != AVERROR_EOF) {
>              av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
>              return ret;
> @@ -2208,6 +2214,14 @@ static int ifilter_send_eof(InputFilter *ifilter, int64_t pts)
>  
>      ifilter->eof = 1;
>  
> +#if HAVE_THREADS
> +    ifilter->waited_frm = NULL;
> +    pthread_mutex_lock(&ifilter->process_mutex);
> +    ifilter->t_end = 1;
> +    pthread_cond_signal(&ifilter->process_cond);
> +    pthread_mutex_unlock(&ifilter->process_mutex);
> +    pthread_join(ifilter->f_thread, NULL);
> +#endif
>      if (ifilter->filter) {
>          ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH);
>          if (ret < 0)
> @@ -2252,12 +2266,95 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
>      return 0;
>  }
>  
> +#if HAVE_THREADS
> +static void *filter_pipeline(void *arg)
> +{
> +    InputFilter *fl = arg;
> +    AVFrame *frm;
> +    int ret;
> +    while(1) {
> +        pthread_mutex_lock(&fl->process_mutex);
> +        while (fl->waited_frm == NULL && !fl->t_end)
> +            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
> +        pthread_mutex_unlock(&fl->process_mutex);
> +
> +        if (fl->t_end) break;
> +
> +        frm = fl->waited_frm;
> +        ret = ifilter_send_frame(fl, frm);
> +        if (ret < 0) {
> +            av_log(NULL, AV_LOG_ERROR,
> +                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
> +        } else {
> +            ret = reap_filters(0, fl);
> +        }
> +        fl->t_error = ret;
> +
> +        pthread_mutex_lock(&fl->finish_mutex);
> +        fl->waited_frm = NULL;
> +        pthread_cond_signal(&fl->finish_cond);
> +        pthread_mutex_unlock(&fl->finish_mutex);
> +
> +        if (ret < 0)
> +            break;

Is this error always totally fatal?  (I guess I'm wondering if any EAGAIN-like cases end up here.)

> +    }
> +    return fl;

This return value seems to be unused?

> +}
> +#endif
> +
>  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
>  {
>      int i, ret;
>      AVFrame *f;
>  
>      av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
> +#if HAVE_THREADS
> +    for (i = 0; i < ist->nb_filters; i++) {
> +        //it will use abr_pipeline mode by default
> +        if (i < ist->nb_filters - 1) {
> +            f = &ist->filters[i]->input_frm;
> +            ret = av_frame_ref(f, decoded_frame);
> +            if (ret < 0)
> +                break;

Won't this just deadlock if you ever hit the break?  You'll immediately wait for threads which haven't been given anything to do.

> +        } else
> +            f = decoded_frame;
> +
> +        if (!ist->filters[i]->b_abr_thread_init) {
> +            if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline,
> +                            ist->filters[i]))) {
> +                av_log(NULL, AV_LOG_ERROR,
> +                        "pthread_create failed: %s. Try to increase `ulimit -v` or \
> +                        decrease `ulimit -s`.\n", strerror(ret));

What is the motivation for these recommendations?  Neither seems likely to help except in very weirdly constrained systems.

> +                return AVERROR(ret);
> +            }
> +            pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);
> +            pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);
> +            pthread_cond_init(&ist->filters[i]->process_cond, NULL);
> +            pthread_cond_init(&ist->filters[i]->finish_cond, NULL);
> +            ist->filters[i]->t_end = 0;
> +            ist->filters[i]->t_error = 0;
> +            ist->filters[i]->b_abr_thread_init = 1;
> +        }
> +
> +        pthread_mutex_lock(&ist->filters[i]->process_mutex);
> +        ist->filters[i]->waited_frm = f;
> +        pthread_cond_signal(&ist->filters[i]->process_cond);
> +        pthread_mutex_unlock(&ist->filters[i]->process_mutex);
> +    }
> +
> +    for (i = 0; i < ist->nb_filters; i++) {
> +        pthread_mutex_lock(&ist->filters[i]->finish_mutex);
> +        while(ist->filters[i]->waited_frm != NULL)
> +            pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex);
> +        pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
> +    }

Is the lockstep such that you can actually use the same mutex and condvar for both parts?  That would seem simpler if it works.

> +    for (i = 0; i < ist->nb_filters; i++) {
> +        if (ist->filters[i]->t_error < 0) {
> +            ret = ist->filters[i]->t_error;
> +            break;
> +        }
> +    }
> +#else
>      for (i = 0; i < ist->nb_filters; i++) {
>          if (i < ist->nb_filters - 1) {
>              f = ist->filter_frame;
> @@ -2266,6 +2363,7 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
>                  break;
>          } else
>              f = decoded_frame;
> +

Stray change?

>          ret = ifilter_send_frame(ist->filters[i], f);
>          if (ret == AVERROR_EOF)
>              ret = 0; /* ignore */
> @@ -2275,6 +2373,8 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
>              break;
>          }
>      }
> +#endif

There is still a bit of common code here between the two branches.  I think you really do want the #ifdefed region to be as small as possible (you can put the loop outside the condition with a new loop start in the HAVE_THREADS case only).

> +
>      return ret;
>  }
>  
> @@ -4537,10 +4637,10 @@ static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist)
>      *best_ist = NULL;
>      ret = avfilter_graph_request_oldest(graph->graph);
>      if (ret >= 0)
> -        return reap_filters(0);
> +        return reap_filters(0, NULL);

I'm not entirely sure I'm reading this correctly, but I think this is the complex filtergraph case.

That means that using -filter_complex split will have quite different behaviour to multiple -vf instances?

>  
>      if (ret == AVERROR_EOF) {
> -        ret = reap_filters(1);
> +        ret = reap_filters(1, NULL);
>          for (i = 0; i < graph->nb_outputs; i++)
>              close_output_stream(graph->outputs[i]->ost);
>          return ret;
> @@ -4642,7 +4742,7 @@ static int transcode_step(void)
>      if (ret < 0)
>          return ret == AVERROR_EOF ? 0 : ret;
>  
> -    return reap_filters(0);
> +    return HAVE_THREADS ? ret : reap_filters(0, NULL);
>  }
>  
>  /*
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index eb1eaf6..9a8e776 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -253,6 +253,20 @@ typedef struct InputFilter {
>  
>      AVBufferRef *hw_frames_ctx;
>  
> +    // for abr pipeline
> +    int b_abr_thread_init;

I'm not sure what this name is intended to mean at all.  Since it indicates whether the filter thread has been created, maybe something like "filter_thread_created" would make the meaning clearer?

> +#if HAVE_THREADS
> +    AVFrame *waited_frm;
> +    AVFrame input_frm;

sizeof(AVFrame) is not part of the ABI.  You need to allocate it somewhere.

> +    pthread_t f_thread;

"filter_thread"?

> +    pthread_cond_t process_cond;
> +    pthread_cond_t finish_cond;
> +    pthread_mutex_t process_mutex;
> +    pthread_mutex_t finish_mutex;
> +    int t_end;
> +    int t_error;

I think it would be a good idea to document the condition associated with each of these.

> +#endif
> +
>      int eof;
>  } InputFilter;
>  
> diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
> index 6518d50..5d1e521 100644
> --- a/fftools/ffmpeg_filter.c
> +++ b/fftools/ffmpeg_filter.c
> @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0,
>  int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
>  {
>      FilterGraph *fg = av_mallocz(sizeof(*fg));
> +    int i;
>  
>      if (!fg)
>          exit_program(1);
> @@ -225,6 +226,9 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
>      GROW_ARRAY(ist->filters, ist->nb_filters);
>      ist->filters[ist->nb_filters - 1] = fg->inputs[0];
>  
> +    for (i = 0; i < ist->nb_filters; i++)
> +        ist->filters[i]->b_abr_thread_init = 0;

It doesn't look like the right place for this init?  init_simple_filtergraph() is called once per output stream, so this is going to happen multiple times.

> +
>      GROW_ARRAY(filtergraphs, nb_filtergraphs);
>      filtergraphs[nb_filtergraphs - 1] = fg;
>  
> 

- Mark
Shaofei Wang Feb. 13, 2019, 7:52 a.m. UTC | #4
> -----Original Message-----

> From: ffmpeg-devel [mailto:ffmpeg-devel-bounces@ffmpeg.org] On Behalf Of

> Mark Thompson

> Sent: Tuesday, February 12, 2019 8:18 AM

It should be UTC time when received the email

> To: ffmpeg-devel@ffmpeg.org

> Subject: Re: [FFmpeg-devel] [PATCH v4] Improved the performance of 1

> decode + N filter graphs and adaptive bitrate.

> 

> On 11/02/2019 22:41, Shaofei Wang wrote:

And the above time I've sent the previous email is also a correct UTC time
 
> Please avoid sending messages from the future - the list received this about

> thirteen hours before its supposed send time (received "Mon, 11 Feb 2019

> 11:42:09 +0200", sent "Mon, 11 Feb 2019 17:41:04 -0500").


> Probably the sending machine or some intermediate has an incorrect time or

> time zone.

It may be the reason.

> Some numbers for more use-cases and platforms (with different architectures

> and core counts) would be a good idea if you intend to enable this by default.

It would be better to have more platforms data.
Actually, it provide option for user to choose a "faster" path in the previous
version. In this patch it simplified code path.

> Presumably it's a bit slower on less powerful machines with fewer cores when

> it makes many threads, but by how much?  Is that acceptable?

Is it resource limited machine that we should disable HAVE_THREADS?

> > diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index

> > 544f1a1..67b1a2a 100644

> > --- a/fftools/ffmpeg.c

> > +++ b/fftools/ffmpeg.c

> > @@ -1419,13 +1419,18 @@ static void

> finish_output_stream(OutputStream *ost)

> >   *

> >   * @return  0 for success, <0 for severe errors

> >   */

> > -static int reap_filters(int flush)

> > +static int reap_filters(int flush, InputFilter * ifilter)

> >  {

> >      AVFrame *filtered_frame = NULL;

> >      int i;

> >

> > -    /* Reap all buffers present in the buffer sinks */

> > +    /* Reap all buffers present in the buffer sinks or just reap specified

> > +     * input filter buffer */

> >      for (i = 0; i < nb_output_streams; i++) {

> > +        if (ifilter) {

> > +            if (ifilter != output_streams[i]->filter->graph->inputs[0])

> > +                continue;

> > +        }

> 

> No mixed declarations and code.

OK. 
> >          OutputStream *ost = output_streams[i];

> >          OutputFile    *of = output_files[ost->file_index];

> >          AVFilterContext *filter;

> 

> How carefully has this been audited to make sure that there are no data races?

> The calls to init_output_stream() and do_video_out() both do /a lot/, and in

> particular they interact with the InputStream which might be shared with

> other threads (and indeed is in all your examples above).

Base on the code path of multithread, it won't have duplicated path to call
init_output_stream() and do_video_out(), since there's no output stream share
multiple filter graphs. And this concern should be hightlight, will investigate
more in the code.

> > @@ -2179,7 +2184,8 @@ static int ifilter_send_frame(InputFilter *ifilter,

> AVFrame *frame)

> >              }

> >          }

> >

> > -        ret = reap_filters(1);

> > +        ret = HAVE_THREADS ? reap_filters(1, ifilter) :

> > + reap_filters(1, NULL);

> > +

> >          if (ret < 0 && ret != AVERROR_EOF) {

> >              av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n",

> av_err2str(ret));

> >              return ret;

> > @@ -2208,6 +2214,14 @@ static int ifilter_send_eof(InputFilter

> > *ifilter, int64_t pts)

> >

> >      ifilter->eof = 1;

> >

> > +#if HAVE_THREADS

> > +    ifilter->waited_frm = NULL;

> > +    pthread_mutex_lock(&ifilter->process_mutex);

> > +    ifilter->t_end = 1;

> > +    pthread_cond_signal(&ifilter->process_cond);

> > +    pthread_mutex_unlock(&ifilter->process_mutex);

> > +    pthread_join(ifilter->f_thread, NULL); #endif

> >      if (ifilter->filter) {

> >          ret = av_buffersrc_close(ifilter->filter, pts,

> AV_BUFFERSRC_FLAG_PUSH);

> >          if (ret < 0)

> > @@ -2252,12 +2266,95 @@ static int decode(AVCodecContext *avctx,

> AVFrame *frame, int *got_frame, AVPacke

> >      return 0;

> >  }

> >

> > +#if HAVE_THREADS

> > +static void *filter_pipeline(void *arg) {

> > +    InputFilter *fl = arg;

> > +    AVFrame *frm;

> > +    int ret;

> > +    while(1) {

> > +        pthread_mutex_lock(&fl->process_mutex);

> > +        while (fl->waited_frm == NULL && !fl->t_end)

> > +            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);

> > +        pthread_mutex_unlock(&fl->process_mutex);

> > +

> > +        if (fl->t_end) break;

> > +

> > +        frm = fl->waited_frm;

> > +        ret = ifilter_send_frame(fl, frm);

> > +        if (ret < 0) {

> > +            av_log(NULL, AV_LOG_ERROR,

> > +                   "Failed to inject frame into filter network: %s\n",

> av_err2str(ret));

> > +        } else {

> > +            ret = reap_filters(0, fl);

> > +        }

> > +        fl->t_error = ret;

> > +

> > +        pthread_mutex_lock(&fl->finish_mutex);

> > +        fl->waited_frm = NULL;

> > +        pthread_cond_signal(&fl->finish_cond);

> > +        pthread_mutex_unlock(&fl->finish_mutex);

> > +

> > +        if (ret < 0)

> > +            break;

> 

> Is this error always totally fatal?  (I guess I'm wondering if any EAGAIN-like

> cases end up here.)

Will remove the break. If the ret<0, similar as previous code to call ifilter_send_frame()
it will return the ret value to caller to decide whether it's fatal or not.

> 

> > +    }

> > +    return fl;

> 

> This return value seems to be unused?

OK, just return here.
> 

> > +}

> > +#endif

> > +

> >  static int send_frame_to_filters(InputStream *ist, AVFrame

> > *decoded_frame)  {

> >      int i, ret;

> >      AVFrame *f;

> >

> >      av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */

> > +#if HAVE_THREADS

> > +    for (i = 0; i < ist->nb_filters; i++) {

> > +        //it will use abr_pipeline mode by default

> > +        if (i < ist->nb_filters - 1) {

> > +            f = &ist->filters[i]->input_frm;

> > +            ret = av_frame_ref(f, decoded_frame);

> > +            if (ret < 0)

> > +                break;

> 

> Won't this just deadlock if you ever hit the break?  You'll immediately wait

> for threads which haven't been given anything to do.

Yeah. Fixed in the next version. Thanks.

> 

> > +        } else

> > +            f = decoded_frame;

> > +

> > +        if (!ist->filters[i]->b_abr_thread_init) {

> > +            if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL,

> filter_pipeline,

> > +                            ist->filters[i]))) {

> > +                av_log(NULL, AV_LOG_ERROR,

> > +                        "pthread_create failed: %s. Try to increase

> `ulimit -v` or \

> > +                        decrease `ulimit -s`.\n", strerror(ret));

> 

> What is the motivation for these recommendations?  Neither seems likely to

> help except in very weirdly constrained systems.

Done, removed.
> 

> > +                return AVERROR(ret);

> > +            }

> > +            pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);

> > +            pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);

> > +            pthread_cond_init(&ist->filters[i]->process_cond, NULL);

> > +            pthread_cond_init(&ist->filters[i]->finish_cond, NULL);

> > +            ist->filters[i]->t_end = 0;

> > +            ist->filters[i]->t_error = 0;

> > +            ist->filters[i]->b_abr_thread_init = 1;

> > +        }

> > +

> > +        pthread_mutex_lock(&ist->filters[i]->process_mutex);

> > +        ist->filters[i]->waited_frm = f;

> > +        pthread_cond_signal(&ist->filters[i]->process_cond);

> > +        pthread_mutex_unlock(&ist->filters[i]->process_mutex);

> > +    }

> > +

> > +    for (i = 0; i < ist->nb_filters; i++) {

> > +        pthread_mutex_lock(&ist->filters[i]->finish_mutex);

> > +        while(ist->filters[i]->waited_frm != NULL)

> > +            pthread_cond_wait(&ist->filters[i]->finish_cond,

> &ist->filters[i]->finish_mutex);

> > +        pthread_mutex_unlock(&ist->filters[i]->finish_mutex);

> > +    }

> 

> Is the lockstep such that you can actually use the same mutex and condvar for

> both parts?  That would seem simpler if it works.

>

Let me try, but it's safe to use those for producer and consumer.

> > +    for (i = 0; i < ist->nb_filters; i++) {

> > +        if (ist->filters[i]->t_error < 0) {

> > +            ret = ist->filters[i]->t_error;

> > +            break;

> > +        }

> > +    }

> > +#else

> >      for (i = 0; i < ist->nb_filters; i++) {

> >          if (i < ist->nb_filters - 1) {

> >              f = ist->filter_frame;

> > @@ -2266,6 +2363,7 @@ static int send_frame_to_filters(InputStream *ist,

> AVFrame *decoded_frame)

> >                  break;

> >          } else

> >              f = decoded_frame;

> > +

> 

> Stray change?

Done.
> 

> >          ret = ifilter_send_frame(ist->filters[i], f);

> >          if (ret == AVERROR_EOF)

> >              ret = 0; /* ignore */

> > @@ -2275,6 +2373,8 @@ static int send_frame_to_filters(InputStream *ist,

> AVFrame *decoded_frame)

> >              break;

> >          }

> >      }

> > +#endif

> 

> There is still a bit of common code here between the two branches.  I think

> you really do want the #ifdefed region to be as small as possible (you can put

> the loop outside the condition with a new loop start in the HAVE_THREADS

> case only).

It may increase more "#ifdef" slices?

> 

> > +

> >      return ret;

> >  }

> >

> > @@ -4537,10 +4637,10 @@ static int transcode_from_filter(FilterGraph

> *graph, InputStream **best_ist)

> >      *best_ist = NULL;

> >      ret = avfilter_graph_request_oldest(graph->graph);

> >      if (ret >= 0)

> > -        return reap_filters(0);

> > +        return reap_filters(0, NULL);

> 

> I'm not entirely sure I'm reading this correctly, but I think this is the complex

> filtergraph case.

I think so.

> That means that using -filter_complex split will have quite different behaviour

> to multiple -vf instances?

> 

Yes, they are different. The patch is mainly for filter graph level. In terms of complex
split case, there will be another one where changed in the lib.

> >

> >      if (ret == AVERROR_EOF) {

> > -        ret = reap_filters(1);

> > +        ret = reap_filters(1, NULL);

> >          for (i = 0; i < graph->nb_outputs; i++)

> >              close_output_stream(graph->outputs[i]->ost);

> >          return ret;

> > @@ -4642,7 +4742,7 @@ static int transcode_step(void)

> >      if (ret < 0)

> >          return ret == AVERROR_EOF ? 0 : ret;

> >

> > -    return reap_filters(0);

> > +    return HAVE_THREADS ? ret : reap_filters(0, NULL);

> >  }

> >

> >  /*

> > diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index

> > eb1eaf6..9a8e776 100644

> > --- a/fftools/ffmpeg.h

> > +++ b/fftools/ffmpeg.h

> > @@ -253,6 +253,20 @@ typedef struct InputFilter {

> >

> >      AVBufferRef *hw_frames_ctx;

> >

> > +    // for abr pipeline

> > +    int b_abr_thread_init;

> 

> I'm not sure what this name is intended to mean at all.  Since it indicates

> whether the filter thread has been created, maybe something like

> "filter_thread_created" would make the meaning clearer?

> 

How about abr_thread_created? Since I want to distinguish them from those
frame/slice filter thread, so I call them adaptive bite rate pipeline :)

> > +#if HAVE_THREADS

> > +    AVFrame *waited_frm;

> > +    AVFrame input_frm;

> 

> sizeof(AVFrame) is not part of the ABI.  You need to allocate it somewhere.

> 

Please tell more?

> > +    pthread_t f_thread;

> 

> "filter_thread"?

>

abr_thread

> > +    pthread_cond_t process_cond;

> > +    pthread_cond_t finish_cond;

> > +    pthread_mutex_t process_mutex;

> > +    pthread_mutex_t finish_mutex;

> > +    int t_end;

> > +    int t_error;

> 

> I think it would be a good idea to document the condition associated with

> each of these.

> 

> > +#endif

> > +

> >      int eof;

> >  } InputFilter;

> >

> > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index

> > 6518d50..5d1e521 100644

> > --- a/fftools/ffmpeg_filter.c

> > +++ b/fftools/ffmpeg_filter.c

> > @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t,

> > channel_layout, channel_layouts, 0,  int

> > init_simple_filtergraph(InputStream *ist, OutputStream *ost)  {

> >      FilterGraph *fg = av_mallocz(sizeof(*fg));

> > +    int i;

> >

> >      if (!fg)

> >          exit_program(1);

> > @@ -225,6 +226,9 @@ int init_simple_filtergraph(InputStream *ist,

> OutputStream *ost)

> >      GROW_ARRAY(ist->filters, ist->nb_filters);

> >      ist->filters[ist->nb_filters - 1] = fg->inputs[0];

> >

> > +    for (i = 0; i < ist->nb_filters; i++)

> > +        ist->filters[i]->b_abr_thread_init = 0;

> 

> It doesn't look like the right place for this init?  init_simple_filtergraph() is

> called once per output stream, so this is going to happen multiple times.

> 

Find another place. Not only simple filter graph need this, but also complex 
graph do.
> > +

> >      GROW_ARRAY(filtergraphs, nb_filtergraphs);

> >      filtergraphs[nb_filtergraphs - 1] = fg;

> >

> >

> 

> - Mark

> _______________________________________________

> ffmpeg-devel mailing list

> ffmpeg-devel@ffmpeg.org

> http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
Carl Eugen Hoyos Feb. 13, 2019, 10:44 a.m. UTC | #5
2019-02-13 8:52 GMT+01:00, Wang, Shaofei <shaofei.wang@intel.com>:

>> > +    AVFrame input_frm;
>>
>> sizeof(AVFrame) is not part of the ABI.  You need to allocate it
>> somewhere.
>>
> Please tell more?

See the documentation for AVFrame in libavutil/frame.h
Use av_frame_alloc()

Carl Eugen
Shaofei Wang Feb. 13, 2019, 10:57 a.m. UTC | #6
> >> sizeof(AVFrame) is not part of the ABI.  You need to allocate it

> >> somewhere.

> >>

> > Please tell more?

> 

> See the documentation for AVFrame in libavutil/frame.h Use av_frame_alloc()

> 

> Carl Eugen


Thanks Carl
diff mbox

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 544f1a1..67b1a2a 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1419,13 +1419,18 @@  static void finish_output_stream(OutputStream *ost)
  *
  * @return  0 for success, <0 for severe errors
  */
-static int reap_filters(int flush)
+static int reap_filters(int flush, InputFilter * ifilter)
 {
     AVFrame *filtered_frame = NULL;
     int i;
 
-    /* Reap all buffers present in the buffer sinks */
+    /* Reap all buffers present in the buffer sinks or just reap specified
+     * input filter buffer */
     for (i = 0; i < nb_output_streams; i++) {
+        if (ifilter) {
+            if (ifilter != output_streams[i]->filter->graph->inputs[0])
+                continue;
+        }
         OutputStream *ost = output_streams[i];
         OutputFile    *of = output_files[ost->file_index];
         AVFilterContext *filter;
@@ -2179,7 +2184,8 @@  static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
             }
         }
 
-        ret = reap_filters(1);
+        ret = HAVE_THREADS ? reap_filters(1, ifilter) : reap_filters(1, NULL);
+
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
             return ret;
@@ -2208,6 +2214,14 @@  static int ifilter_send_eof(InputFilter *ifilter, int64_t pts)
 
     ifilter->eof = 1;
 
+#if HAVE_THREADS
+    ifilter->waited_frm = NULL;
+    pthread_mutex_lock(&ifilter->process_mutex);
+    ifilter->t_end = 1;
+    pthread_cond_signal(&ifilter->process_cond);
+    pthread_mutex_unlock(&ifilter->process_mutex);
+    pthread_join(ifilter->f_thread, NULL);
+#endif
     if (ifilter->filter) {
         ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH);
         if (ret < 0)
@@ -2252,12 +2266,95 @@  static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
     return 0;
 }
 
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg)
+{
+    InputFilter *fl = arg;
+    AVFrame *frm;
+    int ret;
+    while(1) {
+        pthread_mutex_lock(&fl->process_mutex);
+        while (fl->waited_frm == NULL && !fl->t_end)
+            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+        pthread_mutex_unlock(&fl->process_mutex);
+
+        if (fl->t_end) break;
+
+        frm = fl->waited_frm;
+        ret = ifilter_send_frame(fl, frm);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+        } else {
+            ret = reap_filters(0, fl);
+        }
+        fl->t_error = ret;
+
+        pthread_mutex_lock(&fl->finish_mutex);
+        fl->waited_frm = NULL;
+        pthread_cond_signal(&fl->finish_cond);
+        pthread_mutex_unlock(&fl->finish_mutex);
+
+        if (ret < 0)
+            break;
+    }
+    return fl;
+}
+#endif
+
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
     int i, ret;
     AVFrame *f;
 
     av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
+#if HAVE_THREADS
+    for (i = 0; i < ist->nb_filters; i++) {
+        //it will use abr_pipeline mode by default
+        if (i < ist->nb_filters - 1) {
+            f = &ist->filters[i]->input_frm;
+            ret = av_frame_ref(f, decoded_frame);
+            if (ret < 0)
+                break;
+        } else
+            f = decoded_frame;
+
+        if (!ist->filters[i]->b_abr_thread_init) {
+            if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline,
+                            ist->filters[i]))) {
+                av_log(NULL, AV_LOG_ERROR,
+                        "pthread_create failed: %s. Try to increase `ulimit -v` or \
+                        decrease `ulimit -s`.\n", strerror(ret));
+                return AVERROR(ret);
+            }
+            pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);
+            pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);
+            pthread_cond_init(&ist->filters[i]->process_cond, NULL);
+            pthread_cond_init(&ist->filters[i]->finish_cond, NULL);
+            ist->filters[i]->t_end = 0;
+            ist->filters[i]->t_error = 0;
+            ist->filters[i]->b_abr_thread_init = 1;
+        }
+
+        pthread_mutex_lock(&ist->filters[i]->process_mutex);
+        ist->filters[i]->waited_frm = f;
+        pthread_cond_signal(&ist->filters[i]->process_cond);
+        pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+    }
+
+    for (i = 0; i < ist->nb_filters; i++) {
+        pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+        while(ist->filters[i]->waited_frm != NULL)
+            pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex);
+        pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+    }
+    for (i = 0; i < ist->nb_filters; i++) {
+        if (ist->filters[i]->t_error < 0) {
+            ret = ist->filters[i]->t_error;
+            break;
+        }
+    }
+#else
     for (i = 0; i < ist->nb_filters; i++) {
         if (i < ist->nb_filters - 1) {
             f = ist->filter_frame;
@@ -2266,6 +2363,7 @@  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
                 break;
         } else
             f = decoded_frame;
+
         ret = ifilter_send_frame(ist->filters[i], f);
         if (ret == AVERROR_EOF)
             ret = 0; /* ignore */
@@ -2275,6 +2373,8 @@  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
             break;
         }
     }
+#endif
+
     return ret;
 }
 
@@ -4537,10 +4637,10 @@  static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist)
     *best_ist = NULL;
     ret = avfilter_graph_request_oldest(graph->graph);
     if (ret >= 0)
-        return reap_filters(0);
+        return reap_filters(0, NULL);
 
     if (ret == AVERROR_EOF) {
-        ret = reap_filters(1);
+        ret = reap_filters(1, NULL);
         for (i = 0; i < graph->nb_outputs; i++)
             close_output_stream(graph->outputs[i]->ost);
         return ret;
@@ -4642,7 +4742,7 @@  static int transcode_step(void)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
-    return reap_filters(0);
+    return HAVE_THREADS ? ret : reap_filters(0, NULL);
 }
 
 /*
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..9a8e776 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,20 @@  typedef struct InputFilter {
 
     AVBufferRef *hw_frames_ctx;
 
+    // for abr pipeline
+    int b_abr_thread_init;
+#if HAVE_THREADS
+    AVFrame *waited_frm;
+    AVFrame input_frm;
+    pthread_t f_thread;
+    pthread_cond_t process_cond;
+    pthread_cond_t finish_cond;
+    pthread_mutex_t process_mutex;
+    pthread_mutex_t finish_mutex;
+    int t_end;
+    int t_error;
+#endif
+
     int eof;
 } InputFilter;
 
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 6518d50..5d1e521 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -197,6 +197,7 @@  DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0,
 int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
 {
     FilterGraph *fg = av_mallocz(sizeof(*fg));
+    int i;
 
     if (!fg)
         exit_program(1);
@@ -225,6 +226,9 @@  int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
     GROW_ARRAY(ist->filters, ist->nb_filters);
     ist->filters[ist->nb_filters - 1] = fg->inputs[0];
 
+    for (i = 0; i < ist->nb_filters; i++)
+        ist->filters[i]->b_abr_thread_init = 0;
+
     GROW_ARRAY(filtergraphs, nb_filtergraphs);
     filtergraphs[nb_filtergraphs - 1] = fg;