Message ID | 1549924864-26466-1-git-send-email-shaofei.wang@intel.com |
---|---|
State | Superseded |
Headers | show |
Code clean and remove the "-abr_pipeline" option, use the perf improved code path by default only if HAVE_THREAD enabled.
On Mon, Feb 11, 2019 at 05:41:04PM -0500, Shaofei Wang wrote: > It enabled multiple filter graph concurrency, which bring above about > 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration > > Below are some test cases and comparison as reference. > (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz) > (Software: Intel iHD driver - 16.9.00100, CentOS 7) > > For 1:N transcode by GPU acceleration with vaapi: > ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \ > -hwaccel_output_format vaapi \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \ > -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6.1% 6.9% 5.5% > > For 1:N transcode by GPU acceleration with QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6% 4% 15% > > For Intel GPU acceleration case, 1 decode to N scaling, by QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 12% 21% 21% > > For CPU only 1 decode to N scaling: > ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \ > -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 25% 107% 148% > > Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com> > Reviewed-by: Zhao, Jun <jun.zhao@intel.com> > --- > fftools/ffmpeg.c | 112 +++++++++++++++++++++++++++++++++++++++++++++--- > fftools/ffmpeg.h | 14 ++++++ > fftools/ffmpeg_filter.c | 4 ++ > 3 files changed, 124 insertions(+), 6 deletions(-) breaks fate make: *** [fate-lavf-mxf_d10] Error 1 make: *** [fate-filter-tremolo] Error 1 make: *** [fate-filter-chorus] Error 1 make: *** [tests/data/hls-list-append.m3u8] Error 1 make: *** [fate-filter-atrim-mixed] Error 1 make: *** [fate-filter-atrim-time] Error 1 make: *** [tests/data/live_last_endlist.m3u8] Error 1 make: *** [fate-filter-volume] Error 1 make: *** [fate-filter-join] Error 1 make: *** [fate-lavf-mxf] Error 1 make: *** [fate-swr-resample-s16p-44100-8000] Error 1 make: *** [fate-swr-resample-s16p-44100-2626] Error 1 ... [...]
On 11/02/2019 22:41, Shaofei Wang wrote: Please avoid sending messages from the future - the list received this about thirteen hours before its supposed send time (received "Mon, 11 Feb 2019 11:42:09 +0200", sent "Mon, 11 Feb 2019 17:41:04 -0500"). Probably the sending machine or some intermediate has an incorrect time or time zone. > It enabled multiple filter graph concurrency, which bring above about > 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration > > Below are some test cases and comparison as reference. > (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz) > (Software: Intel iHD driver - 16.9.00100, CentOS 7) > > For 1:N transcode by GPU acceleration with vaapi: > ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \ > -hwaccel_output_format vaapi \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \ > -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6.1% 6.9% 5.5% > > For 1:N transcode by GPU acceleration with QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6% 4% 15% > > For Intel GPU acceleration case, 1 decode to N scaling, by QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 12% 21% 21% > > For CPU only 1 decode to N scaling: > ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \ > -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 25% 107% 148% > Some numbers for more use-cases and platforms (with different architectures and core counts) would be a good idea if you intend to enable this by default. Presumably it's a bit slower on less powerful machines with fewer cores when it makes many threads, but by how much? Is that acceptable? > Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com> > Reviewed-by: Zhao, Jun <jun.zhao@intel.com> > --- > fftools/ffmpeg.c | 112 +++++++++++++++++++++++++++++++++++++++++++++--- > fftools/ffmpeg.h | 14 ++++++ > fftools/ffmpeg_filter.c | 4 ++ > 3 files changed, 124 insertions(+), 6 deletions(-) > > diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c > index 544f1a1..67b1a2a 100644 > --- a/fftools/ffmpeg.c > +++ b/fftools/ffmpeg.c > @@ -1419,13 +1419,18 @@ static void finish_output_stream(OutputStream *ost) > * > * @return 0 for success, <0 for severe errors > */ > -static int reap_filters(int flush) > +static int reap_filters(int flush, InputFilter * ifilter) > { > AVFrame *filtered_frame = NULL; > int i; > > - /* Reap all buffers present in the buffer sinks */ > + /* Reap all buffers present in the buffer sinks or just reap specified > + * input filter buffer */ > for (i = 0; i < nb_output_streams; i++) { > + if (ifilter) { > + if (ifilter != output_streams[i]->filter->graph->inputs[0]) > + continue; > + } No mixed declarations and code. > OutputStream *ost = output_streams[i]; > OutputFile *of = output_files[ost->file_index]; > AVFilterContext *filter; How carefully has this been audited to make sure that there are no data races? The calls to init_output_stream() and do_video_out() both do /a lot/, and in particular they interact with the InputStream which might be shared with other threads (and indeed is in all your examples above). > @@ -2179,7 +2184,8 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame) > } > } > > - ret = reap_filters(1); > + ret = HAVE_THREADS ? reap_filters(1, ifilter) : reap_filters(1, NULL); > + > if (ret < 0 && ret != AVERROR_EOF) { > av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); > return ret; > @@ -2208,6 +2214,14 @@ static int ifilter_send_eof(InputFilter *ifilter, int64_t pts) > > ifilter->eof = 1; > > +#if HAVE_THREADS > + ifilter->waited_frm = NULL; > + pthread_mutex_lock(&ifilter->process_mutex); > + ifilter->t_end = 1; > + pthread_cond_signal(&ifilter->process_cond); > + pthread_mutex_unlock(&ifilter->process_mutex); > + pthread_join(ifilter->f_thread, NULL); > +#endif > if (ifilter->filter) { > ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH); > if (ret < 0) > @@ -2252,12 +2266,95 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke > return 0; > } > > +#if HAVE_THREADS > +static void *filter_pipeline(void *arg) > +{ > + InputFilter *fl = arg; > + AVFrame *frm; > + int ret; > + while(1) { > + pthread_mutex_lock(&fl->process_mutex); > + while (fl->waited_frm == NULL && !fl->t_end) > + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); > + pthread_mutex_unlock(&fl->process_mutex); > + > + if (fl->t_end) break; > + > + frm = fl->waited_frm; > + ret = ifilter_send_frame(fl, frm); > + if (ret < 0) { > + av_log(NULL, AV_LOG_ERROR, > + "Failed to inject frame into filter network: %s\n", av_err2str(ret)); > + } else { > + ret = reap_filters(0, fl); > + } > + fl->t_error = ret; > + > + pthread_mutex_lock(&fl->finish_mutex); > + fl->waited_frm = NULL; > + pthread_cond_signal(&fl->finish_cond); > + pthread_mutex_unlock(&fl->finish_mutex); > + > + if (ret < 0) > + break; Is this error always totally fatal? (I guess I'm wondering if any EAGAIN-like cases end up here.) > + } > + return fl; This return value seems to be unused? > +} > +#endif > + > static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) > { > int i, ret; > AVFrame *f; > > av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ > +#if HAVE_THREADS > + for (i = 0; i < ist->nb_filters; i++) { > + //it will use abr_pipeline mode by default > + if (i < ist->nb_filters - 1) { > + f = &ist->filters[i]->input_frm; > + ret = av_frame_ref(f, decoded_frame); > + if (ret < 0) > + break; Won't this just deadlock if you ever hit the break? You'll immediately wait for threads which haven't been given anything to do. > + } else > + f = decoded_frame; > + > + if (!ist->filters[i]->b_abr_thread_init) { > + if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline, > + ist->filters[i]))) { > + av_log(NULL, AV_LOG_ERROR, > + "pthread_create failed: %s. Try to increase `ulimit -v` or \ > + decrease `ulimit -s`.\n", strerror(ret)); What is the motivation for these recommendations? Neither seems likely to help except in very weirdly constrained systems. > + return AVERROR(ret); > + } > + pthread_mutex_init(&ist->filters[i]->process_mutex, NULL); > + pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL); > + pthread_cond_init(&ist->filters[i]->process_cond, NULL); > + pthread_cond_init(&ist->filters[i]->finish_cond, NULL); > + ist->filters[i]->t_end = 0; > + ist->filters[i]->t_error = 0; > + ist->filters[i]->b_abr_thread_init = 1; > + } > + > + pthread_mutex_lock(&ist->filters[i]->process_mutex); > + ist->filters[i]->waited_frm = f; > + pthread_cond_signal(&ist->filters[i]->process_cond); > + pthread_mutex_unlock(&ist->filters[i]->process_mutex); > + } > + > + for (i = 0; i < ist->nb_filters; i++) { > + pthread_mutex_lock(&ist->filters[i]->finish_mutex); > + while(ist->filters[i]->waited_frm != NULL) > + pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex); > + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); > + } Is the lockstep such that you can actually use the same mutex and condvar for both parts? That would seem simpler if it works. > + for (i = 0; i < ist->nb_filters; i++) { > + if (ist->filters[i]->t_error < 0) { > + ret = ist->filters[i]->t_error; > + break; > + } > + } > +#else > for (i = 0; i < ist->nb_filters; i++) { > if (i < ist->nb_filters - 1) { > f = ist->filter_frame; > @@ -2266,6 +2363,7 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) > break; > } else > f = decoded_frame; > + Stray change? > ret = ifilter_send_frame(ist->filters[i], f); > if (ret == AVERROR_EOF) > ret = 0; /* ignore */ > @@ -2275,6 +2373,8 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) > break; > } > } > +#endif There is still a bit of common code here between the two branches. I think you really do want the #ifdefed region to be as small as possible (you can put the loop outside the condition with a new loop start in the HAVE_THREADS case only). > + > return ret; > } > > @@ -4537,10 +4637,10 @@ static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist) > *best_ist = NULL; > ret = avfilter_graph_request_oldest(graph->graph); > if (ret >= 0) > - return reap_filters(0); > + return reap_filters(0, NULL); I'm not entirely sure I'm reading this correctly, but I think this is the complex filtergraph case. That means that using -filter_complex split will have quite different behaviour to multiple -vf instances? > > if (ret == AVERROR_EOF) { > - ret = reap_filters(1); > + ret = reap_filters(1, NULL); > for (i = 0; i < graph->nb_outputs; i++) > close_output_stream(graph->outputs[i]->ost); > return ret; > @@ -4642,7 +4742,7 @@ static int transcode_step(void) > if (ret < 0) > return ret == AVERROR_EOF ? 0 : ret; > > - return reap_filters(0); > + return HAVE_THREADS ? ret : reap_filters(0, NULL); > } > > /* > diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h > index eb1eaf6..9a8e776 100644 > --- a/fftools/ffmpeg.h > +++ b/fftools/ffmpeg.h > @@ -253,6 +253,20 @@ typedef struct InputFilter { > > AVBufferRef *hw_frames_ctx; > > + // for abr pipeline > + int b_abr_thread_init; I'm not sure what this name is intended to mean at all. Since it indicates whether the filter thread has been created, maybe something like "filter_thread_created" would make the meaning clearer? > +#if HAVE_THREADS > + AVFrame *waited_frm; > + AVFrame input_frm; sizeof(AVFrame) is not part of the ABI. You need to allocate it somewhere. > + pthread_t f_thread; "filter_thread"? > + pthread_cond_t process_cond; > + pthread_cond_t finish_cond; > + pthread_mutex_t process_mutex; > + pthread_mutex_t finish_mutex; > + int t_end; > + int t_error; I think it would be a good idea to document the condition associated with each of these. > +#endif > + > int eof; > } InputFilter; > > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c > index 6518d50..5d1e521 100644 > --- a/fftools/ffmpeg_filter.c > +++ b/fftools/ffmpeg_filter.c > @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0, > int init_simple_filtergraph(InputStream *ist, OutputStream *ost) > { > FilterGraph *fg = av_mallocz(sizeof(*fg)); > + int i; > > if (!fg) > exit_program(1); > @@ -225,6 +226,9 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost) > GROW_ARRAY(ist->filters, ist->nb_filters); > ist->filters[ist->nb_filters - 1] = fg->inputs[0]; > > + for (i = 0; i < ist->nb_filters; i++) > + ist->filters[i]->b_abr_thread_init = 0; It doesn't look like the right place for this init? init_simple_filtergraph() is called once per output stream, so this is going to happen multiple times. > + > GROW_ARRAY(filtergraphs, nb_filtergraphs); > filtergraphs[nb_filtergraphs - 1] = fg; > > - Mark
> -----Original Message----- > From: ffmpeg-devel [mailto:ffmpeg-devel-bounces@ffmpeg.org] On Behalf Of > Mark Thompson > Sent: Tuesday, February 12, 2019 8:18 AM It should be UTC time when received the email > To: ffmpeg-devel@ffmpeg.org > Subject: Re: [FFmpeg-devel] [PATCH v4] Improved the performance of 1 > decode + N filter graphs and adaptive bitrate. > > On 11/02/2019 22:41, Shaofei Wang wrote: And the above time I've sent the previous email is also a correct UTC time > Please avoid sending messages from the future - the list received this about > thirteen hours before its supposed send time (received "Mon, 11 Feb 2019 > 11:42:09 +0200", sent "Mon, 11 Feb 2019 17:41:04 -0500"). > Probably the sending machine or some intermediate has an incorrect time or > time zone. It may be the reason. > Some numbers for more use-cases and platforms (with different architectures > and core counts) would be a good idea if you intend to enable this by default. It would be better to have more platforms data. Actually, it provide option for user to choose a "faster" path in the previous version. In this patch it simplified code path. > Presumably it's a bit slower on less powerful machines with fewer cores when > it makes many threads, but by how much? Is that acceptable? Is it resource limited machine that we should disable HAVE_THREADS? > > diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index > > 544f1a1..67b1a2a 100644 > > --- a/fftools/ffmpeg.c > > +++ b/fftools/ffmpeg.c > > @@ -1419,13 +1419,18 @@ static void > finish_output_stream(OutputStream *ost) > > * > > * @return 0 for success, <0 for severe errors > > */ > > -static int reap_filters(int flush) > > +static int reap_filters(int flush, InputFilter * ifilter) > > { > > AVFrame *filtered_frame = NULL; > > int i; > > > > - /* Reap all buffers present in the buffer sinks */ > > + /* Reap all buffers present in the buffer sinks or just reap specified > > + * input filter buffer */ > > for (i = 0; i < nb_output_streams; i++) { > > + if (ifilter) { > > + if (ifilter != output_streams[i]->filter->graph->inputs[0]) > > + continue; > > + } > > No mixed declarations and code. OK. > > OutputStream *ost = output_streams[i]; > > OutputFile *of = output_files[ost->file_index]; > > AVFilterContext *filter; > > How carefully has this been audited to make sure that there are no data races? > The calls to init_output_stream() and do_video_out() both do /a lot/, and in > particular they interact with the InputStream which might be shared with > other threads (and indeed is in all your examples above). Base on the code path of multithread, it won't have duplicated path to call init_output_stream() and do_video_out(), since there's no output stream share multiple filter graphs. And this concern should be hightlight, will investigate more in the code. > > @@ -2179,7 +2184,8 @@ static int ifilter_send_frame(InputFilter *ifilter, > AVFrame *frame) > > } > > } > > > > - ret = reap_filters(1); > > + ret = HAVE_THREADS ? reap_filters(1, ifilter) : > > + reap_filters(1, NULL); > > + > > if (ret < 0 && ret != AVERROR_EOF) { > > av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", > av_err2str(ret)); > > return ret; > > @@ -2208,6 +2214,14 @@ static int ifilter_send_eof(InputFilter > > *ifilter, int64_t pts) > > > > ifilter->eof = 1; > > > > +#if HAVE_THREADS > > + ifilter->waited_frm = NULL; > > + pthread_mutex_lock(&ifilter->process_mutex); > > + ifilter->t_end = 1; > > + pthread_cond_signal(&ifilter->process_cond); > > + pthread_mutex_unlock(&ifilter->process_mutex); > > + pthread_join(ifilter->f_thread, NULL); #endif > > if (ifilter->filter) { > > ret = av_buffersrc_close(ifilter->filter, pts, > AV_BUFFERSRC_FLAG_PUSH); > > if (ret < 0) > > @@ -2252,12 +2266,95 @@ static int decode(AVCodecContext *avctx, > AVFrame *frame, int *got_frame, AVPacke > > return 0; > > } > > > > +#if HAVE_THREADS > > +static void *filter_pipeline(void *arg) { > > + InputFilter *fl = arg; > > + AVFrame *frm; > > + int ret; > > + while(1) { > > + pthread_mutex_lock(&fl->process_mutex); > > + while (fl->waited_frm == NULL && !fl->t_end) > > + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); > > + pthread_mutex_unlock(&fl->process_mutex); > > + > > + if (fl->t_end) break; > > + > > + frm = fl->waited_frm; > > + ret = ifilter_send_frame(fl, frm); > > + if (ret < 0) { > > + av_log(NULL, AV_LOG_ERROR, > > + "Failed to inject frame into filter network: %s\n", > av_err2str(ret)); > > + } else { > > + ret = reap_filters(0, fl); > > + } > > + fl->t_error = ret; > > + > > + pthread_mutex_lock(&fl->finish_mutex); > > + fl->waited_frm = NULL; > > + pthread_cond_signal(&fl->finish_cond); > > + pthread_mutex_unlock(&fl->finish_mutex); > > + > > + if (ret < 0) > > + break; > > Is this error always totally fatal? (I guess I'm wondering if any EAGAIN-like > cases end up here.) Will remove the break. If the ret<0, similar as previous code to call ifilter_send_frame() it will return the ret value to caller to decide whether it's fatal or not. > > > + } > > + return fl; > > This return value seems to be unused? OK, just return here. > > > +} > > +#endif > > + > > static int send_frame_to_filters(InputStream *ist, AVFrame > > *decoded_frame) { > > int i, ret; > > AVFrame *f; > > > > av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ > > +#if HAVE_THREADS > > + for (i = 0; i < ist->nb_filters; i++) { > > + //it will use abr_pipeline mode by default > > + if (i < ist->nb_filters - 1) { > > + f = &ist->filters[i]->input_frm; > > + ret = av_frame_ref(f, decoded_frame); > > + if (ret < 0) > > + break; > > Won't this just deadlock if you ever hit the break? You'll immediately wait > for threads which haven't been given anything to do. Yeah. Fixed in the next version. Thanks. > > > + } else > > + f = decoded_frame; > > + > > + if (!ist->filters[i]->b_abr_thread_init) { > > + if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, > filter_pipeline, > > + ist->filters[i]))) { > > + av_log(NULL, AV_LOG_ERROR, > > + "pthread_create failed: %s. Try to increase > `ulimit -v` or \ > > + decrease `ulimit -s`.\n", strerror(ret)); > > What is the motivation for these recommendations? Neither seems likely to > help except in very weirdly constrained systems. Done, removed. > > > + return AVERROR(ret); > > + } > > + pthread_mutex_init(&ist->filters[i]->process_mutex, NULL); > > + pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL); > > + pthread_cond_init(&ist->filters[i]->process_cond, NULL); > > + pthread_cond_init(&ist->filters[i]->finish_cond, NULL); > > + ist->filters[i]->t_end = 0; > > + ist->filters[i]->t_error = 0; > > + ist->filters[i]->b_abr_thread_init = 1; > > + } > > + > > + pthread_mutex_lock(&ist->filters[i]->process_mutex); > > + ist->filters[i]->waited_frm = f; > > + pthread_cond_signal(&ist->filters[i]->process_cond); > > + pthread_mutex_unlock(&ist->filters[i]->process_mutex); > > + } > > + > > + for (i = 0; i < ist->nb_filters; i++) { > > + pthread_mutex_lock(&ist->filters[i]->finish_mutex); > > + while(ist->filters[i]->waited_frm != NULL) > > + pthread_cond_wait(&ist->filters[i]->finish_cond, > &ist->filters[i]->finish_mutex); > > + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); > > + } > > Is the lockstep such that you can actually use the same mutex and condvar for > both parts? That would seem simpler if it works. > Let me try, but it's safe to use those for producer and consumer. > > + for (i = 0; i < ist->nb_filters; i++) { > > + if (ist->filters[i]->t_error < 0) { > > + ret = ist->filters[i]->t_error; > > + break; > > + } > > + } > > +#else > > for (i = 0; i < ist->nb_filters; i++) { > > if (i < ist->nb_filters - 1) { > > f = ist->filter_frame; > > @@ -2266,6 +2363,7 @@ static int send_frame_to_filters(InputStream *ist, > AVFrame *decoded_frame) > > break; > > } else > > f = decoded_frame; > > + > > Stray change? Done. > > > ret = ifilter_send_frame(ist->filters[i], f); > > if (ret == AVERROR_EOF) > > ret = 0; /* ignore */ > > @@ -2275,6 +2373,8 @@ static int send_frame_to_filters(InputStream *ist, > AVFrame *decoded_frame) > > break; > > } > > } > > +#endif > > There is still a bit of common code here between the two branches. I think > you really do want the #ifdefed region to be as small as possible (you can put > the loop outside the condition with a new loop start in the HAVE_THREADS > case only). It may increase more "#ifdef" slices? > > > + > > return ret; > > } > > > > @@ -4537,10 +4637,10 @@ static int transcode_from_filter(FilterGraph > *graph, InputStream **best_ist) > > *best_ist = NULL; > > ret = avfilter_graph_request_oldest(graph->graph); > > if (ret >= 0) > > - return reap_filters(0); > > + return reap_filters(0, NULL); > > I'm not entirely sure I'm reading this correctly, but I think this is the complex > filtergraph case. I think so. > That means that using -filter_complex split will have quite different behaviour > to multiple -vf instances? > Yes, they are different. The patch is mainly for filter graph level. In terms of complex split case, there will be another one where changed in the lib. > > > > if (ret == AVERROR_EOF) { > > - ret = reap_filters(1); > > + ret = reap_filters(1, NULL); > > for (i = 0; i < graph->nb_outputs; i++) > > close_output_stream(graph->outputs[i]->ost); > > return ret; > > @@ -4642,7 +4742,7 @@ static int transcode_step(void) > > if (ret < 0) > > return ret == AVERROR_EOF ? 0 : ret; > > > > - return reap_filters(0); > > + return HAVE_THREADS ? ret : reap_filters(0, NULL); > > } > > > > /* > > diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index > > eb1eaf6..9a8e776 100644 > > --- a/fftools/ffmpeg.h > > +++ b/fftools/ffmpeg.h > > @@ -253,6 +253,20 @@ typedef struct InputFilter { > > > > AVBufferRef *hw_frames_ctx; > > > > + // for abr pipeline > > + int b_abr_thread_init; > > I'm not sure what this name is intended to mean at all. Since it indicates > whether the filter thread has been created, maybe something like > "filter_thread_created" would make the meaning clearer? > How about abr_thread_created? Since I want to distinguish them from those frame/slice filter thread, so I call them adaptive bite rate pipeline :) > > +#if HAVE_THREADS > > + AVFrame *waited_frm; > > + AVFrame input_frm; > > sizeof(AVFrame) is not part of the ABI. You need to allocate it somewhere. > Please tell more? > > + pthread_t f_thread; > > "filter_thread"? > abr_thread > > + pthread_cond_t process_cond; > > + pthread_cond_t finish_cond; > > + pthread_mutex_t process_mutex; > > + pthread_mutex_t finish_mutex; > > + int t_end; > > + int t_error; > > I think it would be a good idea to document the condition associated with > each of these. > > > +#endif > > + > > int eof; > > } InputFilter; > > > > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index > > 6518d50..5d1e521 100644 > > --- a/fftools/ffmpeg_filter.c > > +++ b/fftools/ffmpeg_filter.c > > @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, > > channel_layout, channel_layouts, 0, int > > init_simple_filtergraph(InputStream *ist, OutputStream *ost) { > > FilterGraph *fg = av_mallocz(sizeof(*fg)); > > + int i; > > > > if (!fg) > > exit_program(1); > > @@ -225,6 +226,9 @@ int init_simple_filtergraph(InputStream *ist, > OutputStream *ost) > > GROW_ARRAY(ist->filters, ist->nb_filters); > > ist->filters[ist->nb_filters - 1] = fg->inputs[0]; > > > > + for (i = 0; i < ist->nb_filters; i++) > > + ist->filters[i]->b_abr_thread_init = 0; > > It doesn't look like the right place for this init? init_simple_filtergraph() is > called once per output stream, so this is going to happen multiple times. > Find another place. Not only simple filter graph need this, but also complex graph do. > > + > > GROW_ARRAY(filtergraphs, nb_filtergraphs); > > filtergraphs[nb_filtergraphs - 1] = fg; > > > > > > - Mark > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
2019-02-13 8:52 GMT+01:00, Wang, Shaofei <shaofei.wang@intel.com>: >> > + AVFrame input_frm; >> >> sizeof(AVFrame) is not part of the ABI. You need to allocate it >> somewhere. >> > Please tell more? See the documentation for AVFrame in libavutil/frame.h Use av_frame_alloc() Carl Eugen
> >> sizeof(AVFrame) is not part of the ABI. You need to allocate it > >> somewhere. > >> > > Please tell more? > > See the documentation for AVFrame in libavutil/frame.h Use av_frame_alloc() > > Carl Eugen Thanks Carl
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 544f1a1..67b1a2a 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1419,13 +1419,18 @@ static void finish_output_stream(OutputStream *ost) * * @return 0 for success, <0 for severe errors */ -static int reap_filters(int flush) +static int reap_filters(int flush, InputFilter * ifilter) { AVFrame *filtered_frame = NULL; int i; - /* Reap all buffers present in the buffer sinks */ + /* Reap all buffers present in the buffer sinks or just reap specified + * input filter buffer */ for (i = 0; i < nb_output_streams; i++) { + if (ifilter) { + if (ifilter != output_streams[i]->filter->graph->inputs[0]) + continue; + } OutputStream *ost = output_streams[i]; OutputFile *of = output_files[ost->file_index]; AVFilterContext *filter; @@ -2179,7 +2184,8 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame) } } - ret = reap_filters(1); + ret = HAVE_THREADS ? reap_filters(1, ifilter) : reap_filters(1, NULL); + if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); return ret; @@ -2208,6 +2214,14 @@ static int ifilter_send_eof(InputFilter *ifilter, int64_t pts) ifilter->eof = 1; +#if HAVE_THREADS + ifilter->waited_frm = NULL; + pthread_mutex_lock(&ifilter->process_mutex); + ifilter->t_end = 1; + pthread_cond_signal(&ifilter->process_cond); + pthread_mutex_unlock(&ifilter->process_mutex); + pthread_join(ifilter->f_thread, NULL); +#endif if (ifilter->filter) { ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH); if (ret < 0) @@ -2252,12 +2266,95 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke return 0; } +#if HAVE_THREADS +static void *filter_pipeline(void *arg) +{ + InputFilter *fl = arg; + AVFrame *frm; + int ret; + while(1) { + pthread_mutex_lock(&fl->process_mutex); + while (fl->waited_frm == NULL && !fl->t_end) + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); + pthread_mutex_unlock(&fl->process_mutex); + + if (fl->t_end) break; + + frm = fl->waited_frm; + ret = ifilter_send_frame(fl, frm); + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Failed to inject frame into filter network: %s\n", av_err2str(ret)); + } else { + ret = reap_filters(0, fl); + } + fl->t_error = ret; + + pthread_mutex_lock(&fl->finish_mutex); + fl->waited_frm = NULL; + pthread_cond_signal(&fl->finish_cond); + pthread_mutex_unlock(&fl->finish_mutex); + + if (ret < 0) + break; + } + return fl; +} +#endif + static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) { int i, ret; AVFrame *f; av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ +#if HAVE_THREADS + for (i = 0; i < ist->nb_filters; i++) { + //it will use abr_pipeline mode by default + if (i < ist->nb_filters - 1) { + f = &ist->filters[i]->input_frm; + ret = av_frame_ref(f, decoded_frame); + if (ret < 0) + break; + } else + f = decoded_frame; + + if (!ist->filters[i]->b_abr_thread_init) { + if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline, + ist->filters[i]))) { + av_log(NULL, AV_LOG_ERROR, + "pthread_create failed: %s. Try to increase `ulimit -v` or \ + decrease `ulimit -s`.\n", strerror(ret)); + return AVERROR(ret); + } + pthread_mutex_init(&ist->filters[i]->process_mutex, NULL); + pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL); + pthread_cond_init(&ist->filters[i]->process_cond, NULL); + pthread_cond_init(&ist->filters[i]->finish_cond, NULL); + ist->filters[i]->t_end = 0; + ist->filters[i]->t_error = 0; + ist->filters[i]->b_abr_thread_init = 1; + } + + pthread_mutex_lock(&ist->filters[i]->process_mutex); + ist->filters[i]->waited_frm = f; + pthread_cond_signal(&ist->filters[i]->process_cond); + pthread_mutex_unlock(&ist->filters[i]->process_mutex); + } + + for (i = 0; i < ist->nb_filters; i++) { + pthread_mutex_lock(&ist->filters[i]->finish_mutex); + while(ist->filters[i]->waited_frm != NULL) + pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex); + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); + } + for (i = 0; i < ist->nb_filters; i++) { + if (ist->filters[i]->t_error < 0) { + ret = ist->filters[i]->t_error; + break; + } + } +#else for (i = 0; i < ist->nb_filters; i++) { if (i < ist->nb_filters - 1) { f = ist->filter_frame; @@ -2266,6 +2363,7 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) break; } else f = decoded_frame; + ret = ifilter_send_frame(ist->filters[i], f); if (ret == AVERROR_EOF) ret = 0; /* ignore */ @@ -2275,6 +2373,8 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) break; } } +#endif + return ret; } @@ -4537,10 +4637,10 @@ static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist) *best_ist = NULL; ret = avfilter_graph_request_oldest(graph->graph); if (ret >= 0) - return reap_filters(0); + return reap_filters(0, NULL); if (ret == AVERROR_EOF) { - ret = reap_filters(1); + ret = reap_filters(1, NULL); for (i = 0; i < graph->nb_outputs; i++) close_output_stream(graph->outputs[i]->ost); return ret; @@ -4642,7 +4742,7 @@ static int transcode_step(void) if (ret < 0) return ret == AVERROR_EOF ? 0 : ret; - return reap_filters(0); + return HAVE_THREADS ? ret : reap_filters(0, NULL); } /* diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index eb1eaf6..9a8e776 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -253,6 +253,20 @@ typedef struct InputFilter { AVBufferRef *hw_frames_ctx; + // for abr pipeline + int b_abr_thread_init; +#if HAVE_THREADS + AVFrame *waited_frm; + AVFrame input_frm; + pthread_t f_thread; + pthread_cond_t process_cond; + pthread_cond_t finish_cond; + pthread_mutex_t process_mutex; + pthread_mutex_t finish_mutex; + int t_end; + int t_error; +#endif + int eof; } InputFilter; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 6518d50..5d1e521 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0, int init_simple_filtergraph(InputStream *ist, OutputStream *ost) { FilterGraph *fg = av_mallocz(sizeof(*fg)); + int i; if (!fg) exit_program(1); @@ -225,6 +226,9 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost) GROW_ARRAY(ist->filters, ist->nb_filters); ist->filters[ist->nb_filters - 1] = fg->inputs[0]; + for (i = 0; i < ist->nb_filters; i++) + ist->filters[i]->b_abr_thread_init = 0; + GROW_ARRAY(filtergraphs, nb_filtergraphs); filtergraphs[nb_filtergraphs - 1] = fg;