[FFmpeg-devel,v6] Improved the performance of 1 decode + N filter graphs and adaptive bitrate.

Submitted by Shaofei Wang on March 13, 2019, 9:54 p.m.

Details

Message ID 1552514093-9148-1-git-send-email-shaofei.wang@intel.com
State New
Headers show

Commit Message

Shaofei Wang March 13, 2019, 9:54 p.m.
This patch enables multiple simple filter graphs to run concurrently, which
brings about 4%~20% improvement in some 1:N scenarios with CPU or GPU
acceleration.

Below are some test cases and comparisons for reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)

For 1:N transcode by GPU acceleration with vaapi:
./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
    -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null

    test results:
                2 encoders 5 encoders 10 encoders
    Improved       6.1%    6.9%       5.5%

For 1:N transcode by GPU acceleration with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null

    test results:
                2 encoders  5 encoders 10 encoders
    Improved       6%       4%         15%

For the Intel GPU acceleration case, 1 decode to N scaling, with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
    -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale   10 scale
    Improved       12%     21%        21%

For CPU only 1 decode to N scaling:
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null

    test results:
                2 scale  5 scale   10 scale
    Improved       25%    107%       148%

Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com>
---
Passes FATE and refines the handling of a possible data race.
The patch only affects pipelines with multiple SIMPLE filter graphs.

 fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------
 fftools/ffmpeg.h |  13 +++++
 2 files changed, 169 insertions(+), 16 deletions(-)

Comments

Michael Niedermayer March 13, 2019, 4:17 p.m.
On Wed, Mar 13, 2019 at 05:54:53PM -0400, Shaofei Wang wrote:
> It enabled multiple simple filter graph concurrency, which bring above about
> 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration
> 
> Below are some test cases and comparison as reference.
> (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
> (Software: Intel iHD driver - 16.9.00100, CentOS 7)

This causes aborts.
I suggest you test it with a fuzzer
example:
Program received signal SIGABRT, Aborted.
0x00007fffefeaec37 in raise () from /lib/x86_64-linux-gnu/libc.so.6
(gdb) bt
Python Exception <type 'exceptions.ImportError'> No module named gdb.frames: 
#0  0x00007fffefeaec37 in raise () from /lib/x86_64-linux-gnu/libc.so.6
#1  0x00007fffefeb2028 in abort () from /lib/x86_64-linux-gnu/libc.so.6
#2  0x000000000042bf71 in strict_pthread_join (thread=0, value_ptr=0x0) at ./libavutil/thread.h:55
#3  0x000000000042d412 in ffmpeg_cleanup (ret=1) at fftools/ffmpeg.c:526
#4  0x00000000004251a8 in exit_program (ret=1) at fftools/cmdutils.c:139
#5  0x000000000043f152 in main (argc=6, argv=0x7fffffffe3a8) at fftools/ffmpeg.c:5032

[...]

Patch hide | download patch | download mbox

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 544f1a1..c0c9ca8 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -164,7 +164,13 @@  static struct termios oldtty;
 static int restore_tty;
 #endif
 
+/* enable abr threads when there are multiple simple filter graphs */
+static int abr_threads_enabled = 0;
+
 #if HAVE_THREADS
+pthread_mutex_t fg_config_mutex;
+pthread_mutex_t ost_init_mutex;
+
 static void free_input_threads(void);
 #endif
 
@@ -509,6 +515,17 @@  static void ffmpeg_cleanup(int ret)
                 }
                 av_fifo_freep(&fg->inputs[j]->ist->sub2video.sub_queue);
             }
+#if HAVE_THREADS
+            if (abr_threads_enabled) {
+                av_frame_free(&fg->inputs[j]->input_frm);
+                pthread_mutex_lock(&fg->inputs[j]->process_mutex);
+                fg->inputs[j]->waited_frm = NULL;
+                fg->inputs[j]->t_end = 1;
+                pthread_cond_signal(&fg->inputs[j]->process_cond);
+                pthread_mutex_unlock(&fg->inputs[j]->process_mutex);
+                pthread_join(fg->inputs[j]->abr_thread, NULL);
+            }
+#endif
             av_buffer_unref(&fg->inputs[j]->hw_frames_ctx);
             av_freep(&fg->inputs[j]->name);
             av_freep(&fg->inputs[j]);
@@ -1419,12 +1436,13 @@  static void finish_output_stream(OutputStream *ost)
  *
  * @return  0 for success, <0 for severe errors
  */
