Message ID | 1547591004-13018-1-git-send-email-shaofei.wang@intel.com |
---|---|
State | Superseded |
Headers | show |
On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> wrote: > With new option "-abr_pipeline" > It enables multiple filter graph concurrency, which brings about a > 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration > > Below are some test cases and comparison as reference. > (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz) > (Software: Intel iHD driver - 16.9.00100, CentOS 7) > > For 1:N transcode by GPU acceleration with vaapi: > ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \ > -hwaccel_output_format vaapi \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \ > -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \ > -abr_pipeline > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6.1% 6.9% 5.5% > > For 1:N transcode by GPU acceleration with QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6% 4% 15% > > For Intel GPU acceleration case, 1 decode to N scaling, by QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null > /dev/null \ > -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null > /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 12% 21% 21% > > For CPU only 1 decode to N scaling: > ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \ > -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \ > -abr_pipeline > > test results: > 2 scale 5 scale 10 scale > Improved 25% 107% 148% > > Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com> > Reviewed-by: Zhao, Jun <jun.zhao@intel.com> 
> --- > fftools/ffmpeg.c | 238 > +++++++++++++++++++++++++++++++++++++++++++++--- > fftools/ffmpeg.h | 15 +++ > fftools/ffmpeg_filter.c | 6 ++ > fftools/ffmpeg_opt.c | 6 +- > 4 files changed, 251 insertions(+), 14 deletions(-) > > diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c > index 544f1a1..d608194 100644 > --- a/fftools/ffmpeg.c > +++ b/fftools/ffmpeg.c > @@ -1523,6 +1523,110 @@ static int reap_filters(int flush) > return 0; > } > > +static int pipeline_reap_filters(int flush, InputFilter * ifilter) > +{ > + AVFrame *filtered_frame = NULL; > + int i; > + > + for (i = 0; i < nb_output_streams; i++) { > + if (ifilter == output_streams[i]->filter->graph->inputs[0]) break; > + } > + OutputStream *ost = output_streams[i]; > + OutputFile *of = output_files[ost->file_index]; > + AVFilterContext *filter; > + AVCodecContext *enc = ost->enc_ctx; > + int ret = 0; > + > + if (!ost->filter || !ost->filter->graph->graph) > + return 0; > + filter = ost->filter->filter; > + > + if (!ost->initialized) { > + char error[1024] = ""; > + ret = init_output_stream(ost, error, sizeof(error)); > + if (ret < 0) { > + av_log(NULL, AV_LOG_ERROR, "Error initializing output stream > %d:%d -- %s\n", > + ost->file_index, ost->index, error); > + exit_program(1); > + } > + } > + > + if (!ost->filtered_frame && !(ost->filtered_frame = > av_frame_alloc())) { > + return AVERROR(ENOMEM); > + } > We never put brackets for single line statements. 
> + filtered_frame = ost->filtered_frame; > + > + while (1) { > + double float_pts = AV_NOPTS_VALUE; // this is identical to > filtered_frame.pts but with higher precision > + ret = av_buffersink_get_frame_flags(filter, filtered_frame, > + AV_BUFFERSINK_FLAG_NO_REQUEST); > + if (ret < 0) { > + if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) { > + av_log(NULL, AV_LOG_WARNING, > + "Error in av_buffersink_get_frame_flags(): %s\n", > av_err2str(ret)); > + } else if (flush && ret == AVERROR_EOF) { > + if (av_buffersink_get_type(filter) == AVMEDIA_TYPE_VIDEO) > + do_video_out(of, ost, NULL, AV_NOPTS_VALUE); > + } > + break; > + } > + if (ost->finished) { > + av_frame_unref(filtered_frame); > + continue; > + } > + if (filtered_frame->pts != AV_NOPTS_VALUE) { > + int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : > of->start_time; > + AVRational filter_tb = av_buffersink_get_time_base(filter); > + AVRational tb = enc->time_base; > + int extra_bits = av_clip(29 - av_log2(tb.den), 0, 16); > + > + tb.den <<= extra_bits; > + float_pts = > + av_rescale_q(filtered_frame->pts, filter_tb, tb) - > + av_rescale_q(start_time, AV_TIME_BASE_Q, tb); > + float_pts /= 1 << extra_bits; > + // avoid exact midoints to reduce the chance of rounding > differences, this can be removed in case the fps code is changed to work > with integers > + float_pts += FFSIGN(float_pts) * 1.0 / (1<<17); > + > + filtered_frame->pts = > + av_rescale_q(filtered_frame->pts, filter_tb, > enc->time_base) - > + av_rescale_q(start_time, AV_TIME_BASE_Q, enc->time_base); > + } > + > + switch (av_buffersink_get_type(filter)) { > + case AVMEDIA_TYPE_VIDEO: > + if (!ost->frame_aspect_ratio.num) > + enc->sample_aspect_ratio = > filtered_frame->sample_aspect_ratio; > + > + if (debug_ts) { > + av_log(NULL, AV_LOG_INFO, "filter -> pts:%s pts_time:%s > exact:%f time_base:%d/%d\n", > + av_ts2str(filtered_frame->pts), > av_ts2timestr(filtered_frame->pts, &enc->time_base), > + float_pts, > + enc->time_base.num, 
enc->time_base.den); > + } > + > + do_video_out(of, ost, filtered_frame, float_pts); > + break; > + case AVMEDIA_TYPE_AUDIO: > + if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) && > + enc->channels != filtered_frame->channels) { > + av_log(NULL, AV_LOG_ERROR, > + "Audio filter graph output is not normalized and > encoder does not support parameter changes\n"); > + break; > + } > + do_audio_out(of, ost, filtered_frame); > + break; > + default: > + // TODO support subtitle filters > + av_assert0(0); > + } > + > + av_frame_unref(filtered_frame); > + } > + > + return 0; > +} > + > static void print_final_stats(int64_t total_size) > { > uint64_t video_size = 0, audio_size = 0, extra_size = 0, other_size = > 0; > @@ -2179,7 +2283,15 @@ static int ifilter_send_frame(InputFilter *ifilter, > AVFrame *frame) > } > } > > +#if HAVE_THREADS > + if (!abr_pipeline) { > + ret = reap_filters(1); > + } else { > + ret = pipeline_reap_filters(1, ifilter); > + } > Same. > +#else > ret = reap_filters(1); > +#endif > if (ret < 0 && ret != AVERROR_EOF) { > av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", > av_err2str(ret)); > return ret; > @@ -2208,6 +2320,16 @@ static int ifilter_send_eof(InputFilter *ifilter, > int64_t pts) > > ifilter->eof = 1; > > +#if HAVE_THREADS > + if (abr_pipeline) { > + ifilter->waited_frm = NULL; > + pthread_mutex_lock(&ifilter->process_mutex); > + ifilter->t_end = 1; > + pthread_cond_signal(&ifilter->process_cond); > + pthread_mutex_unlock(&ifilter->process_mutex); > + pthread_join(ifilter->f_thread, NULL); > + } > +#endif > if (ifilter->filter) { > ret = av_buffersrc_close(ifilter->filter, pts, > AV_BUFFERSRC_FLAG_PUSH); > if (ret < 0) > @@ -2252,6 +2374,42 @@ static int decode(AVCodecContext *avctx, AVFrame > *frame, int *got_frame, AVPacke > return 0; > } > > +#if HAVE_THREADS > +static void *filter_pipeline(void *arg) > +{ > + InputFilter *fl = arg; > + AVFrame *frm; > + int ret; > + while(1) { > + 
pthread_mutex_lock(&fl->process_mutex); > + while (fl->waited_frm == NULL && !fl->t_end) > + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); > + pthread_mutex_unlock(&fl->process_mutex); > + > + if (fl->t_end) break; > + > + frm = fl->waited_frm; > + ret = ifilter_send_frame(fl, frm); > + if (ret < 0) { > + av_log(NULL, AV_LOG_ERROR, > + "Failed to inject frame into filter network: %s\n", > av_err2str(ret)); > return err;? > + } else { > + ret = pipeline_reap_filters(0, fl); > + } > Single line brackets. > + fl->t_error = ret; > + > + pthread_mutex_lock(&fl->finish_mutex); > + fl->waited_frm = NULL; > + pthread_cond_signal(&fl->finish_cond); > + pthread_mutex_unlock(&fl->finish_mutex); > + > + if (ret < 0) { > + break; > + } > Same. + } > + return NULL; > +} > +#endif > static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) > { > int i, ret; > @@ -2259,22 +2417,73 @@ static int send_frame_to_filters(InputStream *ist, > AVFrame *decoded_frame) > > av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ > for (i = 0; i < ist->nb_filters; i++) { > - if (i < ist->nb_filters - 1) { > - f = ist->filter_frame; > - ret = av_frame_ref(f, decoded_frame); > - if (ret < 0) > +#if HAVE_THREADS > + if (!abr_pipeline) { > +#endif > + if (i < ist->nb_filters - 1) { > + f = ist->filter_frame; > + ret = av_frame_ref(f, decoded_frame); > + if (ret < 0) > + break; > + } else > + f = decoded_frame; > + > + ret = ifilter_send_frame(ist->filters[i], f); > + if (ret == AVERROR_EOF) > + ret = 0; /* ignore */ > + if (ret < 0) { > + av_log(NULL, AV_LOG_ERROR, > + "Failed to inject frame into filter network: > %s\n", av_err2str(ret)); > + break; > + } > +#if HAVE_THREADS > + } else { > + if (i < ist->nb_filters - 1) { > + f = &ist->filters[i]->input_frm; > + ret = av_frame_ref(f, decoded_frame); > + if (ret < 0) > + break; > + } else > + f = decoded_frame; > + > + if (!ist->filters[i]->b_abr_thread_init) { > + if ((ret = 
pthread_create(&ist->filters[i]->f_thread, > NULL, filter_pipeline, ist->filters[i]))) { > + av_log(NULL, AV_LOG_ERROR, "pthread_create failed: > %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); > + return AVERROR(ret); > + } > + pthread_mutex_init(&ist->filters[i]->process_mutex, NULL); > + pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL); > + pthread_cond_init(&ist->filters[i]->process_cond, NULL); > + pthread_cond_init(&ist->filters[i]->finish_cond, NULL); > + ist->filters[i]->t_end = 0; > + ist->filters[i]->t_error = 0; > + ist->filters[i]->b_abr_thread_init = 1; > + } > + > + pthread_mutex_lock(&ist->filters[i]->process_mutex); > + ist->filters[i]->waited_frm = f; > + pthread_cond_signal(&ist->filters[i]->process_cond); > + pthread_mutex_unlock(&ist->filters[i]->process_mutex); > + } > +#endif > + } > +#if HAVE_THREADS > + if (abr_pipeline) { > + for (i = 0; i < ist->nb_filters; i++) { > + pthread_mutex_lock(&ist->filters[i]->finish_mutex); > + while(ist->filters[i]->waited_frm != NULL) > + pthread_cond_wait(&ist->filters[i]->finish_cond, > &ist->filters[i]->finish_mutex); > + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); > + } > + for (i = 0; i < ist->nb_filters; i++) { > + if (ist->filters[i]->t_error < 0) { > + ret = ist->filters[i]->t_error; > break; > - } else > - f = decoded_frame; > - ret = ifilter_send_frame(ist->filters[i], f); > - if (ret == AVERROR_EOF) > - ret = 0; /* ignore */ > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Failed to inject frame into filter network: %s\n", > av_err2str(ret)); > - break; > + } > } > } > +#endif > + > return ret; > } > > @@ -4642,6 +4851,9 @@ static int transcode_step(void) > if (ret < 0) > return ret == AVERROR_EOF ? 
0 : ret; > > +#if HAVE_THREADS > + if (abr_pipeline) return 0; > +#endif > return reap_filters(0); > } > > diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h > index eb1eaf6..e91c243 100644 > --- a/fftools/ffmpeg.h > +++ b/fftools/ffmpeg.h > @@ -253,6 +253,20 @@ typedef struct InputFilter { > > AVBufferRef *hw_frames_ctx; > > +#if HAVE_THREADS > + // for abr pipeline > + AVFrame *waited_frm; > + AVFrame input_frm; > + pthread_t f_thread; > + pthread_cond_t process_cond; > + pthread_cond_t finish_cond; > + pthread_mutex_t process_mutex; > + pthread_mutex_t finish_mutex; > + int b_abr_thread_init; > + int t_end; > + int t_error; > +#endif > + > int eof; > } InputFilter; > > @@ -606,6 +620,7 @@ extern int frame_bits_per_raw_sample; > extern AVIOContext *progress_avio; > extern float max_error_rate; > extern char *videotoolbox_pixfmt; > +extern int abr_pipeline; > > extern int filter_nbthreads; > extern int filter_complex_nbthreads; > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c > index 6518d50..8823394 100644 > --- a/fftools/ffmpeg_filter.c > +++ b/fftools/ffmpeg_filter.c > @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, > channel_layout, channel_layouts, 0, > int init_simple_filtergraph(InputStream *ist, OutputStream *ost) > { > FilterGraph *fg = av_mallocz(sizeof(*fg)); > + int i; > > if (!fg) > exit_program(1); > @@ -225,6 +226,11 @@ int init_simple_filtergraph(InputStream *ist, > OutputStream *ost) > GROW_ARRAY(ist->filters, ist->nb_filters); > ist->filters[ist->nb_filters - 1] = fg->inputs[0]; > > + if (abr_pipeline) { > + for (i = 0; i < ist->nb_filters; i++) { > + ist->filters[i]->b_abr_thread_init = 0; > + } > + } > Same x2. 
> GROW_ARRAY(filtergraphs, nb_filtergraphs); > filtergraphs[nb_filtergraphs - 1] = fg; > > diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c > index d4851a2..fa5a556 100644 > --- a/fftools/ffmpeg_opt.c > +++ b/fftools/ffmpeg_opt.c > @@ -110,6 +110,7 @@ float max_error_rate = 2.0/3; > int filter_nbthreads = 0; > int filter_complex_nbthreads = 0; > int vstats_version = 2; > +int abr_pipeline = 0; > > > static int intra_only = 0; > @@ -3502,7 +3503,10 @@ const OptionDef options[] = { > "set the maximum number of queued packets from the demuxer" }, > { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | > OPT_EXPERT, { &find_stream_info }, > "read and decode the streams to fill missing information with > heuristics" }, > - > +#if HAVE_THREADS > + { "abr_pipeline", OPT_BOOL, { > &abr_pipeline }, > + "adaptive bitrate pipeline (1 decode to N filter graphs, and 1 to > N transcode" }, > +#endif > /* video options */ > { "vframes", OPT_VIDEO | HAS_ARG | OPT_PERFILE | OPT_OUTPUT, > { .func_arg = opt_video_frames }, > "set the number of video frames to output", "number" }, > -- > 1.8.3.1 > > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > http://ffmpeg.org/mailman/listinfo/ffmpeg-devel >
2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> wrote: >> +#if HAVE_THREADS >> + if (!abr_pipeline) { >> + ret = reap_filters(1); >> + } else { >> + ret = pipeline_reap_filters(1, ifilter); >> + } >> > > Same. This hunk should have brackets, it simplifies debugging and future patches at very little cost. Carl Eugen
On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote: > 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: > > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> > wrote: > > >> +#if HAVE_THREADS > >> + if (!abr_pipeline) { > >> + ret = reap_filters(1); > >> + } else { > >> + ret = pipeline_reap_filters(1, ifilter); > >> + } > >> > > > > Same. > > This hunk should have brackets, it simplifies > debugging and future patches at very little cost. > No, it does not. It wastes a line. > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > http://ffmpeg.org/mailman/listinfo/ffmpeg-devel >
2019-01-15 14:43 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: > On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote: > >> 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: >> > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> >> wrote: >> >> >> +#if HAVE_THREADS >> >> + if (!abr_pipeline) { >> >> + ret = reap_filters(1); >> >> + } else { >> >> + ret = pipeline_reap_filters(1, ifilter); >> >> + } >> >> >> > >> > Same. >> >> This hunk should have brackets, it simplifies >> debugging and future patches at very little cost. >> > > No, it does not. It wastes a line. Yes, it "wastes" a line. But the advantage far outweighs the incredible costs. Carl Eugen
On Tue, Jan 15, 2019 at 05:23:24PM -0500, Shaofei Wang wrote: > With new option "-abr_pipeline" > It enables multiple filter graph concurrency, which brings about a > 4%~20% improvement in some 1:N scenarios by CPU or GPU acceleration > > Below are some test cases and comparison as reference. > (Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz) > (Software: Intel iHD driver - 16.9.00100, CentOS 7) > > For 1:N transcode by GPU acceleration with vaapi: > ./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \ > -hwaccel_output_format vaapi \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \ > -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \ > -abr_pipeline > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6.1% 6.9% 5.5% > > For 1:N transcode by GPU acceleration with QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null > > test results: > 2 encoders 5 encoders 10 encoders > Improved 6% 4% 15% > > For Intel GPU acceleration case, 1 decode to N scaling, by QSV: > ./ffmpeg -hwaccel qsv -c:v h264_qsv \ > -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \ > -vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null > > test results: > 2 scale 5 scale 10 scale > Improved 12% 21% 21% > > For CPU only 1 decode to N scaling: > ./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \ > -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \ > -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \ > -abr_pipeline > > test results: > 2 scale 5 scale 10 scale > Improved 25% 107% 148% > > Signed-off-by: Wang, Shaofei <shaofei.wang@intel.com> > Reviewed-by: Zhao, Jun <jun.zhao@intel.com> > --- > 
fftools/ffmpeg.c | 238 +++++++++++++++++++++++++++++++++++++++++++++--- > fftools/ffmpeg.h | 15 +++ > fftools/ffmpeg_filter.c | 6 ++ > fftools/ffmpeg_opt.c | 6 +- > 4 files changed, 251 insertions(+), 14 deletions(-) breaks build when threads are not available CC fftools/ffprobe.o src/fftools/ffmpeg_filter.c: In function ‘init_simple_filtergraph’: src/fftools/ffmpeg_filter.c:231: error: ‘InputFilter’ has no member named ‘b_abr_thread_init’ make: *** [fftools/ffmpeg_filter.o] Error 1 make: *** Waiting for unfinished jobs.... [...]
On Tue, 15 Jan 2019, Carl Eugen Hoyos wrote: > 2019-01-15 14:43 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: >> On Tue, 15 Jan 2019 at 11:57, Carl Eugen Hoyos <ceffmpeg@gmail.com> wrote: >> >>> 2019-01-15 11:30 GMT+01:00, Rostislav Pehlivanov <atomnuker@gmail.com>: >>> > On Tue, 15 Jan 2019 at 09:24, Shaofei Wang <shaofei.wang@intel.com> >>> wrote: >>> >>> >> +#if HAVE_THREADS >>> >> + if (!abr_pipeline) { >>> >> + ret = reap_filters(1); >>> >> + } else { >>> >> + ret = pipeline_reap_filters(1, ifilter); >>> >> + } >>> >> >>> > >>> > Same. >>> >>> This hunk should have brackets, it simplifies >>> debugging and future patches at very little cost. >>> >> >> No, it does not. It wastes a line. > > Yes, it "wastes" a line. > But the advantage far outweighs the incredible costs. Not that I care too much, but I would write this: ret = abr_pipeline ? pipeline_reap_filters(1, ifilter) : reap_filters(1); Regards, Marton
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 544f1a1..d608194 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1523,6 +1523,110 @@ static int reap_filters(int flush) return 0; } +static int pipeline_reap_filters(int flush, InputFilter * ifilter) +{ + AVFrame *filtered_frame = NULL; + int i; + + for (i = 0; i < nb_output_streams; i++) { + if (ifilter == output_streams[i]->filter->graph->inputs[0]) break; + } + OutputStream *ost = output_streams[i]; + OutputFile *of = output_files[ost->file_index]; + AVFilterContext *filter; + AVCodecContext *enc = ost->enc_ctx; + int ret = 0; + + if (!ost->filter || !ost->filter->graph->graph) + return 0; + filter = ost->filter->filter; + + if (!ost->initialized) { + char error[1024] = ""; + ret = init_output_stream(ost, error, sizeof(error)); + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n", + ost->file_index, ost->index, error); + exit_program(1); + } + } + + if (!ost->filtered_frame && !(ost->filtered_frame = av_frame_alloc())) { + return AVERROR(ENOMEM); + } + filtered_frame = ost->filtered_frame; + + while (1) { + double float_pts = AV_NOPTS_VALUE; // this is identical to filtered_frame.pts but with higher precision + ret = av_buffersink_get_frame_flags(filter, filtered_frame, + AV_BUFFERSINK_FLAG_NO_REQUEST); + if (ret < 0) { + if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_WARNING, + "Error in av_buffersink_get_frame_flags(): %s\n", av_err2str(ret)); + } else if (flush && ret == AVERROR_EOF) { + if (av_buffersink_get_type(filter) == AVMEDIA_TYPE_VIDEO) + do_video_out(of, ost, NULL, AV_NOPTS_VALUE); + } + break; + } + if (ost->finished) { + av_frame_unref(filtered_frame); + continue; + } + if (filtered_frame->pts != AV_NOPTS_VALUE) { + int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 
0 : of->start_time; + AVRational filter_tb = av_buffersink_get_time_base(filter); + AVRational tb = enc->time_base; + int extra_bits = av_clip(29 - av_log2(tb.den), 0, 16); + + tb.den <<= extra_bits; + float_pts = + av_rescale_q(filtered_frame->pts, filter_tb, tb) - + av_rescale_q(start_time, AV_TIME_BASE_Q, tb); + float_pts /= 1 << extra_bits; + // avoid exact midoints to reduce the chance of rounding differences, this can be removed in case the fps code is changed to work with integers + float_pts += FFSIGN(float_pts) * 1.0 / (1<<17); + + filtered_frame->pts = + av_rescale_q(filtered_frame->pts, filter_tb, enc->time_base) - + av_rescale_q(start_time, AV_TIME_BASE_Q, enc->time_base); + } + + switch (av_buffersink_get_type(filter)) { + case AVMEDIA_TYPE_VIDEO: + if (!ost->frame_aspect_ratio.num) + enc->sample_aspect_ratio = filtered_frame->sample_aspect_ratio; + + if (debug_ts) { + av_log(NULL, AV_LOG_INFO, "filter -> pts:%s pts_time:%s exact:%f time_base:%d/%d\n", + av_ts2str(filtered_frame->pts), av_ts2timestr(filtered_frame->pts, &enc->time_base), + float_pts, + enc->time_base.num, enc->time_base.den); + } + + do_video_out(of, ost, filtered_frame, float_pts); + break; + case AVMEDIA_TYPE_AUDIO: + if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) && + enc->channels != filtered_frame->channels) { + av_log(NULL, AV_LOG_ERROR, + "Audio filter graph output is not normalized and encoder does not support parameter changes\n"); + break; + } + do_audio_out(of, ost, filtered_frame); + break; + default: + // TODO support subtitle filters + av_assert0(0); + } + + av_frame_unref(filtered_frame); + } + + return 0; +} + static void print_final_stats(int64_t total_size) { uint64_t video_size = 0, audio_size = 0, extra_size = 0, other_size = 0; @@ -2179,7 +2283,15 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame) } } +#if HAVE_THREADS + if (!abr_pipeline) { + ret = reap_filters(1); + } else { + ret = pipeline_reap_filters(1, ifilter); + } +#else 
ret = reap_filters(1); +#endif if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); return ret; @@ -2208,6 +2320,16 @@ static int ifilter_send_eof(InputFilter *ifilter, int64_t pts) ifilter->eof = 1; +#if HAVE_THREADS + if (abr_pipeline) { + ifilter->waited_frm = NULL; + pthread_mutex_lock(&ifilter->process_mutex); + ifilter->t_end = 1; + pthread_cond_signal(&ifilter->process_cond); + pthread_mutex_unlock(&ifilter->process_mutex); + pthread_join(ifilter->f_thread, NULL); + } +#endif if (ifilter->filter) { ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH); if (ret < 0) @@ -2252,6 +2374,42 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke return 0; } +#if HAVE_THREADS +static void *filter_pipeline(void *arg) +{ + InputFilter *fl = arg; + AVFrame *frm; + int ret; + while(1) { + pthread_mutex_lock(&fl->process_mutex); + while (fl->waited_frm == NULL && !fl->t_end) + pthread_cond_wait(&fl->process_cond, &fl->process_mutex); + pthread_mutex_unlock(&fl->process_mutex); + + if (fl->t_end) break; + + frm = fl->waited_frm; + ret = ifilter_send_frame(fl, frm); + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Failed to inject frame into filter network: %s\n", av_err2str(ret)); + } else { + ret = pipeline_reap_filters(0, fl); + } + fl->t_error = ret; + + pthread_mutex_lock(&fl->finish_mutex); + fl->waited_frm = NULL; + pthread_cond_signal(&fl->finish_cond); + pthread_mutex_unlock(&fl->finish_mutex); + + if (ret < 0) { + break; + } + } + return NULL; +} +#endif static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) { int i, ret; @@ -2259,22 +2417,73 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */ for (i = 0; i < ist->nb_filters; i++) { - if (i < ist->nb_filters - 1) { - f = ist->filter_frame; - ret = av_frame_ref(f, decoded_frame); - if (ret < 0) +#if 
HAVE_THREADS + if (!abr_pipeline) { +#endif + if (i < ist->nb_filters - 1) { + f = ist->filter_frame; + ret = av_frame_ref(f, decoded_frame); + if (ret < 0) + break; + } else + f = decoded_frame; + + ret = ifilter_send_frame(ist->filters[i], f); + if (ret == AVERROR_EOF) + ret = 0; /* ignore */ + if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Failed to inject frame into filter network: %s\n", av_err2str(ret)); + break; + } +#if HAVE_THREADS + } else { + if (i < ist->nb_filters - 1) { + f = &ist->filters[i]->input_frm; + ret = av_frame_ref(f, decoded_frame); + if (ret < 0) + break; + } else + f = decoded_frame; + + if (!ist->filters[i]->b_abr_thread_init) { + if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline, ist->filters[i]))) { + av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); + return AVERROR(ret); + } + pthread_mutex_init(&ist->filters[i]->process_mutex, NULL); + pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL); + pthread_cond_init(&ist->filters[i]->process_cond, NULL); + pthread_cond_init(&ist->filters[i]->finish_cond, NULL); + ist->filters[i]->t_end = 0; + ist->filters[i]->t_error = 0; + ist->filters[i]->b_abr_thread_init = 1; + } + + pthread_mutex_lock(&ist->filters[i]->process_mutex); + ist->filters[i]->waited_frm = f; + pthread_cond_signal(&ist->filters[i]->process_cond); + pthread_mutex_unlock(&ist->filters[i]->process_mutex); + } +#endif + } +#if HAVE_THREADS + if (abr_pipeline) { + for (i = 0; i < ist->nb_filters; i++) { + pthread_mutex_lock(&ist->filters[i]->finish_mutex); + while(ist->filters[i]->waited_frm != NULL) + pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex); + pthread_mutex_unlock(&ist->filters[i]->finish_mutex); + } + for (i = 0; i < ist->nb_filters; i++) { + if (ist->filters[i]->t_error < 0) { + ret = ist->filters[i]->t_error; break; - } else - f = decoded_frame; - ret = 
ifilter_send_frame(ist->filters[i], f); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; + } } } +#endif + return ret; } @@ -4642,6 +4851,9 @@ static int transcode_step(void) if (ret < 0) return ret == AVERROR_EOF ? 0 : ret; +#if HAVE_THREADS + if (abr_pipeline) return 0; +#endif return reap_filters(0); } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index eb1eaf6..e91c243 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -253,6 +253,20 @@ typedef struct InputFilter { AVBufferRef *hw_frames_ctx; +#if HAVE_THREADS + // for abr pipeline + AVFrame *waited_frm; + AVFrame input_frm; + pthread_t f_thread; + pthread_cond_t process_cond; + pthread_cond_t finish_cond; + pthread_mutex_t process_mutex; + pthread_mutex_t finish_mutex; + int b_abr_thread_init; + int t_end; + int t_error; +#endif + int eof; } InputFilter; @@ -606,6 +620,7 @@ extern int frame_bits_per_raw_sample; extern AVIOContext *progress_avio; extern float max_error_rate; extern char *videotoolbox_pixfmt; +extern int abr_pipeline; extern int filter_nbthreads; extern int filter_complex_nbthreads; diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 6518d50..8823394 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0, int init_simple_filtergraph(InputStream *ist, OutputStream *ost) { FilterGraph *fg = av_mallocz(sizeof(*fg)); + int i; if (!fg) exit_program(1); @@ -225,6 +226,11 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost) GROW_ARRAY(ist->filters, ist->nb_filters); ist->filters[ist->nb_filters - 1] = fg->inputs[0]; + if (abr_pipeline) { + for (i = 0; i < ist->nb_filters; i++) { + ist->filters[i]->b_abr_thread_init = 0; + } + } GROW_ARRAY(filtergraphs, nb_filtergraphs); filtergraphs[nb_filtergraphs - 1] = fg; diff --git 
a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index d4851a2..fa5a556 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -110,6 +110,7 @@ float max_error_rate = 2.0/3; int filter_nbthreads = 0; int filter_complex_nbthreads = 0; int vstats_version = 2; +int abr_pipeline = 0; static int intra_only = 0; @@ -3502,7 +3503,10 @@ const OptionDef options[] = { "set the maximum number of queued packets from the demuxer" }, { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info }, "read and decode the streams to fill missing information with heuristics" }, - +#if HAVE_THREADS + { "abr_pipeline", OPT_BOOL, { &abr_pipeline }, + "adaptive bitrate pipeline (1 decode to N filter graphs, and 1 to N transcode" }, +#endif /* video options */ { "vframes", OPT_VIDEO | HAS_ARG | OPT_PERFILE | OPT_OUTPUT, { .func_arg = opt_video_frames }, "set the number of video frames to output", "number" },