
[FFmpeg-devel] Improved the performance of 1 decode + N filter graphs and adaptive bitrate.

Message ID 1553638041-26102-1-git-send-email-shaofei.wang@intel.com
State New

Commit Message

Shaofei Wang March 26, 2019, 10:07 p.m. UTC
It enables MULTIPLE SIMPLE filter graph concurrency, which brings roughly
a 4%~20% improvement in some 1:N scenarios with CPU or GPU acceleration.

Below are some test cases and comparisons for reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)

For 1:N transcode by GPU acceleration with vaapi:
./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
    -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null

    test results:
                2 encoders  5 encoders  10 encoders
    Improved    6.1%        6.9%        5.5%

For 1:N transcode by GPU acceleration with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null

    test results:
                2 encoders  5 encoders  10 encoders
    Improved    6%          4%          15%

For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale  10 scale
    Improved    12%      21%      21%

For CPU only 1 decode to N scaling:
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale  10 scale
    Improved    25%      107%     148%

Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
---
The patch only takes effect on multiple SIMPLE filter graph pipelines.
Passed FATE and refined the possible data races.
AFL tested, without introducing extra crashes/hangs.
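
In outline (a simplified sketch of the handoff the diff below implements;
the names match the patch): transcode_init() spawns one filter_pipeline()
worker thread per InputFilter, and send_frame_to_filters() hands each worker
its frame, then blocks on finish_cond until every worker is done:

    /* decoder thread: hand a frame to filter i's worker */
    pthread_mutex_lock(&ist->filters[i]->process_mutex);
    ist->filters[i]->waited_frm = f;
    pthread_cond_signal(&ist->filters[i]->process_cond);
    pthread_mutex_unlock(&ist->filters[i]->process_mutex);

    /* worker thread: wait for a frame, filter it, reap its own graph */
    pthread_mutex_lock(&fl->process_mutex);
    while (fl->waited_frm == NULL && !fl->t_end)
        pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
    pthread_mutex_unlock(&fl->process_mutex);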

 fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------
 fftools/ffmpeg.h |  13 +++++
 2 files changed, 169 insertions(+), 16 deletions(-)

Comments

Carl Eugen Hoyos March 26, 2019, 10:35 a.m. UTC | #1
2019-03-26 23:07 GMT+01:00, Shaofei Wang <shaofei.wang@intel.com>:
> It enables MULTIPLE SIMPLE filter graph concurrency, which
> brings roughly a 4%~20% improvement in some 1:N
> scenarios with CPU or GPU acceleration.

Which version of the patch did you test to get these numbers?

The following is not an actual review, feel free to ignore all comments.

> -    /* Reap all buffers present in the buffer sinks */
> +    /* Reap all buffers present in the buffer sinks or just reap specified
> +     * buffer which related with the filter graph who got ifilter as input*/

/* Reap all buffers present in the buffer sinks */
/* or just reap specified buffer which related */
/* with the filter graph who got ifilter as input */

> +        if (ifilter && abr_threads_enabled)
> +            if (ost != ifilter->graph->outputs[0]->ost)
> +                continue;

if (ifilter && abr_threads_enabled && ost != ifilter->graph->outputs[0]->ost)
  continue;

> -        ret = reap_filters(1);
> +        ret = (HAVE_THREADS && abr_threads_enabled) ? reap_filters(1, ifilter) : reap_filters(1, NULL);

ret = reap_filters(1, HAVE_THREADS && abr_threads_enabled ? ifilter : NULL);

Same below.

> +        if (ret == AVERROR_EOF)
> +            ret = 0;
> +        else if (ret < 0) {

Please add "{" and "}".

> +            av_log(NULL, AV_LOG_ERROR,
> +                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
> +        } else {

>  static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
>  {
> -    int i, ret;
> +    int i, ret = 0;
>      AVFrame *f;
>
>      av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
> +

No.

>      for (i = 0; i < ist->nb_filters; i++) {
>          if (i < ist->nb_filters - 1) {
> -            f = ist->filter_frame;
> +            f = (HAVE_THREADS && abr_threads_enabled) ? ist->filters[i]->input_frm : ist->filter_frame;

See above.

> -        ret = ifilter_send_frame(ist->filters[i], f);
> -        if (ret == AVERROR_EOF)
> -            ret = 0; /* ignore */
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
> -            break;
> +        if (!HAVE_THREADS || !abr_threads_enabled) {
> +            ret = ifilter_send_frame(ist->filters[i], f);
> +            if (ret == AVERROR_EOF)
> +                ret = 0; /* ignore */
> +            if (ret < 0) {
> +                av_log(NULL, AV_LOG_ERROR,
> +                       "Failed to inject frame into filter network: %s\n", av_err2str(ret));
> +                break;

Some people here (including me) find the patch much easier
to read if you do the re-indentation in a separate patch.

> @@ -2334,7 +2442,6 @@ static int decode_audio
>                                                (AVRational){1, avctx->sample_rate});
>      ist->nb_samples = decoded_frame->nb_samples;
>      err = send_frame_to_filters(ist, decoded_frame);
> -

No.

Carl Eugen
Michael Niedermayer March 26, 2019, 9:24 p.m. UTC | #2
On Tue, Mar 26, 2019 at 06:07:21PM -0400, Shaofei Wang wrote:
> It enables MULTIPLE SIMPLE filter graph concurrency, which brings roughly
> a 4%~20% improvement in some 1:N scenarios with CPU or GPU acceleration.
> 
> [...]
> 
>  fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------
>  fftools/ffmpeg.h |  13 +++++
>  2 files changed, 169 insertions(+), 16 deletions(-)

This still fails with some (fuzzed) samples; valgrind does not seem to
produce much useful output, though:
...
Error while decoding stream #0:0: Invalid data found when processing input
[rv40 @ 0x25829f40] marking unfished frame as finished
[rv40 @ 0x25829f40] concealing 27 DC, 27 AC, 27 MV errors in B frame
[rv40 @ 0x2584f000] Slice indicates MB offset 142, got 140
[rv40 @ 0x2584f000] Dquant for P-frame
[rv40 @ 0x2584f000] concealing 2 DC, 2 AC, 2 MV errors in P frame
Error while decoding stream #0:0: Invalid data found when processing input
[rv40 @ 0x256dfb40] Slice indicates MB offset 140, got 135
[rv40 @ 0x256dfb40] concealing 5 DC, 5 AC, 5 MV errors in B frame
[rv40 @ 0x25701940] concealing 112 DC, 112 AC, 112 MV errors in P frame
Error while decoding stream #0:0: Invalid data found when processing input=N/A    
    Last message repeated 1 times
[rv40 @ 0x25804e80] First slice header is incorrect
[rv40 @ 0x256dfb40] Slice indicates MB offset 187, got 140
[rv40 @ 0x256dfb40] Dquant for P-frame
[rv40 @ 0x256dfb40] concealing 47 DC, 47 AC, 47 MV errors in P frame
[rv40 @ 0x25726a00] Dquant for B-frame
[rv40 @ 0x25770b80] concealing 115 DC, 115 AC, 115 MV errors in P frame
Error while decoding stream #0:0: Invalid data found when processing input=N/A    
[rv40 @ 0x25804e80] Dquant for P-frame
[rv40 @ 0x257dfdc0] Dquant for P-framee=-577014:32:22.77 bitrate=N/A speed=N/A    
[rv40 @ 0x25804e80] Dquant for B-frame
Too many packets buffered for output stream 0:0.
pthread_join failed with error: Resource deadlock avoided

gdb output:
pthread_join failed with error: Resource deadlock avoided

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7fffe4def700 (LWP 13350)]
0x00007fffefeaec37 in raise () from /lib/x86_64-linux-gnu/libc.so.6
(gdb) bt
Python Exception <type 'exceptions.ImportError'> No module named gdb.frames: 
#0  0x00007fffefeaec37 in raise () from /lib/x86_64-linux-gnu/libc.so.6
#1  0x00007fffefeb2028 in abort () from /lib/x86_64-linux-gnu/libc.so.6
#2  0x000000000042bea1 in strict_pthread_join (thread=140737033205504, value_ptr=0x0) at ./libavutil/thread.h:55
#3  0x000000000042d342 in ffmpeg_cleanup (ret=1) at fftools/ffmpeg.c:526
#4  0x00000000004250d8 in exit_program (ret=1) at fftools/cmdutils.c:139
#5  0x000000000042dfd4 in write_packet (of=0x231b780, pkt=0x7fffe4dee680, ost=0x237f680, unqueue=0) at fftools/ffmpeg.c:738
#6  0x000000000042eb39 in output_packet (of=0x231b780, pkt=0x7fffe4dee680, ost=0x237f680, eof=0) at fftools/ffmpeg.c:903
#7  0x0000000000430c30 in do_video_out (of=0x231b780, ost=0x237f680, next_picture=0x7fffa4005200, sync_ipts=98940.800010681152) at fftools/ffmpeg.c:1337
#8  0x0000000000431986 in reap_filters (flush=1, ifilter=0x22d2c00) at fftools/ffmpeg.c:1533
#9  0x0000000000434dc8 in filter_pipeline (arg=0x22d2c00) at fftools/ffmpeg.c:2318
#10 0x00007ffff0249184 in start_thread () from /lib/x86_64-linux-gnu/libpthread.so.0
#11 0x00007fffeff7603d in clone () from /lib/x86_64-linux-gnu/libc.so.6


[...]
Mark Thompson March 26, 2019, 10:44 p.m. UTC | #3
On 26/03/2019 22:07, Shaofei Wang wrote:
> It enables MULTIPLE SIMPLE filter graph concurrency, which brings roughly
> a 4%~20% improvement in some 1:N scenarios with CPU or GPU acceleration.
> 
> ...
> ---
> The patch only takes effect on multiple SIMPLE filter graph pipelines.
> Passed FATE and refined the possible data races.
> AFL tested, without introducing extra crashes/hangs.
> 
>  fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------
>  fftools/ffmpeg.h |  13 +++++
>  2 files changed, 169 insertions(+), 16 deletions(-)

On further thought, I don't think this patch should continue in its present form.

The fundamental problem is that the bolted-on parallelism renders it pretty much unreviewable - the codebase here was not designed with any parallelism in mind, so it does not follow the usual rules expected of such code.  In particular, there are global variables in many places accessed without regard for data races, and that becomes fatal undefined behaviour once parallelism is added.  While in theory every one of those can be fixed (by adding more locks, or one big lock covering many instances together), it will be very hard to verify that all cases have been covered and the code will suffer significantly in readability/maintainability for the change.
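
(As a generic illustration of that pattern, not code from the patch: an
unsynchronised write to a shared global from two threads is a data race and
therefore undefined behaviour, even when it appears to work in practice.)

    #include <pthread.h>

    static long counter;                     /* shared global state */

    static void *worker(void *arg)
    {
        counter++;                           /* racy read-modify-write: UB */
        return NULL;
    }

    static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

    static void *worker_fixed(void *arg)
    {
        pthread_mutex_lock(&counter_lock);   /* the "one big lock" variant */
        counter++;
        pthread_mutex_unlock(&counter_lock);
        return NULL;
    }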

To give an explicit example of the sort of problems you are hitting, the -benchmark_all option is entirely broken in the current proposed version.  It invokes undefined behaviour by writing to the current_time global in every thread.  Even if that were avoided by moving the value to a thread-local structure, it also gives wrong results - getrusage(RUSAGE_SELF) returns values for the whole process, so it won't say anything useful once there are multiple threads looking at the value in parallel.  Perhaps that is fixable by using some sort of per-thread storage and RUSAGE_THREAD (and some equivalent on Windows), but whatever that is certainly requires more investigation and probably a separate patch.
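
(For reference, a minimal sketch of what per-thread accounting with
RUSAGE_THREAD could look like on Linux; illustration only, not a proposed
replacement for the existing update_benchmark() code, and Windows would need
its own equivalent:)

    #define _GNU_SOURCE
    #include <stdio.h>
    #include <sys/resource.h>

    static void print_thread_cpu(const char *tag)
    {
        struct rusage ru;
        /* RUSAGE_THREAD: resource usage of the calling thread only */
        if (getrusage(RUSAGE_THREAD, &ru) == 0)
            printf("%s: user=%ld.%06lds sys=%ld.%06lds\n", tag,
                   (long)ru.ru_utime.tv_sec, (long)ru.ru_utime.tv_usec,
                   (long)ru.ru_stime.tv_sec, (long)ru.ru_stime.tv_usec);
    }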

Three possible thoughts on where to go from here:

* Start by refactoring the code to make it easier to verify.  This would entail something like removing all global variable accesses from parallel paths, and then ensuring that each thread only ever writes to its own local working set (e.g. OutputStream structure) while marking all other structures as const.  After such refactoring has happened, a new version of the present patch would then be possible to assess.

* Run exhaustively in tsan/valgrind/other race detectors and fix every problem it finds, then provide evidence from that to help with review.  Since these issues can be hidden behind specific option combinations (as the benchmark one is), this would need a lot of care to ensure full coverage, and some parts (like the benchmark one) would need rewriting anyway.  As noted above I'm not sure this would be very good for the code, but we could at least have some amount of confidence that the result was correct.

* Reconsider whether the patch is worth pursuing.  While I agree that a correct implementation of this would help in the specific use-cases you describe, I'm not sure that the set of users who gain from it is actually particularly large - most people wanting to maximise throughput in the way this change can help with already run multiple FFmpeg instances (or their own programs using the libraries) because the serial limitations of FFmpeg are well-known.


- Mark
Carl Eugen Hoyos March 26, 2019, 11:44 p.m. UTC | #4
2019-03-26 23:44 GMT+01:00, Mark Thompson <sw@jkqxz.net>:

> * Run exhaustively in tsan/valgrind/other race detectors and fix every
> problem it finds, then provide evidence from that to help with review.

In addition to the issues you mention, I wonder if this wouldn't trigger
many unresolved issues in existing (library) code, making the
task even more difficult.

Carl Eugen
Shaofei Wang March 27, 2019, 9:51 a.m. UTC | #5
> -----Original Message-----
> From: ffmpeg-devel [mailto:ffmpeg-devel-bounces@ffmpeg.org] On Behalf Of
> Mark Thompson
> Sent: Wednesday, March 27, 2019 6:44 AM
> To: ffmpeg-devel@ffmpeg.org
> Subject: Re: [FFmpeg-devel] [PATCH] Improved the performance of 1 decode +
> N filter graphs and adaptive bitrate.
>

Working out a perfect solution is an endless task; that is also the current
situation of the parallelism issue in ffmpeg. Users can work around the
"well-known" serial limitation themselves in their own apps, but why not do
something positive in the ffmpeg tool that a large number of users rely on?
Although its codebase was not designed with parallelism in mind, some key
parts of the pipeline can still be parallelized with light modification, as
this patch shows. The alternative, a brand new ffmpeg designed for
parallelism, would require a much bigger effort, and its concerns and
behaviour problems would be no fewer than the current one's.
Some thoughts:
- Provide an option, -abr_pipeline, to enable this multiple simple filter
graph parallelization instead of turning it on by default, so that users who
enable it know what they are doing and are pursuing performance. The
feature/optimization does not have to cover every situation in ffmpeg, which
is a toolbox providing a huge number of different features; some cases need
parallelism, some don't. (A hypothetical sketch follows after this list.)
- Fix the already known parallelism problems such as -benchmark_all. Those
considerations are good for improving and refining the feature/solution, and
even hidden or hard-to-trigger issues found in the future can be fixed by
dedicated patches.
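
For illustration, a hypothetical sketch of such an option in
fftools/ffmpeg_opt.c (the name and help text are placeholders, not part of
the posted patch):

    static int abr_pipeline = 0;

    /* entry in the options[] table */
    { "abr_pipeline", OPT_BOOL, { &abr_pipeline },
      "run each simple filter graph in its own thread" },

transcode_init() would then check abr_pipeline in addition to
nb_filtergraphs before setting abr_threads_enabled.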

Thanks

> On further thought, I don't think this patch should continue in its present
> form.
>
> [...]
>
> * Run exhaustively in tsan/valgrind/other race detectors and fix every
> problem it finds, then provide evidence from that to help with review.  Since
> these issues can be hidden behind specific option combinations (as the
> benchmark one is), this would need a lot of care to ensure full coverage, and
> some parts (like the benchmark one) would need rewriting anyway.  As
> noted above I'm not sure this would be very good for the code, but we could
> at least have some amount of confidence that the result was correct.

For the existing code, lots of issues are already detected by valgrind/tsan.
We can provide evidence to prove the problems have been covered.
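
For example (assuming the clang-tsan toolchain preset in configure; full
FATE coverage would still need the samples and care with option
combinations):

    ./configure --toolchain=clang-tsan
    make && make fate
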
> * Reconsider whether the patch is worth pursuing.  While I agree that a
> correct implementation of this would help in the specific use-cases you
> describe, I'm not sure that the set of users who gain from it is actually
> particularly large - most people wanting to maximise throughput in the way
> this change can help with already run multiple FFmpeg instances (or their own
> programs using the libraries) because the serial limitations of FFmpeg are
> well-known.

Running multiple FFmpeg instances cannot stand in for the 1:N cases. And of
course users can call the libraries from their own programs, but we also see
a lot of users who build their solutions, demos, apps and so on with the
ffmpeg tool.
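
For comparison, the multiple-instance alternative to the CPU-only 1:N case
above has to decode the same input once per output, e.g.:

./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null &
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null &
wait

That is N decodes instead of one shared decode.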
Shaofei Wang March 27, 2019, 10:03 a.m. UTC | #6
> -----Original Message-----
> From: ffmpeg-devel [mailto:ffmpeg-devel-bounces@ffmpeg.org] On Behalf Of
> Michael Niedermayer
> Sent: Wednesday, March 27, 2019 5:24 AM
> To: FFmpeg development discussions and patches <ffmpeg-devel@ffmpeg.org>
> Subject: Re: [FFmpeg-devel] [PATCH] Improved the performance of 1 decode +
> N filter graphs and adaptive bitrate.
 
> This still fails with some (fuzzed) samples; valgrind does not seem to
> produce much useful output, though:
Still the join issue; will fix it in the next version, thanks.
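
(For reference, what the backtrace above suggests: exit_program() is reached
from inside a filter worker thread, so ffmpeg_cleanup() ends up calling
pthread_join() on the calling thread itself, which glibc detects and reports
as EDEADLK, "Resource deadlock avoided". A minimal reproduction, independent
of ffmpeg:)

    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
        /* joining the current thread is detected as a self-deadlock */
        int ret = pthread_join(pthread_self(), NULL);
        printf("pthread_join: %s\n", strerror(ret));
        return 0;
    }
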
> [...]
Shaofei Wang March 27, 2019, 10:54 a.m. UTC | #7
> -----Original Message-----
> From: ffmpeg-devel [mailto:ffmpeg-devel-bounces@ffmpeg.org] On Behalf Of
> Carl Eugen Hoyos
> Sent: Tuesday, March 26, 2019 6:36 PM
> To: FFmpeg development discussions and patches <ffmpeg-devel@ffmpeg.org>
> Subject: Re: [FFmpeg-devel] [PATCH] Improved the performance of 1 decode +
> N filter graphs and adaptive bitrate.
>
> 2019-03-26 23:07 GMT+01:00, Shaofei Wang <shaofei.wang@intel.com>:
> > It enables MULTIPLE SIMPLE filter graph concurrency, which brings roughly
> > a 4%~20% improvement in some 1:N scenarios with CPU or GPU acceleration.
>
> Which version of the patch did you test to get these numbers?

The numbers were measured with the current version of the patch. They may
vary depending on platform, density and test case.

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 544f1a1..5f6e712 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -164,7 +164,13 @@  static struct termios oldtty;
 static int restore_tty;
 #endif
 
+/* enable abr threads when there were multiple simple filter graphs*/
+static int abr_threads_enabled = 0;
+
 #if HAVE_THREADS
+pthread_mutex_t fg_config_mutex;
+pthread_mutex_t ost_init_mutex;
+
 static void free_input_threads(void);
 #endif
 
@@ -509,6 +515,17 @@  static void ffmpeg_cleanup(int ret)
                 }
                 av_fifo_freep(&fg->inputs[j]->ist->sub2video.sub_queue);
             }
+#if HAVE_THREADS
+            if (abr_threads_enabled) {
+                av_frame_free(&fg->inputs[j]->input_frm);
+                pthread_mutex_lock(&fg->inputs[j]->process_mutex);
+                fg->inputs[j]->waited_frm = NULL;
+                fg->inputs[j]->t_end = 1;
+                pthread_cond_signal(&fg->inputs[j]->process_cond);
+                pthread_mutex_unlock(&fg->inputs[j]->process_mutex);
+                pthread_join(fg->inputs[j]->abr_thread, NULL);
+            }
+#endif
             av_buffer_unref(&fg->inputs[j]->hw_frames_ctx);
             av_freep(&fg->inputs[j]->name);
             av_freep(&fg->inputs[j]);
@@ -1419,12 +1436,13 @@  static void finish_output_stream(OutputStream *ost)
  *
  * @return  0 for success, <0 for severe errors
  */
-static int reap_filters(int flush)
+static int reap_filters(int flush, InputFilter * ifilter)
 {
     AVFrame *filtered_frame = NULL;
     int i;
 
-    /* Reap all buffers present in the buffer sinks */
+    /* Reap all buffers present in the buffer sinks or just reap specified
+     * buffer which related with the filter graph who got ifilter as input*/
     for (i = 0; i < nb_output_streams; i++) {
         OutputStream *ost = output_streams[i];
         OutputFile    *of = output_files[ost->file_index];
@@ -1432,13 +1450,25 @@  static int reap_filters(int flush)
         AVCodecContext *enc = ost->enc_ctx;
         int ret = 0;
 
+        if (ifilter && abr_threads_enabled)
+            if (ost != ifilter->graph->outputs[0]->ost)
+                continue;
+
         if (!ost->filter || !ost->filter->graph->graph)
             continue;
         filter = ost->filter->filter;
 
         if (!ost->initialized) {
             char error[1024] = "";
+#if HAVE_THREADS
+            if (abr_threads_enabled)
+                pthread_mutex_lock(&ost_init_mutex);
+#endif
             ret = init_output_stream(ost, error, sizeof(error));
+#if HAVE_THREADS
+            if (abr_threads_enabled)
+                pthread_mutex_unlock(&ost_init_mutex);
+#endif
             if (ret < 0) {
                 av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n",
                        ost->file_index, ost->index, error);
@@ -2179,13 +2209,22 @@  static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
             }
         }
 
-        ret = reap_filters(1);
+        ret = (HAVE_THREADS && abr_threads_enabled) ? reap_filters(1, ifilter) : reap_filters(1, NULL);
+
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
             return ret;
         }
 
+#if HAVE_THREADS
+        if (abr_threads_enabled)
+            pthread_mutex_lock(&fg_config_mutex);
+#endif
         ret = configure_filtergraph(fg);
+#if HAVE_THREADS
+        if (abr_threads_enabled)
+            pthread_mutex_unlock(&fg_config_mutex);
+#endif
         if (ret < 0) {
             av_log(NULL, AV_LOG_ERROR, "Error reinitializing filters!\n");
             return ret;
@@ -2252,29 +2291,98 @@  static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
     return 0;
 }
 
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg)
+{
+    InputFilter *fl = arg;
+    AVFrame *frm;
+    int ret;
+    while(1) {
+        pthread_mutex_lock(&fl->process_mutex);
+        while (fl->waited_frm == NULL && !fl->t_end)
+            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+        pthread_mutex_unlock(&fl->process_mutex);
+
+        if (fl->t_end) break;
+
+        frm = fl->waited_frm;
+        pthread_mutex_lock(&fl->ifilter_mutex);
+        ret = ifilter_send_frame(fl, frm);
+        pthread_mutex_unlock(&fl->ifilter_mutex);
+        if (ret == AVERROR_EOF)
+            ret = 0;
+        else if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+        } else {
+            ret = reap_filters(1, fl);
+        }
+        fl->t_error = ret;
+
+        pthread_mutex_lock(&fl->finish_mutex);
+        pthread_cond_signal(&fl->finish_cond);
+        fl->waited_frm = NULL;
+        pthread_mutex_unlock(&fl->finish_mutex);
+    }
+    fl->waited_frm = NULL;
+    pthread_mutex_lock(&fl->finish_mutex);
+    pthread_cond_signal(&fl->finish_cond);
+    pthread_mutex_unlock(&fl->finish_mutex);
+    return fl;
+}
+#endif
+
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
-    int i, ret;
+    int i, ret = 0;
     AVFrame *f;
 
     av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
+
     for (i = 0; i < ist->nb_filters; i++) {
         if (i < ist->nb_filters - 1) {
-            f = ist->filter_frame;
+            f = (HAVE_THREADS && abr_threads_enabled) ? ist->filters[i]->input_frm : ist->filter_frame;
             ret = av_frame_ref(f, decoded_frame);
             if (ret < 0)
                 break;
         } else
             f = decoded_frame;
-        ret = ifilter_send_frame(ist->filters[i], f);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
+        if (!HAVE_THREADS || !abr_threads_enabled) {
+            ret = ifilter_send_frame(ist->filters[i], f);
+            if (ret == AVERROR_EOF)
+                ret = 0; /* ignore */
+            if (ret < 0) {
+                av_log(NULL, AV_LOG_ERROR,
+                       "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+                break;
+            }
+        }
+#if HAVE_THREADS
+        if (abr_threads_enabled) {
+            pthread_mutex_lock(&ist->filters[i]->process_mutex);
+            ist->filters[i]->waited_frm = f;
+            pthread_cond_signal(&ist->filters[i]->process_cond);
+            pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+        }
+#endif
+    }
+#if HAVE_THREADS
+    if (abr_threads_enabled && ret >= 0) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+            while(ist->filters[i]->waited_frm != NULL)
+                pthread_cond_wait(&ist->filters[i]->finish_cond,
+                        &ist->filters[i]->finish_mutex);
+            pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+        }
+        for (i = 0; i < ist->nb_filters; i++) {
+            if (ist->filters[i]->t_error < 0) {
+                ret = ist->filters[i]->t_error;
+                break;
+            }
         }
     }
+#endif
     return ret;
 }
 
@@ -2334,7 +2442,6 @@  static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output,
                                               (AVRational){1, avctx->sample_rate});
     ist->nb_samples = decoded_frame->nb_samples;
     err = send_frame_to_filters(ist, decoded_frame);
-
     av_frame_unref(ist->filter_frame);
     av_frame_unref(decoded_frame);
     return err < 0 ? err : ret;
@@ -3737,6 +3844,39 @@  static int transcode_init(void)
         }
     }
 
+    if (nb_filtergraphs > 1 && filtergraph_is_simple(filtergraphs[0]))
+        abr_threads_enabled = 1;
+#if HAVE_THREADS
+    if (abr_threads_enabled) {
+        for (i = 0; i < nb_input_streams; i++) {
+            ist = input_streams[i];
+            for (j = 0; j < ist->nb_filters; j++) {
+                pthread_mutex_init(&ist->filters[j]->process_mutex, NULL);
+                pthread_mutex_init(&ist->filters[j]->finish_mutex, NULL);
+                pthread_cond_init(&ist->filters[j]->process_cond, NULL);
+                pthread_cond_init(&ist->filters[j]->finish_cond, NULL);
+                pthread_mutex_init(&ist->filters[j]->ifilter_mutex, NULL);
+                if (i == 0) {
+                    pthread_mutex_init(&fg_config_mutex, NULL);
+                    pthread_mutex_init(&ost_init_mutex, NULL);
+                }
+                ist->filters[j]->t_end = 0;
+                ist->filters[j]->t_error = 0;
+                ist->filters[j]->input_frm = av_frame_alloc();
+                if (!ist->filters[j]->input_frm)
+                    return AVERROR(ENOMEM);
+
+                if ((ret = pthread_create(&ist->filters[j]->abr_thread, NULL, filter_pipeline,
+                                ist->filters[j]))) {
+                    av_log(NULL, AV_LOG_ERROR,
+                            "abr pipeline pthread_create failed.\n");
+                    return AVERROR(ret);
+                }
+            }
+        }
+    }
+#endif
+
  dump_format:
     /* dump the stream mapping */
     av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -4537,10 +4677,10 @@  static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist)
     *best_ist = NULL;
     ret = avfilter_graph_request_oldest(graph->graph);
     if (ret >= 0)
-        return reap_filters(0);
+        return reap_filters(0, NULL);
 
     if (ret == AVERROR_EOF) {
-        ret = reap_filters(1);
+        ret = reap_filters(1, NULL);
         for (i = 0; i < graph->nb_outputs; i++)
             close_output_stream(graph->outputs[i]->ost);
         return ret;
@@ -4642,7 +4782,7 @@  static int transcode_step(void)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
-    return reap_filters(0);
+    return (HAVE_THREADS && abr_threads_enabled) ? ret : reap_filters(0, NULL);
 }
 
 /*
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..b1179f9 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,19 @@  typedef struct InputFilter {
 
     AVBufferRef *hw_frames_ctx;
 
+    // for abr pipeline
+    AVFrame *waited_frm;
+    AVFrame *input_frm;
+#if HAVE_THREADS
+    pthread_t abr_thread;
+    pthread_cond_t process_cond;
+    pthread_cond_t finish_cond;
+    pthread_mutex_t process_mutex;
+    pthread_mutex_t finish_mutex;
+    pthread_mutex_t ifilter_mutex;
+    int t_end;
+    int t_error;
+#endif
     int eof;
 } InputFilter;