-static int reap_filters(int flush)
+static int reap_filters(int flush, InputFilter * ifilter)
 {
     AVFrame *filtered_frame = NULL;
     int i;
 
-    /* Reap all buffers present in the buffer sinks */
+    /* Reap all buffers present in the buffer sinks, or reap only the buffer
+     * belonging to the filter graph that has ifilter as its input */
     for (i = 0; i < nb_output_streams; i++) {
         OutputStream *ost = output_streams[i];
         OutputFile    *of = output_files[ost->file_index];
@@ -1432,13 +1450,25 @@  static int reap_filters(int flush)
         AVCodecContext *enc = ost->enc_ctx;
         int ret = 0;
 
+        if (ifilter && abr_threads_enabled)
+            if (ost != ifilter->graph->outputs[0])
+                continue;
+
         if (!ost->filter || !ost->filter->graph->graph)
             continue;
         filter = ost->filter->filter;
 
         if (!ost->initialized) {
             char error[1024] = "";
+#if HAVE_THREADS
+            if (abr_threads_enabled)
+                pthread_mutex_lock(&ost_init_mutex);
+#endif
             ret = init_output_stream(ost, error, sizeof(error));
+#if HAVE_THREADS
+            if (abr_threads_enabled)
+                pthread_mutex_unlock(&ost_init_mutex);
+#endif
             if (ret < 0) {
                 av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n",
                        ost->file_index, ost->index, error);
@@ -2179,13 +2209,22 @@  static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
             }
         }
 
-        ret = reap_filters(1);
+        ret = (HAVE_THREADS && abr_threads_enabled) ? reap_filters(1, ifilter) : reap_filters(1, NULL);
+
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
             return ret;
         }
 
+#if HAVE_THREADS
+        if (abr_threads_enabled)
+            pthread_mutex_lock(&fg_config_mutex);
+#endif
         ret = configure_filtergraph(fg);
+#if HAVE_THREADS
+        if (abr_threads_enabled)
+            pthread_mutex_unlock(&fg_config_mutex);
+#endif
         if (ret < 0) {
             av_log(NULL, AV_LOG_ERROR, "Error reinitializing filters!\n");
             return ret;
@@ -2252,29 +2291,98 @@  static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
     return 0;
 }
 
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg)
+{
+    InputFilter *fl = arg;
+    AVFrame *frm;
+    int ret;
+    while(1) {
+        pthread_mutex_lock(&fl->process_mutex);
+        while (fl->waited_frm == NULL && !fl->t_end)
+            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+        pthread_mutex_unlock(&fl->process_mutex);
+
+        if (fl->t_end) break;
+
+        frm = fl->waited_frm;
+        pthread_mutex_lock(&fl->ifilter_mutex);
+        ret = ifilter_send_frame(fl, frm);
+        pthread_mutex_unlock(&fl->ifilter_mutex);
+        if (ret == AVERROR_EOF)
+            ret = 0;
+        else if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+        } else {
+            ret = reap_filters(1, fl);
+        }
+        fl->t_error = ret;
+
+        pthread_mutex_lock(&fl->finish_mutex);
+        pthread_cond_signal(&fl->finish_cond);
+        fl->waited_frm = NULL;
+        pthread_mutex_unlock(&fl->finish_mutex);
+    }
+    fl->waited_frm = NULL;
+    pthread_mutex_lock(&fl->finish_mutex);
+    pthread_cond_signal(&fl->finish_cond);
+    pthread_mutex_unlock(&fl->finish_mutex);
+    return fl;
+}
+#endif
+
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
-    int i, ret;
+    int i, ret = 0;
     AVFrame *f;
 
     av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
+
     for (i = 0; i < ist->nb_filters; i++) {
         if (i < ist->nb_filters - 1) {
-            f = ist->filter_frame;
+            f = (HAVE_THREADS && abr_threads_enabled) ? ist->filters[i]->input_frm : ist->filter_frame;
             ret = av_frame_ref(f, decoded_frame);
             if (ret < 0)
                 break;
         } else
             f = decoded_frame;
-        ret = ifilter_send_frame(ist->filters[i], f);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
+        if (!HAVE_THREADS || !abr_threads_enabled) {
+            ret = ifilter_send_frame(ist->filters[i], f);
+            if (ret == AVERROR_EOF)
+                ret = 0; /* ignore */
+            if (ret < 0) {
+                av_log(NULL, AV_LOG_ERROR,
+                       "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+                break;
+            }
+        }
+#if HAVE_THREADS
+        if (abr_threads_enabled) {
+            pthread_mutex_lock(&ist->filters[i]->process_mutex);
+            ist->filters[i]->waited_frm = f;
+            pthread_cond_signal(&ist->filters[i]->process_cond);
+            pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+        }
+#endif
+    }
+#if HAVE_THREADS
+    if (abr_threads_enabled && ret >= 0) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+            while(ist->filters[i]->waited_frm != NULL)
+                pthread_cond_wait(&ist->filters[i]->finish_cond,
+                        &ist->filters[i]->finish_mutex);
+            pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+        }
+        for (i = 0; i < ist->nb_filters; i++) {
+            if (ist->filters[i]->t_error < 0) {
+                ret = ist->filters[i]->t_error;
+                break;
+            }
         }
     }
