[FFmpeg-devel,v2] Improved the performance of 1 decode + N filter graphs and adaptive bitrate.

Message ID 1547591004-13018-1-git-send-email-shaofei.wang@intel.com
State Superseded

Commit Message

Shaofei Wang Jan. 15, 2019, 10:23 p.m. UTC
The new option "-abr_pipeline" enables multiple filter graphs to run
concurrently, which brings about a 4%~20% improvement in some 1:N
scenarios with CPU or GPU acceleration.

Below are some test cases and comparisons for reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)

For 1:N transcode by GPU acceleration with vaapi:
./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
    -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \
    -abr_pipeline

    test results:
                2 encoders 5 encoders 10 encoders
    Improved       6.1%    6.9%       5.5%

For 1:N transcode by GPU acceleration with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null

    test results:
                2 encoders  5 encoders 10 encoders
    Improved       6%       4%         15%

For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale   10 scale
    Improved       12%     21%        21%

For CPU-only 1 decode to N scaling:
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \
    -abr_pipeline

    test results:
                2 scale  5 scale   10 scale
    Improved       25%    107%       148%

Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
---
 fftools/ffmpeg.c        | 238 +++++++++++++++++++++++++++++++++++++++++++++---
 fftools/ffmpeg.h        |  15 +++
 fftools/ffmpeg_filter.c |   6 ++
 fftools/ffmpeg_opt.c    |   6 +-
 4 files changed, 251 insertions(+), 14 deletions(-)
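
For orientation before the review comments and the full diff: the heart of
the patch is one worker thread per InputFilter, fed by the decoder thread
through a pair of condition variables. Below is a minimal, self-contained
sketch of that handoff protocol; process_frame() is a placeholder for the
real ifilter_send_frame() + pipeline_reap_filters() pair, and the Worker
fields mirror the ones the patch adds to InputFilter:

    /* Sketch of the per-filter-graph handoff implemented by the patch.
     * process_frame() stands in for ifilter_send_frame() +
     * pipeline_reap_filters(); field names mirror those added to
     * InputFilter in fftools/ffmpeg.h. Compile with -lpthread. */
    #include <pthread.h>
    #include <stddef.h>

    typedef struct Worker {
        void *waited_frm;              /* frame handed over by the decode thread */
        pthread_t f_thread;
        pthread_mutex_t process_mutex, finish_mutex;
        pthread_cond_t  process_cond,  finish_cond;
        int t_end, t_error;
    } Worker;

    static int process_frame(void *frm) { return 0; } /* placeholder */

    static void *filter_thread(void *arg)             /* cf. filter_pipeline() */
    {
        Worker *w = arg;
        while (1) {
            pthread_mutex_lock(&w->process_mutex);
            while (!w->waited_frm && !w->t_end)       /* wait for work or EOF */
                pthread_cond_wait(&w->process_cond, &w->process_mutex);
            pthread_mutex_unlock(&w->process_mutex);
            if (w->t_end)
                break;

            w->t_error = process_frame(w->waited_frm);

            pthread_mutex_lock(&w->finish_mutex);     /* report completion */
            w->waited_frm = NULL;
            pthread_cond_signal(&w->finish_cond);
            pthread_mutex_unlock(&w->finish_mutex);
            if (w->t_error < 0)
                break;
        }
        return NULL;
    }

    /* Decode-thread side: hand one frame over, then block until the
     * worker has consumed it (cf. send_frame_to_filters()). */
    static int dispatch_and_wait(Worker *w, void *frm)
    {
        pthread_mutex_lock(&w->process_mutex);
        w->waited_frm = frm;
        pthread_cond_signal(&w->process_cond);
        pthread_mutex_unlock(&w->process_mutex);

        pthread_mutex_lock(&w->finish_mutex);
        while (w->waited_frm)
            pthread_cond_wait(&w->finish_cond, &w->finish_mutex);
        pthread_mutex_unlock(&w->finish_mutex);
        return w->t_error;
    }

    int main(void)
    {
        Worker w = { 0 };
        int dummy;
        pthread_mutex_init(&w.process_mutex, NULL);
        pthread_mutex_init(&w.finish_mutex, NULL);
        pthread_cond_init(&w.process_cond, NULL);
        pthread_cond_init(&w.finish_cond, NULL);
        pthread_create(&w.f_thread, NULL, filter_thread, &w);

        dispatch_and_wait(&w, &dummy);                /* one frame round trip */

        pthread_mutex_lock(&w.process_mutex);         /* EOF: cf. ifilter_send_eof() */
        w.t_end = 1;
        pthread_cond_signal(&w.process_cond);
        pthread_mutex_unlock(&w.process_mutex);
        pthread_join(w.f_thread, NULL);
        return 0;
    }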

Comments

Rostislav Pehlivanov Jan. 15, 2019, 10:30 a.m. UTC | #1
On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> wrote:

> The new option "-abr_pipeline" enables multiple filter graphs to run
> concurrently, which brings about a 4%~20% improvement in some 1:N
> scenarios with CPU or GPU acceleration.
>
> Below are some test cases and comparisons for reference.
> (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
> (Software: Intel iHD driver - 16.9.00100, CentOS 7)
>
> For 1:N transcode by GPU acceleration with vaapi:
> ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
>     -hwaccel_output_format vaapi \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
>     -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \
>     -abr_pipeline
>
>     test results:
>                 2 encoders 5 encoders 10 encoders
>     Improved       6.1%    6.9%       5.5%
>
> For 1:N transcode by GPU acceleration with QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null
>
>     test results:
>                 2 encoders  5 encoders 10 encoders
>     Improved       6%       4%         15%
>
> For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null
> /dev/null \
>     -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null
> /dev/null
>
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       12%     21%        21%
>
> For CPU-only 1 decode to N scaling:
> ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \
>     -abr_pipeline
>
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       25%    107%       148%
>
> Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
> Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
> ---
>  fftools/ffmpeg.c        | 238
> +++++++++++++++++++++++++++++++++++++++++++++---
>  fftools/ffmpeg.h        |  15 +++
>  fftools/ffmpeg_filter.c |   6 ++
>  fftools/ffmpeg_opt.c    |   6 +-
>  4 files changed, 251 insertions(+), 14 deletions(-)
>
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index 544f1a1..d608194 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -1523,6 +1523,110 @@ static int reap_filters(int flush)
>      return 0;
>  }
>
> +static int pipeline_reap_filters(int flush, InputFilter * ifilter)
> +{
> +    AVFrame *filtered_frame = NULL;
> +    int i;
> +
> +    for (i = 0; i < nb_output_streams; i++) {
> +        if (ifilter == output_streams[i]->filter->graph->inputs[0]) break;
> +    }
> +    OutputStream *ost = output_streams[i];
> +    OutputFile    *of = output_files[ost->file_index];
> +    AVFilterContext *filter;
> +    AVCodecContext *enc = ost->enc_ctx;
> +    int ret = 0;
> +
> +    if (!ost->filter || !ost->filter->graph->graph)
> +        return 0;
> +    filter = ost->filter->filter;
> +
> +    if (!ost->initialized) {
> +        char error[1024] = "";
> +        ret = init_output_stream(ost, error, sizeof(error));
> +        if (ret < 0) {
> +            av_log(NULL, AV_LOG_ERROR, "Error initializing output stream
> %d:%d -- %s\n",
> +                   ost->file_index, ost->index, error);
> +            exit_program(1);
> +        }
> +    }
> +
> +    if (!ost->filtered_frame && !(ost->filtered_frame =
> av_frame_alloc())) {
> +        return AVERROR(ENOMEM);
> +    }
>

We never put brackets for single line statements.
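
That is, per the project style the quoted hunk would read:

    if (!ost->filtered_frame && !(ost->filtered_frame = av_frame_alloc()))
        return AVERROR(ENOMEM);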


> +    filtered_frame = ost->filtered_frame;
> +
> +    while (1) {
> +        double float_pts = AV_NOPTS_VALUE; // this is identical to
> filtered_frame.pts but with higher precision
> +        ret = av_buffersink_get_frame_flags(filter, filtered_frame,
> +                                           AV_BUFFERSINK_FLAG_NO_REQUEST);
> +        if (ret < 0) {
> +            if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
> +                av_log(NULL, AV_LOG_WARNING,
> +                       "Error in av_buffersink_get_frame_flags(): %s\n",
> av_err2str(ret));
> +            } else if (flush && ret == AVERROR_EOF) {
> +                if (av_buffersink_get_type(filter) == AVMEDIA_TYPE_VIDEO)
> +                    do_video_out(of, ost, NULL, AV_NOPTS_VALUE);
> +            }
> +            break;
> +        }
> +        if (ost->finished) {
> +            av_frame_unref(filtered_frame);
> +            continue;
> +        }
> +        if (filtered_frame->pts != AV_NOPTS_VALUE) {
> +            int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 :
> of->start_time;
> +            AVRational filter_tb = av_buffersink_get_time_base(filter);
> +            AVRational tb = enc->time_base;
> +            int extra_bits = av_clip(29 - av_log2(tb.den), 0, 16);
> +
> +            tb.den <<= extra_bits;
> +            float_pts =
> +                av_rescale_q(filtered_frame->pts, filter_tb, tb) -
> +                av_rescale_q(start_time, AV_TIME_BASE_Q, tb);
> +            float_pts /= 1 << extra_bits;
> +            // avoid exact midpoints to reduce the chance of rounding
> differences, this can be removed in case the fps code is changed to work
> with integers
> +            float_pts += FFSIGN(float_pts) * 1.0 / (1<<17);
> +
> +            filtered_frame->pts =
> +                av_rescale_q(filtered_frame->pts, filter_tb,
> enc->time_base) -
> +                av_rescale_q(start_time, AV_TIME_BASE_Q, enc->time_base);
> +        }
> +
> +        switch (av_buffersink_get_type(filter)) {
> +        case AVMEDIA_TYPE_VIDEO:
> +            if (!ost->frame_aspect_ratio.num)
> +                enc->sample_aspect_ratio =
> filtered_frame->sample_aspect_ratio;
> +
> +            if (debug_ts) {
> +                av_log(NULL, AV_LOG_INFO, "filter -> pts:%s pts_time:%s
> exact:%f time_base:%d/%d\n",
> +                        av_ts2str(filtered_frame->pts),
> av_ts2timestr(filtered_frame->pts, &enc->time_base),
> +                        float_pts,
> +                        enc->time_base.num, enc->time_base.den);
> +            }
> +
> +            do_video_out(of, ost, filtered_frame, float_pts);
> +            break;
> +        case AVMEDIA_TYPE_AUDIO:
> +            if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) &&
> +                enc->channels != filtered_frame->channels) {
> +                av_log(NULL, AV_LOG_ERROR,
> +                       "Audio filter graph output is not normalized and
> encoder does not support parameter changes\n");
> +                break;
> +            }
> +            do_audio_out(of, ost, filtered_frame);
> +            break;
> +        default:
> +            // TODO support subtitle filters
> +            av_assert0(0);
> +        }
> +
> +        av_frame_unref(filtered_frame);
> +    }
> +
> +    return 0;
> +}
> +
>  static void print_final_stats(int64_t total_size)
>  {
>      uint64_t video_size = 0, audio_size = 0, extra_size = 0, other_size =
> 0;
> @@ -2179,7 +2283,15 @@ static int ifilter_send_frame(InputFilter *ifilter,
> AVFrame *frame)
>              }
>          }
>
> +#if HAVE_THREADS
> +        if (!abr_pipeline) {
> +            ret = reap_filters(1);
> +        } else {
> +            ret = pipeline_reap_filters(1, ifilter);
> +        }
>

Same.



> +#else
>          ret = reap_filters(1);
> +#endif
>          if (ret < 0 && ret != AVERROR_EOF) {
>              av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n",
> av_err2str(ret));
>              return ret;
> @@ -2208,6 +2320,16 @@ static int ifilter_send_eof(InputFilter *ifilter,
> int64_t pts)
>
>      ifilter->eof = 1;
>
> +#if HAVE_THREADS
> +    if (abr_pipeline) {
> +        ifilter->waited_frm = NULL;
> +        pthread_mutex_lock(&ifilter->process_mutex);
> +        ifilter->t_end = 1;
> +        pthread_cond_signal(&ifilter->process_cond);
> +        pthread_mutex_unlock(&ifilter->process_mutex);
> +        pthread_join(ifilter->f_thread, NULL);
> +    }
> +#endif
>      if (ifilter->filter) {
>          ret = av_buffersrc_close(ifilter->filter, pts,
> AV_BUFFERSRC_FLAG_PUSH);
>          if (ret < 0)
> @@ -2252,6 +2374,42 @@ static int decode(AVCodecContext *avctx, AVFrame
> *frame, int *got_frame, AVPacke
>      return 0;
>  }
>
> +#if HAVE_THREADS
> +static void *filter_pipeline(void *arg)
> +{
> +    InputFilter *fl = arg;
> +    AVFrame *frm;
> +    int ret;
> +    while(1) {
> +        pthread_mutex_lock(&fl->process_mutex);
> +        while (fl->waited_frm == NULL && !fl->t_end)
> +            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
> +        pthread_mutex_unlock(&fl->process_mutex);
> +
> +        if (fl->t_end) break;
> +
> +        frm = fl->waited_frm;
> +        ret = ifilter_send_frame(fl, frm);
> +        if (ret < 0) {
> +            av_log(NULL, AV_LOG_ERROR,
> +                   "Failed to inject frame into filter network: %s\n",
> av_err2str(ret));
>

return err;?



> +        } else {
> +            ret = pipeline_reap_filters(0, fl);
> +        }
>

Single line brackets.



> +        fl->t_error = ret;
> +
> +        pthread_mutex_lock(&fl->finish_mutex);
> +        fl->waited_frm = NULL;
> +        pthread_cond_signal(&fl->finish_cond);
> +        pthread_mutex_unlock(&fl->finish_mutex);
> +
> +        if (ret < 0) {
> +            break;
> +        }
>

Same.


> +    }
> +    return NULL;
> +}
> +#endif
>  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
>  {
>      int i, ret;
> @@ -2259,22 +2417,73 @@ static int send_frame_to_filters(InputStream *ist,
> AVFrame *decoded_frame)
>
>      av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
>      for (i = 0; i < ist->nb_filters; i++) {
> -        if (i < ist->nb_filters - 1) {
> -            f = ist->filter_frame;
> -            ret = av_frame_ref(f, decoded_frame);
> -            if (ret < 0)
> +#if HAVE_THREADS
> +        if (!abr_pipeline) {
> +#endif
> +            if (i < ist->nb_filters - 1) {
> +                f = ist->filter_frame;
> +                ret = av_frame_ref(f, decoded_frame);
> +                if (ret < 0)
> +                    break;
> +            } else
> +                f = decoded_frame;
> +
> +                ret = ifilter_send_frame(ist->filters[i], f);
> +                if (ret == AVERROR_EOF)
> +                    ret = 0; /* ignore */
> +                if (ret < 0) {
> +                    av_log(NULL, AV_LOG_ERROR,
> +                           "Failed to inject frame into filter network:
> %s\n", av_err2str(ret));
> +                    break;
> +                }
> +#if HAVE_THREADS
> +        } else {
> +            if (i < ist->nb_filters - 1) {
> +                f = &ist->filters[i]->input_frm;
> +                ret = av_frame_ref(f, decoded_frame);
> +                if (ret < 0)
> +                    break;
> +            } else
> +                f = decoded_frame;
> +
> +            if (!ist->filters[i]->b_abr_thread_init) {
> +                if ((ret = pthread_create(&ist->filters[i]->f_thread,
> NULL, filter_pipeline, ist->filters[i]))) {
> +                    av_log(NULL, AV_LOG_ERROR, "pthread_create failed:
> %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
> +                    return AVERROR(ret);
> +                }
> +                pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);
> +                pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);
> +                pthread_cond_init(&ist->filters[i]->process_cond, NULL);
> +                pthread_cond_init(&ist->filters[i]->finish_cond, NULL);
> +                ist->filters[i]->t_end = 0;
> +                ist->filters[i]->t_error = 0;
> +                ist->filters[i]->b_abr_thread_init = 1;
> +            }
> +
> +            pthread_mutex_lock(&ist->filters[i]->process_mutex);
> +            ist->filters[i]->waited_frm = f;
> +            pthread_cond_signal(&ist->filters[i]->process_cond);
> +            pthread_mutex_unlock(&ist->filters[i]->process_mutex);
> +        }
> +#endif
> +    }
> +#if HAVE_THREADS
> +    if (abr_pipeline) {
> +        for (i = 0; i < ist->nb_filters; i++) {
> +            pthread_mutex_lock(&ist->filters[i]->finish_mutex);
> +            while(ist->filters[i]->waited_frm != NULL)
> +                pthread_cond_wait(&ist->filters[i]->finish_cond,
> &ist->filters[i]->finish_mutex);
> +            pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
> +        }
> +        for (i = 0; i < ist->nb_filters; i++) {
> +            if (ist->filters[i]->t_error < 0) {
> +                ret = ist->filters[i]->t_error;
>                  break;
> -        } else
> -            f = decoded_frame;
> -        ret = ifilter_send_frame(ist->filters[i], f);
> -        if (ret == AVERROR_EOF)
> -            ret = 0; /* ignore */
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Failed to inject frame into filter network: %s\n",
> av_err2str(ret));
> -            break;
> +            }
>          }
>      }
> +#endif
> +
>      return ret;
>  }
>
> @@ -4642,6 +4851,9 @@ static int transcode_step(void)
>      if (ret < 0)
>          return ret == AVERROR_EOF ? 0 : ret;
>
> +#if HAVE_THREADS
> +    if (abr_pipeline) return 0;
> +#endif
>      return reap_filters(0);
>  }
>
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index eb1eaf6..e91c243 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -253,6 +253,20 @@ typedef struct InputFilter {
>
>      AVBufferRef *hw_frames_ctx;
>
> +#if HAVE_THREADS
> +    // for abr pipeline
> +    AVFrame *waited_frm;
> +    AVFrame input_frm;
> +    pthread_t f_thread;
> +    pthread_cond_t process_cond;
> +    pthread_cond_t finish_cond;
> +    pthread_mutex_t process_mutex;
> +    pthread_mutex_t finish_mutex;
> +    int b_abr_thread_init;
> +    int t_end;
> +    int t_error;
> +#endif
> +
>      int eof;
>  } InputFilter;
>
> @@ -606,6 +620,7 @@ extern int frame_bits_per_raw_sample;
>  extern AVIOContext *progress_avio;
>  extern float max_error_rate;
>  extern char *videotoolbox_pixfmt;
> +extern int abr_pipeline;
>
>  extern int filter_nbthreads;
>  extern int filter_complex_nbthreads;
> diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
> index 6518d50..8823394 100644
> --- a/fftools/ffmpeg_filter.c
> +++ b/fftools/ffmpeg_filter.c
> @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t,
> channel_layout, channel_layouts, 0,
>  int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
>  {
>      FilterGraph *fg = av_mallocz(sizeof(*fg));
> +    int i;
>
>      if (!fg)
>          exit_program(1);
> @@ -225,6 +226,11 @@ int init_simple_filtergraph(InputStream *ist,
> OutputStream *ost)
>      GROW_ARRAY(ist->filters, ist->nb_filters);
>      ist->filters[ist->nb_filters - 1] = fg->inputs[0];
>
> +    if (abr_pipeline) {
> +        for (i = 0; i < ist->nb_filters; i++) {
> +            ist->filters[i]->b_abr_thread_init = 0;
> +        }
> +    }
>

Same x2.



>      GROW_ARRAY(filtergraphs, nb_filtergraphs);
>      filtergraphs[nb_filtergraphs - 1] = fg;
>
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index d4851a2..fa5a556 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -110,6 +110,7 @@ float max_error_rate  = 2.0/3;
>  int filter_nbthreads = 0;
>  int filter_complex_nbthreads = 0;
>  int vstats_version = 2;
> +int abr_pipeline      = 0;
>
>
>  static int intra_only         = 0;
> @@ -3502,7 +3503,10 @@ const OptionDef options[] = {
>          "set the maximum number of queued packets from the demuxer" },
>      { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT |
> OPT_EXPERT, { &find_stream_info },
>          "read and decode the streams to fill missing information with
> heuristics" },
> -
> +#if HAVE_THREADS
> +    { "abr_pipeline",    OPT_BOOL,                                    {
> &abr_pipeline },
> +        "adaptive bitrate pipeline (1 decode to N filter graphs, and 1 to
> N transcode)" },
> +#endif
>      /* video options */
>      { "vframes",      OPT_VIDEO | HAS_ARG  | OPT_PERFILE | OPT_OUTPUT,
>        { .func_arg = opt_video_frames },
>          "set the number of video frames to output", "number" },
> --
> 1.8.3.1
>
Carl Eugen Hoyos Jan. 15, 2019, 11:57 a.m. UTC | #2
2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
> On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> wrote:

>> +#if HAVE_THREADS
>> +        if (!abr_pipeline) {
>> +            ret = reap_filters(1);
>> +        } else {
>> +            ret = pipeline_reap_filters(1, ifilter);
>> +        }
>>
>
> Same.

This hunk should have brackets, it simplifies
debugging and future patches at very little cost.

Carl Eugen
Rostislav Pehlivanov Jan. 15, 2019, 1:43 p.m. UTC | #3
On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote:

> 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
> > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com>
> wrote:
>
> >> +#if HAVE_THREADS
> >> +        if (!abr_pipeline) {
> >> +            ret = reap_filters(1);
> >> +        } else {
> >> +            ret = pipeline_reap_filters(1, ifilter);
> >> +        }
> >>
> >
> > Same.
>
> This hunk should have brackets, it simplifies
> debugging and future patches at very little cost.
>

No, it does not. It wastes a line.


Carl Eugen Hoyos Jan. 15, 2019, 2 p.m. UTC | #4
2019-01-15 14:43 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
> On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote:
>
>> 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
>> > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com>
>> wrote:
>>
>> >> +#if HAVE_THREADS
>> >> +        if (!abr_pipeline) {
>> >> +            ret = reap_filters(1);
>> >> +        } else {
>> >> +            ret = pipeline_reap_filters(1, ifilter);
>> >> +        }
>> >>
>> >
>> > Same.
>>
>> This hunk should have brackets, it simplifies
>> debugging and future patches at very little cost.
>>
>
> No, it does not. It wastes a line.

Yes, it "wastes" a line.
But the advantage far outweighs the incredible costs.

Carl Eugen
Michael Niedermayer Jan. 15, 2019, 5:43 p.m. UTC | #5
On Tue, Jan 15, 2019 at 05:23:24PM -0500, Shaofei Wang wrote:
> The new option "-abr_pipeline" enables multiple filter graphs to run
> concurrently, which brings about a 4%~20% improvement in some 1:N
> scenarios with CPU or GPU acceleration.
> 
> Below are some test cases and comparisons for reference.
> (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
> (Software: Intel iHD driver - 16.9.00100, CentOS 7)
> 
> For 1:N transcode by GPU acceleration with vaapi:
> ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
>     -hwaccel_output_format vaapi \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
>     -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \
>     -abr_pipeline
> 
>     test results:
>                 2 encoders 5 encoders 10 encoders
>     Improved       6.1%    6.9%       5.5%
> 
> For 1:N transcode by GPU acceleration with QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null
> 
>     test results:
>                 2 encoders  5 encoders 10 encoders
>     Improved       6%       4%         15%
> 
> For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
> ./ffmpeg -hwaccel qsv -c:v h264_qsv \
>     -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       12%     21%        21%
> 
> For CPU-only 1 decode to N scaling:
> ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
>     -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
>     -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \
>     -abr_pipeline
> 
>     test results:
>                 2 scale  5 scale   10 scale
>     Improved       25%    107%       148%
> 
> Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
> Reviewed-by: Zhao, Jun <jun.zhao@intel.com>
> ---
>  fftools/ffmpeg.c        | 238 +++++++++++++++++++++++++++++++++++++++++++++---
>  fftools/ffmpeg.h        |  15 +++
>  fftools/ffmpeg_filter.c |   6 ++
>  fftools/ffmpeg_opt.c    |   6 +-
>  4 files changed, 251 insertions(+), 14 deletions(-)

breaks build when threads are not available

CC	fftools/ffprobe.o
src/fftools/ffmpeg_filter.c: In function ‘init_simple_filtergraph’:
src/fftools/ffmpeg_filter.c:231: error: ‘InputFilter’ has no member named ‘b_abr_thread_init’
make: *** [fftools/ffmpeg_filter.o] Error 1
make: *** Waiting for unfinished jobs....


[...]
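
The failing line is the new loop in ffmpeg_filter.c that touches
b_abr_thread_init, a field the patch declares only inside #if HAVE_THREADS
in ffmpeg.h (see the header hunk below). One possible follow-up, sketched
here rather than taken from any posted revision, is to guard the loop the
same way (the then-unused int i would need similar treatment to stay
warning-clean):

    #if HAVE_THREADS
        if (abr_pipeline) {
            for (i = 0; i < ist->nb_filters; i++)
                ist->filters[i]->b_abr_thread_init = 0;
        }
    #endif
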
Marton Balint Jan. 15, 2019, 8:21 p.m. UTC | #6
On Tue, 15 Jan 2019, Carl Eugen Hoyos wrote:

> 2019-01-15 14:43 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
>> On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote:
>>
>>> 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>:
>>> > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com>
>>> wrote:
>>>
>>> >> +#if HAVE_THREADS
>>> >> +        if (!abr_pipeline) {
>>> >> +            ret = reap_filters(1);
>>> >> +        } else {
>>> >> +            ret = pipeline_reap_filters(1, ifilter);
>>> >> +        }
>>> >>
>>> >
>>> > Same.
>>>
>>> This hunk should have brackets, it simplifies
>>> debugging and future patches at very little cost.
>>>
>>
>> No, it does not. It wastes a line.
>
> Yes, it "wastes" a line.
> But the advantage far outweighs the incredible costs.

Not that I care too much, but I would write this:

ret = abr_pipeline ? pipeline_reap_filters(1, ifilter) : reap_filters(1);

Regards,
Marton
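
Folded into the original hunk, that suggestion would reduce it to something
like this sketch (not from any posted revision):

    #if HAVE_THREADS
            ret = abr_pipeline ? pipeline_reap_filters(1, ifilter) : reap_filters(1);
    #else
            ret = reap_filters(1);
    #endif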

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 544f1a1..d608194 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1523,6 +1523,110 @@  static int reap_filters(int flush)
     return 0;
 }
 
+static int pipeline_reap_filters(int flush, InputFilter * ifilter)
+{
+    AVFrame *filtered_frame = NULL;
+    int i;
+
+    for (i = 0; i < nb_output_streams; i++) {
+        if (ifilter == output_streams[i]->filter->graph->inputs[0]) break;
+    }
+    OutputStream *ost = output_streams[i];
+    OutputFile    *of = output_files[ost->file_index];
+    AVFilterContext *filter;
+    AVCodecContext *enc = ost->enc_ctx;
+    int ret = 0;
+
+    if (!ost->filter || !ost->filter->graph->graph)
+        return 0;
+    filter = ost->filter->filter;
+
+    if (!ost->initialized) {
+        char error[1024] = "";
+        ret = init_output_stream(ost, error, sizeof(error));
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n",
+                   ost->file_index, ost->index, error);
+            exit_program(1);
+        }
+    }
+
+    if (!ost->filtered_frame && !(ost->filtered_frame = av_frame_alloc())) {
+        return AVERROR(ENOMEM);
+    }
+    filtered_frame = ost->filtered_frame;
+
+    while (1) {
+        double float_pts = AV_NOPTS_VALUE; // this is identical to filtered_frame.pts but with higher precision
+        ret = av_buffersink_get_frame_flags(filter, filtered_frame,
+                                           AV_BUFFERSINK_FLAG_NO_REQUEST);
+        if (ret < 0) {
+            if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
+                av_log(NULL, AV_LOG_WARNING,
+                       "Error in av_buffersink_get_frame_flags(): %s\n", av_err2str(ret));
+            } else if (flush && ret == AVERROR_EOF) {
+                if (av_buffersink_get_type(filter) == AVMEDIA_TYPE_VIDEO)
+                    do_video_out(of, ost, NULL, AV_NOPTS_VALUE);
+            }
+            break;
+        }
+        if (ost->finished) {
+            av_frame_unref(filtered_frame);
+            continue;
+        }
+        if (filtered_frame->pts != AV_NOPTS_VALUE) {
+            int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
+            AVRational filter_tb = av_buffersink_get_time_base(filter);
+            AVRational tb = enc->time_base;
+            int extra_bits = av_clip(29 - av_log2(tb.den), 0, 16);
+
+            tb.den <<= extra_bits;
+            float_pts =
+                av_rescale_q(filtered_frame->pts, filter_tb, tb) -
+                av_rescale_q(start_time, AV_TIME_BASE_Q, tb);
+            float_pts /= 1 << extra_bits;
+            // avoid exact midpoints to reduce the chance of rounding differences, this can be removed in case the fps code is changed to work with integers
+            float_pts += FFSIGN(float_pts) * 1.0 / (1<<17);
+
+            filtered_frame->pts =
+                av_rescale_q(filtered_frame->pts, filter_tb, enc->time_base) -
+                av_rescale_q(start_time, AV_TIME_BASE_Q, enc->time_base);
+        }
+
+        switch (av_buffersink_get_type(filter)) {
+        case AVMEDIA_TYPE_VIDEO:
+            if (!ost->frame_aspect_ratio.num)
+                enc->sample_aspect_ratio = filtered_frame->sample_aspect_ratio;
+
+            if (debug_ts) {
+                av_log(NULL, AV_LOG_INFO, "filter -> pts:%s pts_time:%s exact:%f time_base:%d/%d\n",
+                        av_ts2str(filtered_frame->pts), av_ts2timestr(filtered_frame->pts, &enc->time_base),
+                        float_pts,
+                        enc->time_base.num, enc->time_base.den);
+            }
+
+            do_video_out(of, ost, filtered_frame, float_pts);
+            break;
+        case AVMEDIA_TYPE_AUDIO:
+            if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) &&
+                enc->channels != filtered_frame->channels) {
+                av_log(NULL, AV_LOG_ERROR,
+                       "Audio filter graph output is not normalized and encoder does not support parameter changes\n");
+                break;
+            }
+            do_audio_out(of, ost, filtered_frame);
+            break;
+        default:
+            // TODO support subtitle filters
+            av_assert0(0);
+        }
+
+        av_frame_unref(filtered_frame);
+    }
+
+    return 0;
+}
+
 static void print_final_stats(int64_t total_size)
 {
     uint64_t video_size = 0, audio_size = 0, extra_size = 0, other_size = 0;
@@ -2179,7 +2283,15 @@  static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
             }
         }
 
+#if HAVE_THREADS
+        if (!abr_pipeline) {
+            ret = reap_filters(1);
+        } else {
+            ret = pipeline_reap_filters(1, ifilter);
+        }
+#else
         ret = reap_filters(1);
+#endif
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
             return ret;
@@ -2208,6 +2320,16 @@  static int ifilter_send_eof(InputFilter *ifilter, int64_t pts)
 
     ifilter->eof = 1;
 
+#if HAVE_THREADS
+    if (abr_pipeline) {
+        ifilter->waited_frm = NULL;
+        pthread_mutex_lock(&ifilter->process_mutex);
+        ifilter->t_end = 1;
+        pthread_cond_signal(&ifilter->process_cond);
+        pthread_mutex_unlock(&ifilter->process_mutex);
+        pthread_join(ifilter->f_thread, NULL);
+    }
+#endif
     if (ifilter->filter) {
         ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH);
         if (ret < 0)
@@ -2252,6 +2374,42 @@  static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
     return 0;
 }
 
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg)
+{
+    InputFilter *fl = arg;
+    AVFrame *frm;
+    int ret;
+    while(1) {
+        pthread_mutex_lock(&fl->process_mutex);
+        while (fl->waited_frm == NULL && !fl->t_end)
+            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+        pthread_mutex_unlock(&fl->process_mutex);
+
+        if (fl->t_end) break;
+
+        frm = fl->waited_frm;
+        ret = ifilter_send_frame(fl, frm);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+        } else {
+            ret = pipeline_reap_filters(0, fl);
+        }
+        fl->t_error = ret;
+
+        pthread_mutex_lock(&fl->finish_mutex);
+        fl->waited_frm = NULL;
+        pthread_cond_signal(&fl->finish_cond);
+        pthread_mutex_unlock(&fl->finish_mutex);
+
+        if (ret < 0) {
+            break;
+        }
+    }
+    return NULL;
+}
+#endif
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
     int i, ret;
@@ -2259,22 +2417,73 @@  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 
     av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
     for (i = 0; i < ist->nb_filters; i++) {
-        if (i < ist->nb_filters - 1) {
-            f = ist->filter_frame;
-            ret = av_frame_ref(f, decoded_frame);
-            if (ret < 0)
+#if HAVE_THREADS
+        if (!abr_pipeline) {
+#endif
+            if (i < ist->nb_filters - 1) {
+                f = ist->filter_frame;
+                ret = av_frame_ref(f, decoded_frame);
+                if (ret < 0)
+                    break;
+            } else
+                f = decoded_frame;
+
+                ret = ifilter_send_frame(ist->filters[i], f);
+                if (ret == AVERROR_EOF)
+                    ret = 0; /* ignore */
+                if (ret < 0) {
+                    av_log(NULL, AV_LOG_ERROR,
+                           "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+                    break;
+                }
+#if HAVE_THREADS
+        } else {
+            if (i < ist->nb_filters - 1) {
+                f = &ist->filters[i]->input_frm;
+                ret = av_frame_ref(f, decoded_frame);
+                if (ret < 0)
+                    break;
+            } else
+                f = decoded_frame;
+
+            if (!ist->filters[i]->b_abr_thread_init) {
+                if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline, ist->filters[i]))) {
+                    av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
+                    return AVERROR(ret);
+                }
+                pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);
+                pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);
+                pthread_cond_init(&ist->filters[i]->process_cond, NULL);
+                pthread_cond_init(&ist->filters[i]->finish_cond, NULL);
+                ist->filters[i]->t_end = 0;
+                ist->filters[i]->t_error = 0;
+                ist->filters[i]->b_abr_thread_init = 1;
+            }
+
+            pthread_mutex_lock(&ist->filters[i]->process_mutex);
+            ist->filters[i]->waited_frm = f;
+            pthread_cond_signal(&ist->filters[i]->process_cond);
+            pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+        }
+#endif
+    }
+#if HAVE_THREADS
+    if (abr_pipeline) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+            while(ist->filters[i]->waited_frm != NULL)
+                pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex);
+            pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+        }
+        for (i = 0; i < ist->nb_filters; i++) {
+            if (ist->filters[i]->t_error < 0) {
+                ret = ist->filters[i]->t_error;
                 break;
-        } else
-            f = decoded_frame;
-        ret = ifilter_send_frame(ist->filters[i], f);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
+            }
         }
     }
+#endif
+
     return ret;
 }
 
@@ -4642,6 +4851,9 @@  static int transcode_step(void)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
+#if HAVE_THREADS
+    if (abr_pipeline) return 0;
+#endif
     return reap_filters(0);
 }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..e91c243 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,20 @@  typedef struct InputFilter {
 
     AVBufferRef *hw_frames_ctx;
 
+#if HAVE_THREADS
+    // for abr pipeline
+    AVFrame *waited_frm;
+    AVFrame input_frm;
+    pthread_t f_thread;
+    pthread_cond_t process_cond;
+    pthread_cond_t finish_cond;
+    pthread_mutex_t process_mutex;
+    pthread_mutex_t finish_mutex;
+    int b_abr_thread_init;
+    int t_end;
+    int t_error;
+#endif
+
     int eof;
 } InputFilter;
 
@@ -606,6 +620,7 @@  extern int frame_bits_per_raw_sample;
 extern AVIOContext *progress_avio;
 extern float max_error_rate;
 extern char *videotoolbox_pixfmt;
+extern int abr_pipeline;
 
 extern int filter_nbthreads;
 extern int filter_complex_nbthreads;
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 6518d50..8823394 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -197,6 +197,7 @@  DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0,
 int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
 {
     FilterGraph *fg = av_mallocz(sizeof(*fg));
+    int i;
 
     if (!fg)
         exit_program(1);
@@ -225,6 +226,11 @@  int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
     GROW_ARRAY(ist->filters, ist->nb_filters);
     ist->filters[ist->nb_filters - 1] = fg->inputs[0];
 
+    if (abr_pipeline) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            ist->filters[i]->b_abr_thread_init = 0;
+        }
+    }
     GROW_ARRAY(filtergraphs, nb_filtergraphs);
     filtergraphs[nb_filtergraphs - 1] = fg;
 
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d4851a2..fa5a556 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -110,6 +110,7 @@  float max_error_rate  = 2.0/3;
 int filter_nbthreads = 0;
 int filter_complex_nbthreads = 0;
 int vstats_version = 2;
+int abr_pipeline      = 0;
 
 
 static int intra_only         = 0;
@@ -3502,7 +3503,10 @@  const OptionDef options[] = {
         "set the maximum number of queued packets from the demuxer" },
     { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info },
         "read and decode the streams to fill missing information with heuristics" },
-
+#if HAVE_THREADS
+    { "abr_pipeline",    OPT_BOOL,                                    { &abr_pipeline },
+        "adaptive bitrate pipeline (1 decode to N filter graphs, and 1 to N transcode" },
+#endif
     /* video options */
     { "vframes",      OPT_VIDEO | HAS_ARG  | OPT_PERFILE | OPT_OUTPUT,           { .func_arg = opt_video_frames },
         "set the number of video frames to output", "number" },