+#endif
     return ret;
 }
 
@@ -2334,7 +2442,6 @@  static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output,
                                               (AVRational){1, avctx->sample_rate});
     ist->nb_samples = decoded_frame->nb_samples;
     err = send_frame_to_filters(ist, decoded_frame);
-
     av_frame_unref(ist->filter_frame);
     av_frame_unref(decoded_frame);
     return err < 0 ? err : ret;
@@ -3680,6 +3787,8 @@  static int transcode_init(void)
                     break;
             ofilter->ost->source_index = k;
         }
+        if (i >= 1 && filtergraph_is_simple(fg))
+            abr_threads_enabled = 1;
     }
 
     /* init framerate emulation */
@@ -3737,6 +3846,37 @@  static int transcode_init(void)
         }
     }
 
+#if HAVE_THREADS
+    if (abr_threads_enabled) {
+        for (i = 0; i < nb_input_streams; i++) {
+            ist = input_streams[i];
+            for (j = 0; j < ist->nb_filters; j++) {
+                pthread_mutex_init(&ist->filters[j]->process_mutex, NULL);
+                pthread_mutex_init(&ist->filters[j]->finish_mutex, NULL);
+                pthread_cond_init(&ist->filters[j]->process_cond, NULL);
+                pthread_cond_init(&ist->filters[j]->finish_cond, NULL);
+                pthread_mutex_init(&ist->filters[j]->ifilter_mutex, NULL);
+                if (i == 0) {
+                    pthread_mutex_init(&fg_config_mutex, NULL);
+                    pthread_mutex_init(&ost_init_mutex, NULL);
+                }
+                ist->filters[j]->t_end = 0;
+                ist->filters[j]->t_error = 0;
+                ist->filters[j]->input_frm = av_frame_alloc();
+                if (!ist->filters[j]->input_frm)
+                    return AVERROR(ENOMEM);
+
+                if ((ret = pthread_create(&ist->filters[j]->abr_thread, NULL, filter_pipeline,
+                                ist->filters[j]))) {
+                    av_log(NULL, AV_LOG_ERROR,
+                            "abr pipeline pthread_create failed.\n");
+                    return AVERROR(ret);
+                }
+            }
+        }
+    }
+#endif
+
  dump_format:
     /* dump the stream mapping */
     av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -4537,10 +4677,10 @@  static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist)
     *best_ist = NULL;
     ret = avfilter_graph_request_oldest(graph->graph);
     if (ret >= 0)
-        return reap_filters(0);
+        return reap_filters(0, NULL);
 
     if (ret == AVERROR_EOF) {
-        ret = reap_filters(1);
+        ret = reap_filters(1, NULL);
         for (i = 0; i < graph->nb_outputs; i++)
             close_output_stream(graph->outputs[i]->ost);
         return ret;
@@ -4642,7 +4782,7 @@  static int transcode_step(void)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
-    return reap_filters(0);
+    return (HAVE_THREADS && abr_threads_enabled) ? ret : reap_filters(0, NULL);
 }
 
 /*
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..a0f11d3 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,19 @@  typedef struct InputFilter {
 
     AVBufferRef *hw_frames_ctx;
 
+    // for abr pipeline
+    AVFrame *waited_frm;
+    AVFrame *input_frm;
+#if HAVE_THREADS
+    pthread_t abr_thread;
+    pthread_cond_t process_cond;
+    pthread_cond_t finish_cond;
+    pthread_mutex_t process_mutex;
+    pthread_mutex_t finish_mutex;
+    pthread_mutex_t ifilter_mutex;
+#endif
+    int t_end;
+    int t_error;
     int eof;
 } InputFilter;