Message ID | 20231206103002.30084-11-anton@khirnov.net |
---|---|
State | New |
Headers | show |
Series | [FFmpeg-devel,01/10] fftools/ffmpeg_filter: make sub2video heartbeat more robust | expand |
Context | Check | Description |
---|---|---|
yinshiyou/make_loongarch64 | success | Make finished |
yinshiyou/make_fate_loongarch64 | success | Make fate finished |
andriy/make_x86 | success | Make finished |
andriy/make_fate_x86 | success | Make fate finished |
On Wed, Dec 6, 2023 at 11:32 AM Anton Khirnov <anton@khirnov.net> wrote: > Change the main loop and every component (demuxers, decoders, filters, > encoders, muxers) to use the previously added transcode scheduler. Every > instance of every such component was already running in a separate > thread, but now they can actually run in parallel. > > Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by > JEEB to be more correct and deterministic. > Un-reviewable, please split it. > --- > Changelog | 2 + > fftools/ffmpeg.c | 374 +-------- > fftools/ffmpeg.h | 97 +-- > fftools/ffmpeg_dec.c | 321 ++------ > fftools/ffmpeg_demux.c | 268 ++++--- > fftools/ffmpeg_enc.c | 368 ++------- > fftools/ffmpeg_filter.c | 722 +++++------------- > fftools/ffmpeg_mux.c | 324 ++------ > fftools/ffmpeg_mux.h | 24 +- > fftools/ffmpeg_mux_init.c | 88 +-- > fftools/ffmpeg_opt.c | 6 +- > .../fate/ffmpeg-fix_sub_duration_heartbeat | 36 +- > 12 files changed, 600 insertions(+), 2030 deletions(-) > > diff --git a/Changelog b/Changelog > index f00bc27ca4..67ef92eb02 100644 > --- a/Changelog > +++ b/Changelog > @@ -7,6 +7,8 @@ version <next>: > - EVC encoding using external library libxeve > - QOA decoder and demuxer > - aap filter > +- demuxing, decoding, filtering, encoding, and muxing in the > + ffmpeg CLI now all run in parallel > > version 6.1: > - libaribcaption decoder > diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c > index b8a97258a0..30b594fd97 100644 > --- a/fftools/ffmpeg.c > +++ b/fftools/ffmpeg.c > @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps { > static BenchmarkTimeStamps get_benchmark_time_stamps(void); > static int64_t getmaxrss(void); > > -unsigned nb_output_dumped = 0; > +atomic_uint nb_output_dumped = 0; > > static BenchmarkTimeStamps current_time; > AVIOContext *progress_avio = NULL; > @@ -138,30 +138,6 @@ static struct termios oldtty; > static int restore_tty; > #endif > > -/* sub2video hack: > - Convert subtitles to video with alpha to insert them in filter graphs. > - This is a temporary solution until libavfilter gets real subtitles > support. > - */ > - > -static void sub2video_heartbeat(InputFile *infile, int64_t pts, > AVRational tb) > -{ > - /* When a frame is read from a file, examine all sub2video streams in > - the same file and send the sub2video frame again. Otherwise, > decoded > - video frames could be accumulating in the filter graph while a > filter > - (possibly overlay) is desperately waiting for a subtitle frame. */ > - for (int i = 0; i < infile->nb_streams; i++) { > - InputStream *ist = infile->streams[i]; > - > - if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) > - continue; > - > - for (int j = 0; j < ist->nb_filters; j++) > - ifilter_sub2video_heartbeat(ist->filters[j], pts, tb); > - } > -} > - > -/* end of sub2video hack */ > - > static void term_exit_sigsafe(void) > { > #if HAVE_TERMIOS_H > @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...) > } > } > > -void close_output_stream(OutputStream *ost) > -{ > - OutputFile *of = output_files[ost->file_index]; > - ost->finished |= ENCODER_FINISHED; > - > - if (ost->sq_idx_encode >= 0) > - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); > -} > - > -static void print_report(int is_last_report, int64_t timer_start, int64_t > cur_time) > +static void print_report(int is_last_report, int64_t timer_start, int64_t > cur_time, int64_t pts) > { > AVBPrint buf, buf_script; > int64_t total_size = of_filesize(output_files[0]); > int vid; > double bitrate; > double speed; > - int64_t pts = AV_NOPTS_VALUE; > static int64_t last_time = -1; > static int first_report = 1; > uint64_t nb_frames_dup = 0, nb_frames_drop = 0; > @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t > timer_start, int64_t cur_ti > last_time = cur_time; > } > if (((cur_time - last_time) < stats_period && !first_report) || > - (first_report && nb_output_dumped < nb_output_files)) > + (first_report && atomic_load(&nb_output_dumped) < > nb_output_files)) > return; > last_time = cur_time; > } > @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t > timer_start, int64_t cur_ti > av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC); > av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC); > for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : > -1; > + const float q = ost->enc ? atomic_load(&ost->quality) / (float) > FF_QP2LAMBDA : -1; > > if (vid && ost->type == AVMEDIA_TYPE_VIDEO) { > av_bprintf(&buf, "q=%2.1f ", q); > @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t > timer_start, int64_t cur_ti > if (is_last_report) > av_bprintf(&buf, "L"); > > - nb_frames_dup = ost->filter->nb_frames_dup; > - nb_frames_drop = ost->filter->nb_frames_drop; > + nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup); > + nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop); > > vid = 1; > } > - /* compute min output value */ > - if (ost->last_mux_dts != AV_NOPTS_VALUE) { > - if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts) > - pts = ost->last_mux_dts; > - if (copy_ts) { > - if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) > - copy_ts_first_pts = pts; > - if (copy_ts_first_pts != AV_NOPTS_VALUE) > - pts -= copy_ts_first_pts; > - } > - } > + } > + > + if (copy_ts) { > + if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) > + copy_ts_first_pts = pts; > + if (copy_ts_first_pts != AV_NOPTS_VALUE) > + pts -= copy_ts_first_pts; > } > > us = FFABS64U(pts) % AV_TIME_BASE; > @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle > *subtitle, int copy) > return 0; > } > > -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket > *pkt) > -{ > - OutputFile *of = output_files[ost->file_index]; > - int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base, > - AV_TIME_BASE_Q); > - > - if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & > AV_PKT_FLAG_KEY)) > - // we are only interested in heartbeats on streams configured, and > - // only on random access points. > - return 0; > - > - for (int i = 0; i < of->nb_streams; i++) { > - OutputStream *iter_ost = of->streams[i]; > - InputStream *ist = iter_ost->ist; > - int ret = AVERROR_BUG; > - > - if (iter_ost == ost || !ist || !ist->decoding_needed || > - ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) > - // We wish to skip the stream that causes the heartbeat, > - // output streams without an input stream, streams not decoded > - // (as fix_sub_duration is only done for decoded subtitles) as > - // well as non-subtitle streams. > - continue; > - > - if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0) > - return ret; > - } > - > - return 0; > -} > - > -/* pkt = NULL means EOF (needed to flush decoder buffers) */ > -static int process_input_packet(InputStream *ist, const AVPacket *pkt, > int no_eof) > -{ > - InputFile *f = input_files[ist->file_index]; > - int64_t dts_est = AV_NOPTS_VALUE; > - int ret = 0; > - int eof_reached = 0; > - > - if (ist->decoding_needed) { > - ret = dec_packet(ist, pkt, no_eof); > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > - } > - if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) > - eof_reached = 1; > - > - if (pkt && pkt->opaque_ref) { > - DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data; > - dts_est = pd->dts_est; > - } > - > - if (f->recording_time != INT64_MAX) { > - int64_t start_time = 0; > - if (copy_ts) { > - start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time > : 0; > - start_time += start_at_zero ? 0 : f->start_time_effective; > - } > - if (dts_est >= f->recording_time + start_time) > - pkt = NULL; > - } > - > - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { > - OutputStream *ost = ist->outputs[oidx]; > - if (ost->enc || (!pkt && no_eof)) > - continue; > - > - ret = of_streamcopy(ost, pkt, dts_est); > - if (ret < 0) > - return ret; > - } > - > - return !eof_reached; > -} > - > static void print_stream_maps(void) > { > av_log(NULL, AV_LOG_INFO, "Stream mapping:\n"); > @@ -934,43 +821,6 @@ static void print_stream_maps(void) > } > } > > -/** > - * Select the output stream to process. > - * > - * @retval 0 an output stream was selected > - * @retval AVERROR(EAGAIN) need to wait until more input is available > - * @retval AVERROR_EOF no more streams need output > - */ > -static int choose_output(OutputStream **post) > -{ > - int64_t opts_min = INT64_MAX; > - OutputStream *ost_min = NULL; > - > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - int64_t opts; > - > - if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) { > - opts = ost->filter->last_pts; > - } else { > - opts = ost->last_mux_dts == AV_NOPTS_VALUE ? > - INT64_MIN : ost->last_mux_dts; > - } > - > - if (!ost->initialized && !ost->finished) { > - ost_min = ost; > - break; > - } > - if (!ost->finished && opts < opts_min) { > - opts_min = opts; > - ost_min = ost; > - } > - } > - if (!ost_min) > - return AVERROR_EOF; > - *post = ost_min; > - return ost_min->unavailable ? AVERROR(EAGAIN) : 0; > -} > - > static void set_tty_echo(int on) > { > #if HAVE_TERMIOS_H > @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t > cur_time) > return 0; > } > > -static void reset_eagain(void) > -{ > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) > - ost->unavailable = 0; > -} > - > -static void decode_flush(InputFile *ifile) > -{ > - for (int i = 0; i < ifile->nb_streams; i++) { > - InputStream *ist = ifile->streams[i]; > - > - if (ist->discard || !ist->decoding_needed) > - continue; > - > - dec_packet(ist, NULL, 1); > - } > -} > - > -/* > - * Return > - * - 0 -- one packet was read and processed > - * - AVERROR(EAGAIN) -- no packets were available for selected file, > - * this function should be called again > - * - AVERROR_EOF -- this function should not be called again > - */ > -static int process_input(int file_index, AVPacket *pkt) > -{ > - InputFile *ifile = input_files[file_index]; > - InputStream *ist; > - int ret, i; > - > - ret = ifile_get_packet(ifile, pkt); > - > - if (ret == 1) { > - /* the input file is looped: flush the decoders */ > - decode_flush(ifile); > - return AVERROR(EAGAIN); > - } > - if (ret < 0) { > - if (ret != AVERROR_EOF) { > - av_log(ifile, AV_LOG_ERROR, > - "Error retrieving a packet from demuxer: %s\n", > av_err2str(ret)); > - if (exit_on_error) > - return ret; > - } > - > - for (i = 0; i < ifile->nb_streams; i++) { > - ist = ifile->streams[i]; > - if (!ist->discard) { > - ret = process_input_packet(ist, NULL, 0); > - if (ret>0) > - return 0; > - else if (ret < 0) > - return ret; > - } > - > - /* mark all outputs that don't go through lavfi as finished */ > - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { > - OutputStream *ost = ist->outputs[oidx]; > - OutputFile *of = output_files[ost->file_index]; > - > - ret = of_output_packet(of, ost, NULL); > - if (ret < 0) > - return ret; > - } > - } > - > - ifile->eof_reached = 1; > - return AVERROR(EAGAIN); > - } > - > - reset_eagain(); > - > - ist = ifile->streams[pkt->stream_index]; > - > - sub2video_heartbeat(ifile, pkt->pts, pkt->time_base); > - > - ret = process_input_packet(ist, pkt, 0); > - > - av_packet_unref(pkt); > - > - return ret < 0 ? ret : 0; > -} > - > -/** > - * Run a single step of transcoding. > - * > - * @return 0 for success, <0 for error > - */ > -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) > -{ > - InputStream *ist = NULL; > - int ret; > - > - if (ost->filter) { > - if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0) > - return ret; > - if (!ist) > - return 0; > - } else { > - ist = ost->ist; > - av_assert0(ist); > - } > - > - ret = process_input(ist->file_index, demux_pkt); > - if (ret == AVERROR(EAGAIN)) { > - return 0; > - } > - > - if (ret < 0) > - return ret == AVERROR_EOF ? 0 : ret; > - > - // process_input() above might have caused output to become available > - // in multiple filtergraphs, so we process all of them > - for (int i = 0; i < nb_filtergraphs; i++) { > - ret = reap_filters(filtergraphs[i], 0); > - if (ret < 0) > - return ret; > - } > - > - return 0; > -} > - > /* > * The following code is the main loop of the file converter > */ > -static int transcode(Scheduler *sch, int *err_rate_exceeded) > +static int transcode(Scheduler *sch) > { > int ret = 0, i; > - InputStream *ist; > - int64_t timer_start; > - AVPacket *demux_pkt = NULL; > + int64_t timer_start, transcode_ts = 0; > > print_stream_maps(); > > - *err_rate_exceeded = 0; > atomic_store(&transcode_init_done, 1); > > - demux_pkt = av_packet_alloc(); > - if (!demux_pkt) { > - ret = AVERROR(ENOMEM); > - goto fail; > - } > + ret = sch_start(sch); > + if (ret < 0) > + return ret; > > if (stdin_interaction) { > av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); > @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int > *err_rate_exceeded) > > timer_start = av_gettime_relative(); > > - while (!received_sigterm) { > - OutputStream *ost; > + while (!sch_wait(sch, stats_period, &transcode_ts)) { > int64_t cur_time= av_gettime_relative(); > > /* if 'q' pressed, exits */ > @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int > *err_rate_exceeded) > if (check_keyboard_interaction(cur_time) < 0) > break; > > - ret = choose_output(&ost); > - if (ret == AVERROR(EAGAIN)) { > - reset_eagain(); > - av_usleep(10000); > - ret = 0; > - continue; > - } else if (ret < 0) { > - av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write > to, finishing.\n"); > - ret = 0; > - break; > - } > - > - ret = transcode_step(ost, demux_pkt); > - if (ret < 0 && ret != AVERROR_EOF) { > - av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", > av_err2str(ret)); > - break; > - } > - > /* dump report by using the output first video and audio streams > */ > - print_report(0, timer_start, cur_time); > + print_report(0, timer_start, cur_time, transcode_ts); > } > > - /* at the end of stream, we must flush the decoder buffers */ > - for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) { > - float err_rate; > - > - if (!input_files[ist->file_index]->eof_reached) { > - int err = process_input_packet(ist, NULL, 0); > - ret = err_merge(ret, err); > - } > - > - err_rate = (ist->frames_decoded || ist->decode_errors) ? > - ist->decode_errors / (ist->frames_decoded + > ist->decode_errors) : 0.f; > - if (err_rate > max_error_rate) { > - av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds > maximum %g\n", > - err_rate, max_error_rate); > - *err_rate_exceeded = 1; > - } else if (err_rate) > - av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", > err_rate); > - } > - ret = err_merge(ret, enc_flush()); > - > - term_exit(); > + ret = sch_stop(sch); > > /* write the trailer if needed */ > for (i = 0; i < nb_output_files; i++) { > @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int > *err_rate_exceeded) > ret = err_merge(ret, err); > } > > - /* dump report by using the first video and audio streams */ > - print_report(1, timer_start, av_gettime_relative()); > + term_exit(); > > -fail: > - av_packet_free(&demux_pkt); > + /* dump report by using the first video and audio streams */ > + print_report(1, timer_start, av_gettime_relative(), transcode_ts); > > return ret; > } > @@ -1308,7 +990,7 @@ int main(int argc, char **argv) > { > Scheduler *sch = NULL; > > - int ret, err_rate_exceeded; > + int ret; > BenchmarkTimeStamps ti; > > init_dynload(); > @@ -1350,7 +1032,7 @@ int main(int argc, char **argv) > } > > current_time = ti = get_benchmark_time_stamps(); > - ret = transcode(sch, &err_rate_exceeded); > + ret = transcode(sch); > if (ret >= 0 && do_benchmark) { > int64_t utime, stime, rtime; > current_time = get_benchmark_time_stamps(); > @@ -1362,8 +1044,8 @@ int main(int argc, char **argv) > utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0); > } > > - ret = received_nb_signals ? 255 : > - err_rate_exceeded ? 69 : ret; > + ret = received_nb_signals ? 255 : > + (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret; > > finish: > if (ret == AVERROR_EXIT) > diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h > index a89038b765..ba82b7490d 100644 > --- a/fftools/ffmpeg.h > +++ b/fftools/ffmpeg.h > @@ -61,6 +61,8 @@ > #define FFMPEG_OPT_TOP 1 > #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1 > > +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D') > + > enum VideoSyncMethod { > VSYNC_AUTO = -1, > VSYNC_PASSTHROUGH, > @@ -82,13 +84,16 @@ enum HWAccelID { > }; > > enum FrameOpaque { > - FRAME_OPAQUE_REAP_FILTERS = 1, > - FRAME_OPAQUE_CHOOSE_INPUT, > - FRAME_OPAQUE_SUB_HEARTBEAT, > + FRAME_OPAQUE_SUB_HEARTBEAT = 1, > FRAME_OPAQUE_EOF, > FRAME_OPAQUE_SEND_COMMAND, > }; > > +enum PacketOpaque { > + PKT_OPAQUE_SUB_HEARTBEAT = 1, > + PKT_OPAQUE_FIX_SUB_DURATION, > +}; > + > typedef struct HWDevice { > const char *name; > enum AVHWDeviceType type; > @@ -309,11 +314,8 @@ typedef struct OutputFilter { > > enum AVMediaType type; > > - /* pts of the last frame received from this filter, in AV_TIME_BASE_Q > */ > - int64_t last_pts; > - > - uint64_t nb_frames_dup; > - uint64_t nb_frames_drop; > + atomic_uint_least64_t nb_frames_dup; > + atomic_uint_least64_t nb_frames_drop; > } OutputFilter; > > typedef struct FilterGraph { > @@ -426,11 +428,6 @@ typedef struct InputFile { > > float readrate; > int accurate_seek; > - > - /* when looping the input file, this queue is used by decoders to > report > - * the last frame timestamp back to the demuxer thread */ > - AVThreadMessageQueue *audio_ts_queue; > - int audio_ts_queue_size; > } InputFile; > > enum forced_keyframes_const { > @@ -532,8 +529,6 @@ typedef struct OutputStream { > InputStream *ist; > > AVStream *st; /* stream in the output file */ > - /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q > */ > - int64_t last_mux_dts; > > AVRational enc_timebase; > > @@ -578,13 +573,6 @@ typedef struct OutputStream { > AVDictionary *sws_dict; > AVDictionary *swr_opts; > char *apad; > - OSTFinished finished; /* no more packets should be written for > this stream */ > - int unavailable; /* true if the steram is > unavailable (possibly temporarily) */ > - > - // init_output_stream() has been called for this stream > - // The encoder and the bitstream filters have been initialized and > the stream > - // parameters are set in the AVStream. > - int initialized; > > const char *attachment_filename; > > @@ -598,9 +586,8 @@ typedef struct OutputStream { > uint64_t samples_encoded; > > /* packet quality factor */ > - int quality; > + atomic_int quality; > > - int sq_idx_encode; > int sq_idx_mux; > > EncStats enc_stats_pre; > @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs; > extern int nb_filtergraphs; > > extern char *vstats_filename; > -extern char *sdp_filename; > > extern float dts_delta_threshold; > extern float dts_error_threshold; > @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb; > extern const OptionDef options[]; > extern HWDevice *filter_hw_device; > > -extern unsigned nb_output_dumped; > +extern atomic_uint nb_output_dumped; > > extern int ignore_unknown_streams; > extern int copy_unknown_streams; > @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame); > > const FrameData *frame_data_c(AVFrame *frame); > > -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int > keep_reference); > -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); > -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, > AVRational tb); > - > /** > * Set up fallback filtering parameters from a decoder context. They will > only > * be used if no frames are ever sent on this input, otherwise the actual > @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, > Scheduler *sch); > > void fg_free(FilterGraph **pfg); > > -/** > - * Perform a step of transcoding for the specified filter graph. > - * > - * @param[in] graph filter graph to consider > - * @param[out] best_ist input stream where a frame would allow to > continue > - * @return 0 for success, <0 for error > - */ > -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist); > - > void fg_send_command(FilterGraph *fg, double time, const char *target, > const char *command, const char *arg, int > all_filters); > > -/** > - * Get and encode new output from specified filtergraph, without causing > - * activity. > - * > - * @return 0 for success, <0 for severe errors > - */ > -int reap_filters(FilterGraph *fg, int flush); > - > int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); > > void enc_stats_write(OutputStream *ost, EncStats *es, > @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, > AVFrame *input); > int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); > void dec_free(Decoder **pdec); > > -/** > - * Submit a packet for decoding > - * > - * When pkt==NULL and no_eof=0, there will be no more input. Flush > decoders and > - * mark all downstreams as finished. > - * > - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). > Flush > - * decoders and await further input. > - */ > -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); > - > int enc_alloc(Encoder **penc, const AVCodec *codec, > Scheduler *sch, unsigned sch_idx); > void enc_free(Encoder **penc); > > -int enc_open(OutputStream *ost, const AVFrame *frame); > -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle > *sub); > -int enc_frame(OutputStream *ost, AVFrame *frame); > -int enc_flush(void); > +int enc_open(void *opaque, const AVFrame *frame); > > /* > * Initialize muxing state for the given stream, should be called > @@ -840,30 +791,11 @@ void of_free(OutputFile **pof); > > void of_enc_stats_close(void); > > -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt); > - > -/** > - * @param dts predicted packet dts in AV_TIME_BASE_Q > - */ > -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); > - > int64_t of_filesize(OutputFile *of); > > int ifile_open(const OptionsContext *o, const char *filename, Scheduler > *sch); > void ifile_close(InputFile **f); > > -/** > - * Get next input packet from the demuxer. > - * > - * @param pkt the packet is written here when this function returns 0 > - * @return > - * - 0 when a packet has been read successfully > - * - 1 when stream end was reached, but the stream is looped; > - * caller should flush decoders and read from this demuxer again > - * - a negative error code on failure > - */ > -int ifile_get_packet(InputFile *f, AVPacket *pkt); > - > int ist_output_add(InputStream *ist, OutputStream *ost); > int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); > > @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev); > * pass NULL to start iteration */ > OutputStream *ost_iter(OutputStream *prev); > > -void close_output_stream(OutputStream *ost); > -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket > *pkt); > -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts); > void update_benchmark(const char *fmt, ...); > > #define SPECIFIER_OPT_FMT_str "%s" > diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c > index 90ea0d6d93..5dde82a276 100644 > --- a/fftools/ffmpeg_dec.c > +++ b/fftools/ffmpeg_dec.c > @@ -54,24 +54,6 @@ struct Decoder { > > Scheduler *sch; > unsigned sch_idx; > - > - pthread_t thread; > - /** > - * Queue for sending coded packets from the main thread to > - * the decoder thread. > - * > - * An empty packet is sent to flush the decoder without terminating > - * decoding. > - */ > - ThreadQueue *queue_in; > - /** > - * Queue for sending decoded frames from the decoder thread > - * to the main thread. > - * > - * An empty frame is sent to signal that a single packet has been > fully > - * processed. > - */ > - ThreadQueue *queue_out; > }; > > // data that is local to the decoder thread and not visible outside of it > @@ -80,24 +62,6 @@ typedef struct DecThreadContext { > AVPacket *pkt; > } DecThreadContext; > > -static int dec_thread_stop(Decoder *d) > -{ > - void *ret; > - > - if (!d->queue_in) > - return 0; > - > - tq_send_finish(d->queue_in, 0); > - tq_receive_finish(d->queue_out, 0); > - > - pthread_join(d->thread, &ret); > - > - tq_free(&d->queue_in); > - tq_free(&d->queue_out); > - > - return (intptr_t)ret; > -} > - > void dec_free(Decoder **pdec) > { > Decoder *dec = *pdec; > @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec) > if (!dec) > return; > > - dec_thread_stop(dec); > - > av_frame_free(&dec->frame); > av_packet_free(&dec->pkt); > > @@ -148,25 +110,6 @@ fail: > return AVERROR(ENOMEM); > } > > -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) > -{ > - int i, ret = 0; > - > - for (i = 0; i < ist->nb_filters; i++) { > - ret = ifilter_send_frame(ist->filters[i], decoded_frame, > - i < ist->nb_filters - 1 || > - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); > - if (ret == AVERROR_EOF) > - ret = 0; /* ignore */ > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Failed to inject frame into filter network: %s\n", > av_err2str(ret)); > - break; > - } > - } > - return ret; > -} > - > static AVRational audio_samplerate_update(void *logctx, Decoder *d, > const AVFrame *frame) > { > @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, > AVFrame *frame) > if (!subtitle) > return 0; > > - ret = send_frame_to_filters(ist, frame); > + ret = sch_dec_send(d->sch, d->sch_idx, frame); > if (ret < 0) > - return ret; > + av_frame_unref(frame); > > - subtitle = (AVSubtitle*)frame->buf[0]->data; > - if (!subtitle->num_rects) > - return 0; > - > - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { > - OutputStream *ost = ist->outputs[oidx]; > - if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE) > - continue; > - > - ret = enc_subtitle(output_files[ost->file_index], ost, subtitle); > - if (ret < 0) > - return ret; > - } > - > - return 0; > + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; > } > > -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) > +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t > signal_pts) > { > Decoder *d = ist->decoder; > int ret = AVERROR_BUG; > @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, > int64_t signal_pts) > static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, > AVFrame *frame) > { > - Decoder *d = ist->decoder; > + Decoder *d = ist->decoder; > AVPacket *flush_pkt = NULL; > AVSubtitle subtitle; > int got_output; > int ret; > > + if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) { > + frame->pts = pkt->pts; > + frame->time_base = pkt->time_base; > + frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT; > + > + ret = sch_dec_send(d->sch, d->sch_idx, frame); > + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; > + } else if (pkt && (intptr_t)pkt->opaque == > PKT_OPAQUE_FIX_SUB_DURATION) { > + return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, > pkt->time_base, > + > AV_TIME_BASE_Q)); > + } > + > if (!pkt) { > flush_pkt = av_packet_alloc(); > if (!flush_pkt) > @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const > AVPacket *pkt, > > ist->frames_decoded++; > > - // XXX the queue for transferring data back to the main thread runs > + // XXX the queue for transferring data to consumers runs > // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that > // inside the frame > // eventually, subtitles should be switched to use AVFrames natively > @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, > const AVPacket *pkt, > frame->width = ist->dec_ctx->width; > frame->height = ist->dec_ctx->height; > > - ret = tq_send(d->queue_out, 0, frame); > - if (ret < 0) > - av_frame_unref(frame); > - > - return ret; > -} > - > -static int send_filter_eof(InputStream *ist) > -{ > - Decoder *d = ist->decoder; > - int i, ret; > - > - for (i = 0; i < ist->nb_filters; i++) { > - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? > AV_NOPTS_VALUE : > - d->last_frame_pts + d->last_frame_duration_est; > - ret = ifilter_send_eof(ist->filters[i], end_pts, > d->last_frame_tb); > - if (ret < 0) > - return ret; > - } > - return 0; > + return process_subtitle(ist, frame); > } > > static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) > @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket > *pkt, AVFrame *frame) > > ist->frames_decoded++; > > - ret = tq_send(d->queue_out, 0, frame); > - if (ret < 0) > - return ret; > + ret = sch_dec_send(d->sch, d->sch_idx, frame); > + if (ret < 0) { > + av_frame_unref(frame); > + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; > + } > } > } > > @@ -679,7 +603,6 @@ fail: > void *decoder_thread(void *arg) > { > InputStream *ist = arg; > - InputFile *ifile = input_files[ist->file_index]; > Decoder *d = ist->decoder; > DecThreadContext dt; > int ret = 0, input_status = 0; > @@ -691,19 +614,31 @@ void *decoder_thread(void *arg) > dec_thread_set_name(ist); > > while (!input_status) { > - int dummy, flush_buffers; > + int flush_buffers, have_data; > > - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); > - flush_buffers = input_status >= 0 && !dt.pkt->buf; > - if (!dt.pkt->buf) > + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); > + have_data = input_status >= 0 && > + (dt.pkt->buf || dt.pkt->side_data_elems || > + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT || > + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION); > + flush_buffers = input_status >= 0 && !have_data; > + if (!have_data) > av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s > packet\n", > flush_buffers ? "flush" : "EOF"); > > - ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame); > + ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame); > > av_packet_unref(dt.pkt); > av_frame_unref(dt.frame); > > + // AVERROR_EOF - EOF from the decoder > + // AVERROR_EXIT - EOF from the scheduler > + // we treat them differently when flushing > + if (ret == AVERROR_EXIT) { > + ret = AVERROR_EOF; > + flush_buffers = 0; > + } > + > if (ret == AVERROR_EOF) { > av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", > flush_buffers ? "resetting" : "finishing"); > @@ -711,11 +646,10 @@ void *decoder_thread(void *arg) > if (!flush_buffers) > break; > > - /* report last frame duration to the demuxer thread */ > + /* report last frame duration to the scheduler */ > if (ist->dec->type == AVMEDIA_TYPE_AUDIO) { > - Timestamp ts = { .ts = d->last_frame_pts + > d->last_frame_duration_est, > - .tb = d->last_frame_tb }; > - av_thread_message_queue_send(ifile->audio_ts_queue, &ts, > 0); > + dt.pkt->pts = d->last_frame_pts + > d->last_frame_duration_est; > + dt.pkt->time_base = d->last_frame_tb; > } > > avcodec_flush_buffers(ist->dec_ctx); > @@ -724,149 +658,47 @@ void *decoder_thread(void *arg) > av_err2str(ret)); > break; > } > - > - // signal to the consumer thread that the entire packet was > processed > - ret = tq_send(d->queue_out, 0, dt.frame); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(ist, AV_LOG_ERROR, "Error communicating with the > main thread\n"); > - break; > - } > } > > // EOF is normal thread termination > if (ret == AVERROR_EOF) > ret = 0; > > + // on success send EOF timestamp to our downstreams > + if (ret >= 0) { > + float err_rate; > + > + av_frame_unref(dt.frame); > + > + dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF; > + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? > AV_NOPTS_VALUE : > + d->last_frame_pts + > d->last_frame_duration_est; > + dt.frame->time_base = d->last_frame_tb; > + > + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); > + if (ret < 0 && ret != AVERROR_EOF) { > + av_log(NULL, AV_LOG_FATAL, > + "Error signalling EOF timestamp: %s\n", > av_err2str(ret)); > + goto finish; > + } > + ret = 0; > + > + err_rate = (ist->frames_decoded || ist->decode_errors) ? > + ist->decode_errors / (ist->frames_decoded + > ist->decode_errors) : 0.f; > + if (err_rate > max_error_rate) { > + av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds > maximum %g\n", > + err_rate, max_error_rate); > + ret = FFMPEG_ERROR_RATE_EXCEEDED; > + } else if (err_rate) > + av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", > err_rate); > + } > + > finish: > - tq_receive_finish(d->queue_in, 0); > - tq_send_finish (d->queue_out, 0); > - > - // make sure the demuxer does not get stuck waiting for audio > durations > - // that will never arrive > - if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO) > - av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, > AVERROR_EOF); > - > dec_thread_uninit(&dt); > > - av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n"); > - > return (void*)(intptr_t)ret; > } > > -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) > -{ > - Decoder *d = ist->decoder; > - int ret = 0, thread_ret; > - > - // thread already joined > - if (!d->queue_in) > - return AVERROR_EOF; > - > - // send the packet/flush request/EOF to the decoder thread > - if (pkt || no_eof) { > - av_packet_unref(d->pkt); > - > - if (pkt) { > - ret = av_packet_ref(d->pkt, pkt); > - if (ret < 0) > - goto finish; > - } > - > - ret = tq_send(d->queue_in, 0, d->pkt); > - if (ret < 0) > - goto finish; > - } else > - tq_send_finish(d->queue_in, 0); > - > - // retrieve all decoded data for the packet > - while (1) { > - int dummy; > - > - ret = tq_receive(d->queue_out, &dummy, d->frame); > - if (ret < 0) > - goto finish; > - > - // packet fully processed > - if (!d->frame->buf[0]) > - return 0; > - > - // process the decoded frame > - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) { > - ret = process_subtitle(ist, d->frame); > - } else { > - ret = send_frame_to_filters(ist, d->frame); > - } > - av_frame_unref(d->frame); > - if (ret < 0) > - goto finish; > - } > - > -finish: > - thread_ret = dec_thread_stop(d); > - if (thread_ret < 0) { > - av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n", > - av_err2str(thread_ret)); > - ret = err_merge(ret, thread_ret); > - } > - // non-EOF errors here are all fatal > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > - > - // signal EOF to our downstreams > - ret = send_filter_eof(ist); > - if (ret < 0) { > - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); > - return ret; > - } > - > - return AVERROR_EOF; > -} > - > -static int dec_thread_start(InputStream *ist) > -{ > - Decoder *d = ist->decoder; > - ObjPool *op; > - int ret = 0; > - > - op = objpool_alloc_packets(); > - if (!op) > - return AVERROR(ENOMEM); > - > - d->queue_in = tq_alloc(1, 1, op, pkt_move); > - if (!d->queue_in) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - op = objpool_alloc_frames(); > - if (!op) > - goto fail; > - > - d->queue_out = tq_alloc(1, 4, op, frame_move); > - if (!d->queue_out) { > - objpool_free(&op); > - goto fail; > - } > - > - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); > - if (ret) { > - ret = AVERROR(ret); > - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", > - av_err2str(ret)); > - goto fail; > - } > - > - return 0; > -fail: > - if (ret >= 0) > - ret = AVERROR(ENOMEM); > - > - tq_free(&d->queue_in); > - tq_free(&d->queue_out); > - return ret; > -} > - > static enum AVPixelFormat get_format(AVCodecContext *s, const enum > AVPixelFormat *pix_fmts) > { > InputStream *ist = s->opaque; > @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, > unsigned sch_idx) > if (ret < 0) > return ret; > > - ret = dec_thread_start(ist); > - if (ret < 0) { > - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", > - av_err2str(ret)); > - return ret; > - } > - > return 0; > } > diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c > index 2234dbe076..91cd7a1125 100644 > --- a/fftools/ffmpeg_demux.c > +++ b/fftools/ffmpeg_demux.c > @@ -22,8 +22,6 @@ > #include "ffmpeg.h" > #include "ffmpeg_sched.h" > #include "ffmpeg_utils.h" > -#include "objpool.h" > -#include "thread_queue.h" > > #include "libavutil/avassert.h" > #include "libavutil/avstring.h" > @@ -35,7 +33,6 @@ > #include "libavutil/pixdesc.h" > #include "libavutil/time.h" > #include "libavutil/timestamp.h" > -#include "libavutil/thread.h" > > #include "libavcodec/packet.h" > > @@ -66,7 +63,11 @@ typedef struct DemuxStream { > > double ts_scale; > > + // scheduler returned EOF for this stream > + int finished; > + > int streamcopy_needed; > + int have_sub2video; > > int wrap_correction_done; > int saw_first_ts; > @@ -101,6 +102,7 @@ typedef struct Demuxer { > > /* number of times input stream should be looped */ > int loop; > + int have_audio_dec; > /* duration of the looped segment of the input file */ > Timestamp duration; > /* pts with the smallest/largest values ever seen */ > @@ -113,11 +115,12 @@ typedef struct Demuxer { > double readrate_initial_burst; > > Scheduler *sch; > - ThreadQueue *thread_queue; > - int thread_queue_size; > - pthread_t thread; > + > + AVPacket *pkt_heartbeat; > > int read_started; > + int nb_streams_used; > + int nb_streams_finished; > } Demuxer; > > static DemuxStream *ds_from_ist(InputStream *ist) > @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const > AVPacket *pkt) > d->nb_streams_warn = pkt->stream_index + 1; > } > > -static int seek_to_start(Demuxer *d) > +static int seek_to_start(Demuxer *d, Timestamp end_pts) > { > InputFile *ifile = &d->f; > AVFormatContext *is = ifile->ctx; > @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d) > if (ret < 0) > return ret; > > - if (ifile->audio_ts_queue_size) { > - int got_ts = 0; > - > - while (got_ts < ifile->audio_ts_queue_size) { > - Timestamp ts; > - ret = av_thread_message_queue_recv(ifile->audio_ts_queue, > &ts, 0); > - if (ret < 0) > - return ret; > - got_ts++; > - > - if (d->max_pts.ts == AV_NOPTS_VALUE || > - av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) > < 0) > - d->max_pts = ts; > - } > - } > + if (end_pts.ts != AV_NOPTS_VALUE && > + (d->max_pts.ts == AV_NOPTS_VALUE || > + av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, > end_pts.tb) < 0)) > + d->max_pts = end_pts; > > if (d->max_pts.ts != AV_NOPTS_VALUE) { > int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : > d->min_pts.ts; > @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) > duration = av_rescale_q(d->duration.ts, d->duration.tb, > pkt->time_base); > if (pkt->pts != AV_NOPTS_VALUE) { > // audio decoders take precedence for estimating total file > duration > - int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : > pkt->duration; > + int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration; > > pkt->pts += duration; > > @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) > return 0; > } > > -static int input_packet_process(Demuxer *d, AVPacket *pkt) > +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned > *send_flags) > { > InputFile *f = &d->f; > InputStream *ist = f->streams[pkt->stream_index]; > @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket > *pkt) > if (ret < 0) > return ret; > > + if (f->recording_time != INT64_MAX) { > + int64_t start_time = 0; > + if (copy_ts) { > + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time > : 0; > + start_time += start_at_zero ? 0 : f->start_time_effective; > + } > + if (ds->dts >= f->recording_time + start_time) > + *send_flags |= DEMUX_SEND_STREAMCOPY_EOF; > + } > + > ds->data_size += pkt->size; > ds->nb_packets++; > > @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket > *pkt) > av_ts2timestr(input_files[ist->file_index]->ts_offset, > &AV_TIME_BASE_Q)); > } > > + pkt->stream_index = ds->sch_idx_stream; > + > return 0; > } > > @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d) > } > } > > +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned > flags, > + const char *pkt_desc) > +{ > + int ret; > + > + ret = sch_demux_send(d->sch, d->f.index, pkt, flags); > + if (ret == AVERROR_EOF) { > + av_packet_unref(pkt); > + > + av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are > done\n"); > + ds->finished = 1; > + > + if (++d->nb_streams_finished == d->nb_streams_used) { > + av_log(d, AV_LOG_VERBOSE, "All consumers are done\n"); > + return AVERROR_EOF; > + } > + } else if (ret < 0) { > + if (ret != AVERROR_EXIT) > + av_log(d, AV_LOG_ERROR, > + "Unable to send %s packet to consumers: %s\n", > + pkt_desc, av_err2str(ret)); > + return ret; > + } > + > + return 0; > +} > + > +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, > unsigned flags) > +{ > + InputFile *f = &d->f; > + int ret; > + > + // send heartbeat for sub2video streams > + if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) { > + for (int i = 0; i < f->nb_streams; i++) { > + DemuxStream *ds1 = ds_from_ist(f->streams[i]); > + > + if (ds1->finished || !ds1->have_sub2video) > + continue; > + > + d->pkt_heartbeat->pts = pkt->pts; > + d->pkt_heartbeat->time_base = pkt->time_base; > + d->pkt_heartbeat->stream_index = ds1->sch_idx_stream; > + d->pkt_heartbeat->opaque = > (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT; > + > + ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat"); > + if (ret < 0) > + return ret; > + } > + } > + > + ret = do_send(d, ds, pkt, flags, "demuxed"); > + if (ret < 0) > + return ret; > + > + > + return 0; > +} > + > static void discard_unused_programs(InputFile *ifile) > { > for (int j = 0; j < ifile->ctx->nb_programs; j++) { > @@ -527,9 +590,13 @@ static void *input_thread(void *arg) > > discard_unused_programs(f); > > + d->read_started = 1; > d->wallclock_start = av_gettime_relative(); > > while (1) { > + DemuxStream *ds; > + unsigned send_flags = 0; > + > ret = av_read_frame(f->ctx, pkt); > > if (ret == AVERROR(EAGAIN)) { > @@ -538,11 +605,13 @@ static void *input_thread(void *arg) > } > if (ret < 0) { > if (d->loop) { > - /* signal looping to the consumer thread */ > + /* signal looping to our consumers */ > pkt->stream_index = -1; > - ret = tq_send(d->thread_queue, 0, pkt); > + > + ret = sch_demux_send(d->sch, f->index, pkt, 0); > if (ret >= 0) > - ret = seek_to_start(d); > + ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts, > + .tb = > pkt->time_base }); > if (ret >= 0) > continue; > > @@ -551,9 +620,11 @@ static void *input_thread(void *arg) > > if (ret == AVERROR_EOF) > av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); > - else > + else { > av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", > av_err2str(ret)); > + ret = exit_on_error ? ret : 0; > + } > > break; > } > @@ -565,8 +636,9 @@ static void *input_thread(void *arg) > > /* the following test is needed in case new streams appear > dynamically in stream : we ignore them */ > - if (pkt->stream_index >= f->nb_streams || > - f->streams[pkt->stream_index]->discard) { > + ds = pkt->stream_index < f->nb_streams ? > + ds_from_ist(f->streams[pkt->stream_index]) : NULL; > + if (!ds || ds->ist.discard || ds->finished) { > report_new_stream(d, pkt); > av_packet_unref(pkt); > continue; > @@ -583,122 +655,26 @@ static void *input_thread(void *arg) > } > } > > - ret = input_packet_process(d, pkt); > + ret = input_packet_process(d, pkt, &send_flags); > if (ret < 0) > break; > > if (f->readrate) > readrate_sleep(d); > > - ret = tq_send(d->thread_queue, 0, pkt); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(f, AV_LOG_ERROR, > - "Unable to send packet to main thread: %s\n", > - av_err2str(ret)); > + ret = demux_send(d, ds, pkt, send_flags); > + if (ret < 0) > break; > - } > } > > + // EOF/EXIT is normal termination > + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) > + ret = 0; > + > finish: > - av_assert0(ret < 0); > - tq_send_finish(d->thread_queue, 0); > - > av_packet_free(&pkt); > > - av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); > - > - return NULL; > -} > - > -static void thread_stop(Demuxer *d) > -{ > - InputFile *f = &d->f; > - > - if (!d->thread_queue) > - return; > - > - tq_receive_finish(d->thread_queue, 0); > - > - pthread_join(d->thread, NULL); > - > - tq_free(&d->thread_queue); > - > - av_thread_message_queue_free(&f->audio_ts_queue); > -} > - > -static int thread_start(Demuxer *d) > -{ > - int ret; > - InputFile *f = &d->f; > - ObjPool *op; > - > - if (d->thread_queue_size <= 0) > - d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); > - > - op = objpool_alloc_packets(); > - if (!op) > - return AVERROR(ENOMEM); > - > - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); > - if (!d->thread_queue) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - if (d->loop) { > - int nb_audio_dec = 0; > - > - for (int i = 0; i < f->nb_streams; i++) { > - InputStream *ist = f->streams[i]; > - nb_audio_dec += !!(ist->decoding_needed && > - ist->st->codecpar->codec_type == > AVMEDIA_TYPE_AUDIO); > - } > - > - if (nb_audio_dec) { > - ret = av_thread_message_queue_alloc(&f->audio_ts_queue, > - nb_audio_dec, > sizeof(Timestamp)); > - if (ret < 0) > - goto fail; > - f->audio_ts_queue_size = nb_audio_dec; > - } > - } > - > - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { > - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to > increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); > - ret = AVERROR(ret); > - goto fail; > - } > - > - d->read_started = 1; > - > - return 0; > -fail: > - tq_free(&d->thread_queue); > - return ret; > -} > - > -int ifile_get_packet(InputFile *f, AVPacket *pkt) > -{ > - Demuxer *d = demuxer_from_ifile(f); > - int ret, dummy; > - > - if (!d->thread_queue) { > - ret = thread_start(d); > - if (ret < 0) > - return ret; > - } > - > - ret = tq_receive(d->thread_queue, &dummy, pkt); > - if (ret < 0) > - return ret; > - > - if (pkt->stream_index == -1) { > - av_assert0(!pkt->data && !pkt->side_data_elems); > - return 1; > - } > - > - return 0; > + return (void*)(intptr_t)ret; > } > > static void demux_final_stats(Demuxer *d) > @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf) > if (!f) > return; > > - thread_stop(d); > - > if (d->read_started) > demux_final_stats(d); > > @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf) > > avformat_close_input(&f->ctx); > > + av_packet_free(&d->pkt_heartbeat); > + > av_freep(pf); > } > > @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int > decoding_needed) > ds->sch_idx_stream = ret; > } > > - ist->discard = 0; > + if (ist->discard) { > + ist->discard = 0; > + d->nb_streams_used++; > + } > + > ist->st->discard = ist->user_set_discard; > ist->decoding_needed |= decoding_needed; > ds->streamcopy_needed |= !decoding_needed; > @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int > decoding_needed) > ret = dec_open(ist, d->sch, ds->sch_idx_dec); > if (ret < 0) > return ret; > + > + d->have_audio_dec |= is_audio; > } > > return 0; > @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost) > > int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) > { > + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); > DemuxStream *ds = ds_from_ist(ist); > int ret; > > @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter > *ifilter, int is_simple) > if (ret < 0) > return ret; > > + if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) { > + if (!d->pkt_heartbeat) { > + d->pkt_heartbeat = av_packet_alloc(); > + if (!d->pkt_heartbeat) > + return AVERROR(ENOMEM); > + } > + ds->have_sub2video = 1; > + } > + > return ds->sch_idx_dec; > } > > @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > "since neither -readrate nor -re were given\n"); > } > > - d->thread_queue_size = o->thread_queue_size; > - > /* Add all the streams from the given input file to the demuxer */ > for (int i = 0; i < ic->nb_streams; i++) { > ret = ist_add(o, d, ic->streams[i]); > diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c > index 9871381c0e..9383b167f7 100644 > --- a/fftools/ffmpeg_enc.c > +++ b/fftools/ffmpeg_enc.c > @@ -41,12 +41,6 @@ > #include "libavformat/avformat.h" > > struct Encoder { > - AVFrame *sq_frame; > - > - // packet for receiving encoded output > - AVPacket *pkt; > - AVFrame *sub_frame; > - > // combined size of all the packets received from the encoder > uint64_t data_size; > > @@ -54,25 +48,9 @@ struct Encoder { > uint64_t packets_encoded; > > int opened; > - int finished; > > Scheduler *sch; > unsigned sch_idx; > - > - pthread_t thread; > - /** > - * Queue for sending frames from the main thread to > - * the encoder thread. > - */ > - ThreadQueue *queue_in; > - /** > - * Queue for sending encoded packets from the encoder thread > - * to the main thread. > - * > - * An empty packet is sent to signal that a previously sent > - * frame has been fully processed. > - */ > - ThreadQueue *queue_out; > }; > > // data that is local to the decoder thread and not visible outside of it > @@ -81,24 +59,6 @@ typedef struct EncoderThread { > AVPacket *pkt; > } EncoderThread; > > -static int enc_thread_stop(Encoder *e) > -{ > - void *ret; > - > - if (!e->queue_in) > - return 0; > - > - tq_send_finish(e->queue_in, 0); > - tq_receive_finish(e->queue_out, 0); > - > - pthread_join(e->thread, &ret); > - > - tq_free(&e->queue_in); > - tq_free(&e->queue_out); > - > - return (int)(intptr_t)ret; > -} > - > void enc_free(Encoder **penc) > { > Encoder *enc = *penc; > @@ -106,13 +66,6 @@ void enc_free(Encoder **penc) > if (!enc) > return; > > - enc_thread_stop(enc); > - > - av_frame_free(&enc->sq_frame); > - av_frame_free(&enc->sub_frame); > - > - av_packet_free(&enc->pkt); > - > av_freep(penc); > } > > @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec, > if (!enc) > return AVERROR(ENOMEM); > > - if (codec->type == AVMEDIA_TYPE_SUBTITLE) { > - enc->sub_frame = av_frame_alloc(); > - if (!enc->sub_frame) > - goto fail; > - } > - > - enc->pkt = av_packet_alloc(); > - if (!enc->pkt) > - goto fail; > - > enc->sch = sch; > enc->sch_idx = sch_idx; > > *penc = enc; > > return 0; > -fail: > - enc_free(&enc); > - return AVERROR(ENOMEM); > } > > static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef > *frames_ref) > @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, > OutputStream *ost) > return 0; > } > > -static int enc_thread_start(OutputStream *ost) > -{ > - Encoder *e = ost->enc; > - ObjPool *op; > - int ret = 0; > - > - op = objpool_alloc_frames(); > - if (!op) > - return AVERROR(ENOMEM); > - > - e->queue_in = tq_alloc(1, 1, op, frame_move); > - if (!e->queue_in) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - op = objpool_alloc_packets(); > - if (!op) > - goto fail; > - > - e->queue_out = tq_alloc(1, 4, op, pkt_move); > - if (!e->queue_out) { > - objpool_free(&op); > - goto fail; > - } > - > - ret = pthread_create(&e->thread, NULL, encoder_thread, ost); > - if (ret) { > - ret = AVERROR(ret); > - av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", > - av_err2str(ret)); > - goto fail; > - } > - > - return 0; > -fail: > - if (ret >= 0) > - ret = AVERROR(ENOMEM); > - > - tq_free(&e->queue_in); > - tq_free(&e->queue_out); > - return ret; > -} > - > -int enc_open(OutputStream *ost, const AVFrame *frame) > +int enc_open(void *opaque, const AVFrame *frame) > { > + OutputStream *ost = opaque; > InputStream *ist = ost->ist; > Encoder *e = ost->enc; > AVCodecContext *enc_ctx = ost->enc_ctx; > @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > const AVCodec *enc = enc_ctx->codec; > OutputFile *of = output_files[ost->file_index]; > FrameData *fd; > + int frame_samples = 0; > int ret; > > if (e->opened) > @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > > e->opened = 1; > > - if (ost->sq_idx_encode >= 0) { > - e->sq_frame = av_frame_alloc(); > - if (!e->sq_frame) > - return AVERROR(ENOMEM); > - } > - > - if (ost->enc_ctx->frame_size) { > - av_assert0(ost->sq_idx_encode >= 0); > - sq_frame_samples(output_files[ost->file_index]->sq_encode, > - ost->sq_idx_encode, ost->enc_ctx->frame_size); > - } > + if (ost->enc_ctx->frame_size) > + frame_samples = ost->enc_ctx->frame_size; > > ret = check_avoptions(ost->encoder_opts); > if (ret < 0) > @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0) > ost->st->time_base = av_add_q(ost->enc_ctx->time_base, > (AVRational){0, 1}); > > - ret = enc_thread_start(ost); > - if (ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", > - av_err2str(ret)); > - return ret; > - } > - > ret = of_stream_init(of, ost); > if (ret < 0) > return ret; > > - return 0; > + return frame_samples; > } > > static int check_recording_time(OutputStream *ost, int64_t ts, AVRational > tb) > @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, > OutputStream *ost, const AVSubtitle * > av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n"); > return exit_on_error ? AVERROR(EINVAL) : 0; > } > - if (ost->finished || > - (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) > + if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) > return 0; > > enc = ost->enc_ctx; > @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, > OutputStream *ost, const AVSubtitle * > } > pkt->dts = pkt->pts; > > - ret = tq_send(e->queue_out, 0, pkt); > + ret = sch_enc_send(e->sch, e->sch_idx, pkt); > if (ret < 0) { > av_packet_unref(pkt); > return ret; > @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, > const AVPacket *pkt, int write_ > int64_t frame_number; > double ti1, bitrate, avg_bitrate; > double psnr_val = -1; > + int quality; > > - ost->quality = sd ? AV_RL32(sd) : -1; > + quality = sd ? AV_RL32(sd) : -1; > pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE; > > + atomic_store(&ost->quality, quality); > + > if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) { > // FIXME the scaling assumes 8bit > double error = AV_RL64(sd + 8) / (enc->width * enc->height * > 255.0 * 255.0); > @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, > const AVPacket *pkt, int write_ > frame_number = e->packets_encoded; > if (vstats_version <= 1) { > fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number, > - ost->quality / (float)FF_QP2LAMBDA); > + quality / (float)FF_QP2LAMBDA); > } else { > fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f > ", ost->file_index, ost->index, frame_number, > - ost->quality / (float)FF_QP2LAMBDA); > + quality / (float)FF_QP2LAMBDA); > } > > if (psnr_val >= 0) > @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream > *ost, AVFrame *frame, > av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, > &enc->time_base)); > } > > - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Subtitle heartbeat logic failed in %s! (%s)\n", > - __func__, av_err2str(ret)); > - return ret; > - } > - > e->data_size += pkt->size; > > e->packets_encoded++; > > - ret = tq_send(e->queue_out, 0, pkt); > + ret = sch_enc_send(e->sch, e->sch_idx, pkt); > if (ret < 0) { > av_packet_unref(pkt); > return ret; > @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream > *ost, AVFrame *frame, > av_assert0(0); > } > > -static int submit_encode_frame(OutputFile *of, OutputStream *ost, > - AVFrame *frame, AVPacket *pkt) > -{ > - Encoder *e = ost->enc; > - int ret; > - > - if (ost->sq_idx_encode < 0) > - return encode_frame(of, ost, frame, pkt); > - > - if (frame) { > - ret = av_frame_ref(e->sq_frame, frame); > - if (ret < 0) > - return ret; > - frame = e->sq_frame; > - } > - > - ret = sq_send(of->sq_encode, ost->sq_idx_encode, > - SQFRAME(frame)); > - if (ret < 0) { > - if (frame) > - av_frame_unref(frame); > - if (ret != AVERROR_EOF) > - return ret; > - } > - > - while (1) { > - AVFrame *enc_frame = e->sq_frame; > - > - ret = sq_receive(of->sq_encode, ost->sq_idx_encode, > - SQFRAME(enc_frame)); > - if (ret == AVERROR_EOF) { > - enc_frame = NULL; > - } else if (ret < 0) { > - return (ret == AVERROR(EAGAIN)) ? 0 : ret; > - } > - > - ret = encode_frame(of, ost, enc_frame, pkt); > - if (enc_frame) > - av_frame_unref(enc_frame); > - if (ret < 0) > - return ret; > - } > -} > - > static int do_audio_out(OutputFile *of, OutputStream *ost, > AVFrame *frame, AVPacket *pkt) > { > @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream > *ost, > if (!check_recording_time(ost, frame->pts, frame->time_base)) > return AVERROR_EOF; > > - return submit_encode_frame(of, ost, frame, pkt); > + return encode_frame(of, ost, frame, pkt); > } > > static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx > *kf, > @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream > *ost, > } > #endif > > - return submit_encode_frame(of, ost, in_picture, pkt); > + return encode_frame(of, ost, in_picture, pkt); > } > > static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) > @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame > *frame, AVPacket *pkt) > enum AVMediaType type = ost->type; > > if (type == AVMEDIA_TYPE_SUBTITLE) { > + const AVSubtitle *subtitle = frame && frame->buf[0] ? > + (AVSubtitle*)frame->buf[0]->data : > NULL; > + > // no flushing for subtitles > - return frame ? > - do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, > pkt) : 0; > + return subtitle && subtitle->num_rects ? > + do_subtitle_out(of, ost, subtitle, pkt) : 0; > } > > if (frame) { > @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame > *frame, AVPacket *pkt) > do_audio_out(of, ost, > frame, pkt); > } > > - return submit_encode_frame(of, ost, NULL, pkt); > + return encode_frame(of, ost, NULL, pkt); > } > > static void enc_thread_set_name(const OutputStream *ost) > @@ -1009,24 +845,50 @@ fail: > void *encoder_thread(void *arg) > { > OutputStream *ost = arg; > - OutputFile *of = output_files[ost->file_index]; > Encoder *e = ost->enc; > EncoderThread et; > int ret = 0, input_status = 0; > + int name_set = 0; > > ret = enc_thread_init(&et); > if (ret < 0) > goto finish; > > - enc_thread_set_name(ost); > + /* Open the subtitle encoders immediately. AVFrame-based encoders > + * are opened through a callback from the scheduler once they get > + * their first frame > + * > + * N.B.: because the callback is called from a different thread, > + * enc_ctx MUST NOT be accessed before sch_enc_receive() returns > + * for the first time for audio/video. */ > + if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != > AVMEDIA_TYPE_AUDIO) { > + ret = enc_open(ost, NULL); > + if (ret < 0) > + goto finish; > + } > > while (!input_status) { > - int dummy; > - > - input_status = tq_receive(e->queue_in, &dummy, et.frame); > - if (input_status < 0) > + input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame); > + if (input_status == AVERROR_EOF) { > av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); > > + if (!e->opened) { > + av_log(ost, AV_LOG_ERROR, "Could not open encoder before > EOF\n"); > + ret = AVERROR(EINVAL); > + goto finish; > + } > + } else if (input_status < 0) { > + ret = input_status; > + av_log(ost, AV_LOG_ERROR, "Error receiving a frame for > encoding: %s\n", > + av_err2str(ret)); > + goto finish; > + } > + > + if (!name_set) { > + enc_thread_set_name(ost); > + name_set = 1; > + } > + > ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, > et.pkt); > > av_packet_unref(et.pkt); > @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg) > av_err2str(ret)); > break; > } > - > - // signal to the consumer thread that the frame was encoded > - ret = tq_send(e->queue_out, 0, et.pkt); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(ost, AV_LOG_ERROR, > - "Error communicating with the main thread\n"); > - break; > - } > } > > // EOF is normal thread termination > @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg) > ret = 0; > > finish: > - if (ost->sq_idx_encode >= 0) > - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); > - > - tq_receive_finish(e->queue_in, 0); > - tq_send_finish (e->queue_out, 0); > - > enc_thread_uninit(&et); > > - av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); > - > return (void*)(intptr_t)ret; > } > - > -int enc_frame(OutputStream *ost, AVFrame *frame) > -{ > - OutputFile *of = output_files[ost->file_index]; > - Encoder *e = ost->enc; > - int ret, thread_ret; > - > - ret = enc_open(ost, frame); > - if (ret < 0) > - return ret; > - > - if (!e->queue_in) > - return AVERROR_EOF; > - > - // send the frame/EOF to the encoder thread > - if (frame) { > - ret = tq_send(e->queue_in, 0, frame); > - if (ret < 0) > - goto finish; > - } else > - tq_send_finish(e->queue_in, 0); > - > - // retrieve all encoded data for the frame > - while (1) { > - int dummy; > - > - ret = tq_receive(e->queue_out, &dummy, e->pkt); > - if (ret < 0) > - break; > - > - // frame fully encoded > - if (!e->pkt->data && !e->pkt->side_data_elems) > - return 0; > - > - // process the encoded packet > - ret = of_output_packet(of, ost, e->pkt); > - if (ret < 0) > - goto finish; > - } > - > -finish: > - thread_ret = enc_thread_stop(e); > - if (thread_ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", > - av_err2str(thread_ret)); > - ret = err_merge(ret, thread_ret); > - } > - > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > - > - // signal EOF to the muxer > - return of_output_packet(of, ost, NULL); > -} > - > -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) > -{ > - Encoder *e = ost->enc; > - AVFrame *f = e->sub_frame; > - int ret; > - > - // XXX the queue for transferring data to the encoder thread runs > - // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put > - // that inside the frame > - // eventually, subtitles should be switched to use AVFrames natively > - ret = subtitle_wrap_frame(f, sub, 1); > - if (ret < 0) > - return ret; > - > - ret = enc_frame(ost, f); > - av_frame_unref(f); > - > - return ret; > -} > - > -int enc_flush(void) > -{ > - int ret = 0; > - > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - OutputFile *of = output_files[ost->file_index]; > - if (ost->sq_idx_encode >= 0) > - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); > - } > - > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - Encoder *e = ost->enc; > - AVCodecContext *enc = ost->enc_ctx; > - int err; > - > - if (!enc || !e->opened || > - (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != > AVMEDIA_TYPE_AUDIO)) > - continue; > - > - err = enc_frame(ost, NULL); > - if (err != AVERROR_EOF && ret < 0) > - ret = err_merge(ret, err); > - > - av_assert0(!e->queue_in); > - } > - > - return ret; > -} > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c > index 635b1b0b6e..ada235b084 100644 > --- a/fftools/ffmpeg_filter.c > +++ b/fftools/ffmpeg_filter.c > @@ -21,8 +21,6 @@ > #include <stdint.h> > > #include "ffmpeg.h" > -#include "ffmpeg_utils.h" > -#include "thread_queue.h" > > #include "libavfilter/avfilter.h" > #include "libavfilter/buffersink.h" > @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv { > // true when the filtergraph contains only meta filters > // that do not modify the frame data > int is_meta; > + // source filters are present in the graph > + int have_sources; > int disable_conversions; > > - int nb_inputs_bound; > - int nb_outputs_bound; > + unsigned nb_outputs_done; > > const char *graph_desc; > > @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv { > > Scheduler *sch; > unsigned sch_idx; > - > - pthread_t thread; > - /** > - * Queue for sending frames from the main thread to the filtergraph. > Has > - * nb_inputs+1 streams - the first nb_inputs stream correspond to > - * filtergraph inputs. Frames on those streams may have their opaque > set to > - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the > - * EOF event for the correspondint stream. Will be immediately > followed by > - * this stream being send-closed. > - * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but > pts+timebase of > - * a subtitle heartbeat event. Will only be sent for sub2video > streams. > - * > - * The last stream is "control" - the main thread sends empty > AVFrames with > - * opaque set to > - * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame > available > - * from filtergraph outputs. These frames are sent to corresponding > - * streams in queue_out. Finally an empty frame is sent to the > control > - * stream in queue_out. > - * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames > are > - * available the terminating empty frame's opaque will contain the > index+1 > - * of the filtergraph input to which more input frames should be > supplied. > - */ > - ThreadQueue *queue_in; > - /** > - * Queue for sending frames from the filtergraph back to the main > thread. > - * Has nb_outputs+1 streams - the first nb_outputs stream correspond > to > - * filtergraph outputs. > - * > - * The last stream is "control" - see documentation for queue_in for > more > - * details. > - */ > - ThreadQueue *queue_out; > - // submitting frames to filter thread returned EOF > - // this only happens on thread exit, so is not per-input > - int eof_in; > } FilterGraphPriv; > > static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) > @@ -123,6 +87,9 @@ typedef struct FilterGraphThread { > // The output index is stored in frame opaque. > AVFifo *frame_queue_out; > > + // index of the next input to request from the scheduler > + unsigned next_in; > + // set to 1 after at least one frame passed through this output > int got_frame; > > // EOF status of each input/output, as received by the thread > @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv { > int64_t ts_offset; > int64_t next_pts; > FPSConvContext fps; > - > - // set to 1 after at least one frame passed through this output > - int got_frame; > } OutputFilterPriv; > > static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) > @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph > *fg) > > static void *filter_thread(void *arg); > > -// start the filtering thread once all inputs and outputs are bound > -static int fg_thread_try_start(FilterGraphPriv *fgp) > -{ > - FilterGraph *fg = &fgp->fg; > - ObjPool *op; > - int ret = 0; > - > - if (fgp->nb_inputs_bound < fg->nb_inputs || > - fgp->nb_outputs_bound < fg->nb_outputs) > - return 0; > - > - op = objpool_alloc_frames(); > - if (!op) > - return AVERROR(ENOMEM); > - > - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); > - if (!fgp->queue_in) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - // at least one output is mandatory > - op = objpool_alloc_frames(); > - if (!op) > - goto fail; > - > - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); > - if (!fgp->queue_out) { > - objpool_free(&op); > - goto fail; > - } > - > - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); > - if (ret) { > - ret = AVERROR(ret); > - av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d > failed: %s\n", > - fg->index, av_err2str(ret)); > - goto fail; > - } > - > - return 0; > -fail: > - if (ret >= 0) > - ret = AVERROR(ENOMEM); > - > - tq_free(&fgp->queue_in); > - tq_free(&fgp->queue_out); > - > - return ret; > -} > - > static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, > int in) > { > AVFilterContext *ctx = inout->filter_ctx; > @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) > ofilter->graph = fg; > ofp->format = -1; > ofp->index = fg->nb_outputs - 1; > - ofilter->last_pts = AV_NOPTS_VALUE; > > return ofilter; > } > @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, > InputStream *ist) > return AVERROR(ENOMEM); > } > > - fgp->nb_inputs_bound++; > - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); > - > - return fg_thread_try_start(fgp); > + return 0; > } > > static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) > @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, > OutputStream *ost, > if (ret < 0) > return ret; > > - fgp->nb_outputs_bound++; > - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); > - > - return fg_thread_try_start(fgp); > + return 0; > } > > static InputFilter *ifilter_alloc(FilterGraph *fg) > @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) > return ifilter; > } > > -static int fg_thread_stop(FilterGraphPriv *fgp) > -{ > - void *ret; > - > - if (!fgp->queue_in) > - return 0; > - > - for (int i = 0; i <= fgp->fg.nb_inputs; i++) { > - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? > - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; > - > - if (ifp) > - ifp->eof = 1; > - > - tq_send_finish(fgp->queue_in, i); > - } > - > - for (int i = 0; i <= fgp->fg.nb_outputs; i++) > - tq_receive_finish(fgp->queue_out, i); > - > - pthread_join(fgp->thread, &ret); > - > - tq_free(&fgp->queue_in); > - tq_free(&fgp->queue_out); > - > - return (int)(intptr_t)ret; > -} > - > void fg_free(FilterGraph **pfg) > { > FilterGraph *fg = *pfg; > @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg) > return; > fgp = fgp_from_fg(fg); > > - fg_thread_stop(fgp); > - > avfilter_graph_free(&fg->graph); > for (int j = 0; j < fg->nb_inputs; j++) { > InputFilter *ifilter = fg->inputs[j]; > @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, > Scheduler *sch) > if (ret < 0) > goto fail; > > + for (unsigned i = 0; i < graph->nb_filters; i++) { > + const AVFilter *f = graph->filters[i]->filter; > + if (!avfilter_filter_pad_count(f, 0) && > + !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) { > + fgp->have_sources = 1; > + break; > + } > + } > + > for (AVFilterInOut *cur = inputs; cur; cur = cur->next) { > InputFilter *const ifilter = ifilter_alloc(fg); > InputFilterPriv *ifp; > @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, > const FilterGraphThread *fgt) > AVBufferRef *hw_device; > AVFilterInOut *inputs, *outputs, *cur; > int ret, i, simple = filtergraph_is_simple(fg); > + int have_input_eof = 0; > const char *graph_desc = fgp->graph_desc; > > cleanup_filtergraph(fg); > @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, > const FilterGraphThread *fgt) > ret = av_buffersrc_add_frame(ifp->filter, NULL); > if (ret < 0) > goto fail; > + have_input_eof = 1; > } > } > > - return 0; > + if (have_input_eof) { > + // make sure the EOF propagates to the end of the graph > + ret = avfilter_graph_request_oldest(fg->graph); > + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) > + goto fail; > + } > > + return 0; > fail: > cleanup_filtergraph(fg); > return ret; > @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv > *ofp, AVFrame *frame, > fps->frames_prev_hist[2]); > > if (!*nb_frames && fps->last_dropped) { > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > fps->last_dropped++; > } > > @@ -2260,21 +2153,23 @@ finish: > fps->frames_prev_hist[0] = *nb_frames_prev; > > if (*nb_frames_prev == 0 && fps->last_dropped) { > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > av_log(ost, AV_LOG_VERBOSE, > "*** dropping frame %"PRId64" at ts %"PRId64"\n", > fps->frame_number, fps->last_frame->pts); > } > if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > > *nb_frames_prev)) { > + uint64_t nb_frames_dup; > if (*nb_frames > dts_error_threshold * 30) { > av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too > large, skipping\n", *nb_frames - 1); > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > *nb_frames = 0; > return; > } > - ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && > fps->last_dropped) - (*nb_frames > *nb_frames_prev); > + nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup, > + *nb_frames - (*nb_frames_prev && > fps->last_dropped) - (*nb_frames > *nb_frames_prev)); > av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - > 1); > - if (ofilter->nb_frames_dup > fps->dup_warning) { > + if (nb_frames_dup > fps->dup_warning) { > av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames > duplicated\n", fps->dup_warning); > fps->dup_warning *= 10; > } > @@ -2284,8 +2179,57 @@ finish: > fps->dropped_keyframe |= fps->last_dropped && (frame->flags & > AV_FRAME_FLAG_KEY); > } > > +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt) > +{ > + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > + int ret; > + > + // we are finished and no frames were ever seen at this output, > + // at least initialize the encoder with a dummy frame > + if (!fgt->got_frame) { > + AVFrame *frame = fgt->frame; > + FrameData *fd; > + > + frame->time_base = ofp->tb_out; > + frame->format = ofp->format; > + > + frame->width = ofp->width; > + frame->height = ofp->height; > + frame->sample_aspect_ratio = ofp->sample_aspect_ratio; > + > + frame->sample_rate = ofp->sample_rate; > + if (ofp->ch_layout.nb_channels) { > + ret = av_channel_layout_copy(&frame->ch_layout, > &ofp->ch_layout); > + if (ret < 0) > + return ret; > + } > + > + fd = frame_data(frame); > + if (!fd) > + return AVERROR(ENOMEM); > + > + fd->frame_rate_filter = ofp->fps.framerate; > + > + av_assert0(!frame->buf[0]); > + > + av_log(ofp->ofilter.ost, AV_LOG_WARNING, > + "No filtered frames for output stream, trying to " > + "initialize anyway.\n"); > + > + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame); > + if (ret < 0) { > + av_frame_unref(frame); > + return ret; > + } > + } > + > + fgt->eof_out[ofp->index] = 1; > + > + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL); > +} > + > static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > AVFrame *frame_prev = ofp->fps.last_frame; > @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > frame_out = frame; > } > > - if (buffer) { > - AVFrame *f = av_frame_alloc(); > - > - if (!f) { > - av_frame_unref(frame_out); > - return AVERROR(ENOMEM); > - } > - > - av_frame_move_ref(f, frame_out); > - f->opaque = (void*)(intptr_t)ofp->index; > - > - ret = av_fifo_write(fgt->frame_queue_out, &f, 1); > - if (ret < 0) { > - av_frame_free(&f); > - return AVERROR(ENOMEM); > - } > - } else { > - // return the frame to the main thread > - ret = tq_send(fgp->queue_out, ofp->index, frame_out); > + { > + // send the frame to consumers > + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, > frame_out); > if (ret < 0) { > av_frame_unref(frame_out); > - fgt->eof_out[ofp->index] = 1; > + > + if (!fgt->eof_out[ofp->index]) { > + fgt->eof_out[ofp->index] = 1; > + fgp->nb_outputs_done++; > + } > + > return ret == AVERROR_EOF ? 0 : ret; > } > } > @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > av_frame_move_ref(frame_prev, frame); > } > > - if (!frame) { > - tq_send_finish(fgp->queue_out, ofp->index); > - fgt->eof_out[ofp->index] = 1; > - } > + if (!frame) > + return close_output(ofp, fgt); > > return 0; > } > > static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > OutputStream *ost = ofp->ofilter.ost; > @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > > ret = av_buffersink_get_frame_flags(filter, frame, > AV_BUFFERSINK_FLAG_NO_REQUEST); > - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { > - ret = fg_output_frame(ofp, fgt, NULL, buffer); > + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) { > + ret = fg_output_frame(ofp, fgt, NULL); > return (ret < 0) ? ret : 1; > } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { > return 1; > @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > fd->frame_rate_filter = ofp->fps.framerate; > } > > - ret = fg_output_frame(ofp, fgt, frame, buffer); > + ret = fg_output_frame(ofp, fgt, frame); > av_frame_unref(frame); > if (ret < 0) > return ret; > @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > return 0; > } > > -/* retrieve all frames available at filtergraph outputs and either send > them to > - * the main thread (buffer=0) or buffer them for later (buffer=1) */ > +/* retrieve all frames available at filtergraph outputs > + * and send them to consumers */ > static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(fg); > - int ret = 0; > + int did_step = 0; > > - if (!fg->graph) > - return 0; > - > - // process buffered frames > - if (!buffer) { > - AVFrame *f; > - > - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { > - int out_idx = (intptr_t)f->opaque; > - f->opaque = NULL; > - ret = tq_send(fgp->queue_out, out_idx, f); > - av_frame_free(&f); > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > + // graph not configured, just select the input to request > + if (!fg->graph) { > + for (int i = 0; i < fg->nb_inputs; i++) { > + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); > + if (ifp->format < 0 && !fgt->eof_in[i]) { > + fgt->next_in = i; > + return 0; > + } > } > + > + // This state - graph is not configured, but all inputs are either > + // initialized or EOF - should be unreachable because sending EOF > to a > + // filter without even a fallback format should fail > + av_assert0(0); > + return AVERROR_BUG; > } > > - /* Reap all buffers present in the buffer sinks */ > - for (int i = 0; i < fg->nb_outputs; i++) { > - OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); > - int ret = 0; > + while (fgp->nb_outputs_done < fg->nb_outputs) { > + int ret; > > - while (!ret) { > - ret = fg_output_step(ofp, fgt, frame, buffer); > - if (ret < 0) > - return ret; > + ret = avfilter_graph_request_oldest(fg->graph); > + if (ret == AVERROR(EAGAIN)) { > + fgt->next_in = choose_input(fg, fgt); > + break; > + } else if (ret < 0) { > + if (ret == AVERROR_EOF) > + av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, > finishing\n"); > + else > + av_log(fg, AV_LOG_ERROR, > + "Error requesting a frame from the filtergraph: > %s\n", > + av_err2str(ret)); > + return ret; > } > - } > + fgt->next_in = fg->nb_inputs; > > - return 0; > + // return after one iteration, so that scheduler can rate-control > us > + if (did_step && fgp->have_sources) > + return 0; > + > + /* Reap all buffers present in the buffer sinks */ > + for (int i = 0; i < fg->nb_outputs; i++) { > + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); > + > + ret = 0; > + while (!ret) { > + ret = fg_output_step(ofp, fgt, frame); > + if (ret < 0) > + return ret; > + } > + } > + did_step = 1; > + }; > + > + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0; > } > > static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, > AVRational tb) > @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, > InputFilter *ifilter, > InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > int ret; > > + if (fgt->eof_in[ifp->index]) > + return 0; > + > fgt->eof_in[ifp->index] = 1; > > if (ifp->filter) { > @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, > FilterGraphThread *fgt, > return ret; > } > > - ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; > + ret = fg->graph ? read_frames(fg, fgt, tmp) : 0; > av_frame_free(&tmp); > if (ret < 0) > return ret; > @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, > FilterGraphThread *fgt, > return 0; > } > > -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, > - AVFrame *frame) > -{ > - const enum FrameOpaque msg = (intptr_t)frame->opaque; > - FilterGraph *fg = &fgp->fg; > - int graph_eof = 0; > - int ret; > - > - frame->opaque = NULL; > - av_assert0(msg > 0); > - av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); > - > - if (!fg->graph) { > - // graph not configured yet, ignore all messages other than > choosing > - // the input to read from > - if (msg != FRAME_OPAQUE_CHOOSE_INPUT) { > - av_frame_unref(frame); > - goto done; > - } > - > - for (int i = 0; i < fg->nb_inputs; i++) { > - InputFilter *ifilter = fg->inputs[i]; > - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > - if (ifp->format < 0 && !fgt->eof_in[i]) { > - frame->opaque = (void*)(intptr_t)(i + 1); > - goto done; > - } > - } > - > - // This state - graph is not configured, but all inputs are either > - // initialized or EOF - should be unreachable because sending EOF > to a > - // filter without even a fallback format should fail > - av_assert0(0); > - return AVERROR_BUG; > - } > - > - if (msg == FRAME_OPAQUE_SEND_COMMAND) { > - FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; > - send_command(fg, fc->time, fc->target, fc->command, fc->arg, > fc->all_filters); > - av_frame_unref(frame); > - goto done; > - } > - > - if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { > - ret = avfilter_graph_request_oldest(fg->graph); > - > - graph_eof = ret == AVERROR_EOF; > - > - if (ret == AVERROR(EAGAIN)) { > - frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); > - goto done; > - } else if (ret < 0 && !graph_eof) > - return ret; > - } > - > - ret = read_frames(fg, fgt, frame, 0); > - if (ret < 0) { > - av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for > encoding\n"); > - return ret; > - } > - > - if (graph_eof) > - return AVERROR_EOF; > - > - // signal to the main thread that we are done processing the message > -done: > - ret = tq_send(fgp->queue_out, fg->nb_outputs, frame); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(fg, AV_LOG_ERROR, "Error communicating with the main > thread\n"); > - return ret; > - } > - > - return 0; > -} > - > static void fg_thread_set_name(const FilterGraph *fg) > { > char name[16]; > @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg) > InputFilter *ifilter; > InputFilterPriv *ifp; > enum FrameOpaque o; > - int input_idx, eof_frame; > + unsigned input_idx = fgt.next_in; > > - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); > - if (input_idx < 0 || > - (input_idx == fg->nb_inputs && input_status < 0)) { > + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx, > + &input_idx, fgt.frame); > + if (input_status == AVERROR_EOF) { > av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); > break; > + } else if (input_status == AVERROR(EAGAIN)) { > + // should only happen when we didn't request any input > + av_assert0(input_idx == fg->nb_inputs); > + goto read_frames; > } > + av_assert0(input_status >= 0); > + > + o = (intptr_t)fgt.frame->opaque; > > o = (intptr_t)fgt.frame->opaque; > > // message on the control stream > if (input_idx == fg->nb_inputs) { > - ret = msg_process(fgp, &fgt, fgt.frame); > - if (ret < 0) > - goto finish; > + FilterCommand *fc; > > + av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && > fgt.frame->buf[0]); > + > + fc = (FilterCommand*)fgt.frame->buf[0]->data; > + send_command(fg, fc->time, fc->target, fc->command, fc->arg, > + fc->all_filters); > + av_frame_unref(fgt.frame); > continue; > } > > // we received an input frame or EOF > ifilter = fg->inputs[input_idx]; > ifp = ifp_from_ifilter(ifilter); > - eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; > + > if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { > int hb_frame = input_status >= 0 && o == > FRAME_OPAQUE_SUB_HEARTBEAT; > ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || > hb_frame) ? fgt.frame : NULL); > - } else if (input_status >= 0 && fgt.frame->buf[0]) { > + } else if (fgt.frame->buf[0]) { > ret = send_frame(fg, &fgt, ifilter, fgt.frame); > } else { > - int64_t pts = input_status >= 0 ? fgt.frame->pts : > AV_NOPTS_VALUE; > - AVRational tb = input_status >= 0 ? fgt.frame->time_base : > (AVRational){ 1, 1 }; > - ret = send_eof(&fgt, ifilter, pts, tb); > + av_assert1(o == FRAME_OPAQUE_EOF); > + ret = send_eof(&fgt, ifilter, fgt.frame->pts, > fgt.frame->time_base); > } > av_frame_unref(fgt.frame); > if (ret < 0) > + goto finish; > + > +read_frames: > + // retrieve all newly avalable frames > + ret = read_frames(fg, &fgt, fgt.frame); > + if (ret == AVERROR_EOF) { > + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n"); > break; > - > - if (eof_frame) { > - // an EOF frame is immediately followed by sender closing > - // the corresponding stream, so retrieve that event > - input_status = tq_receive(fgp->queue_in, &input_idx, > fgt.frame); > - av_assert0(input_status == AVERROR_EOF && input_idx == > ifp->index); > - } > - > - // signal to the main thread that we are done > - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); > - if (ret < 0) { > - if (ret == AVERROR_EOF) > - break; > - > - av_log(fg, AV_LOG_ERROR, "Error communicating with the main > thread\n"); > + } else if (ret < 0) { > + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: > %s\n", > + av_err2str(ret)); > goto finish; > } > } > > + for (unsigned i = 0; i < fg->nb_outputs; i++) { > + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); > + > + if (fgt.eof_out[i]) > + continue; > + > + ret = fg_output_frame(ofp, &fgt, NULL); > + if (ret < 0) > + goto finish; > + } > + > finish: > // EOF is normal termination > if (ret == AVERROR_EOF) > ret = 0; > > - for (int i = 0; i <= fg->nb_inputs; i++) > - tq_receive_finish(fgp->queue_in, i); > - for (int i = 0; i <= fg->nb_outputs; i++) > - tq_send_finish(fgp->queue_out, i); > - > fg_thread_uninit(&fgt); > > - av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); > - > return (void*)(intptr_t)ret; > } > > -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, > - AVFrame *frame, enum FrameOpaque type) > -{ > - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > - int output_idx, ret; > - > - if (ifp->eof) { > - av_frame_unref(frame); > - return AVERROR_EOF; > - } > - > - frame->opaque = (void*)(intptr_t)type; > - > - ret = tq_send(fgp->queue_in, ifp->index, frame); > - if (ret < 0) { > - ifp->eof = 1; > - av_frame_unref(frame); > - return ret; > - } > - > - if (type == FRAME_OPAQUE_EOF) > - tq_send_finish(fgp->queue_in, ifp->index); > - > - // wait for the frame to be processed > - ret = tq_receive(fgp->queue_out, &output_idx, frame); > - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); > - > - return ret; > -} > - > -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int > keep_reference) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - int ret; > - > - if (keep_reference) { > - ret = av_frame_ref(fgp->frame, frame); > - if (ret < 0) > - return ret; > - } else > - av_frame_move_ref(fgp->frame, frame); > - > - return thread_send_frame(fgp, ifilter, fgp->frame, 0); > -} > - > -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - int ret; > - > - fgp->frame->pts = pts; > - fgp->frame->time_base = tb; > - > - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); > - > - return ret == AVERROR_EOF ? 0 : ret; > -} > - > -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, > AVRational tb) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - > - fgp->frame->pts = pts; > - fgp->frame->time_base = tb; > - > - thread_send_frame(fgp, ifilter, fgp->frame, > FRAME_OPAQUE_SUB_HEARTBEAT); > -} > - > -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(graph); > - int ret, got_frames = 0; > - > - if (fgp->eof_in) > - return AVERROR_EOF; > - > - // signal to the filtering thread to return all frames it can > - av_assert0(!fgp->frame->buf[0]); > - fgp->frame->opaque = (void*)(intptr_t)(best_ist ? > - FRAME_OPAQUE_CHOOSE_INPUT : > - FRAME_OPAQUE_REAP_FILTERS); > - > - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); > - if (ret < 0) { > - fgp->eof_in = 1; > - goto finish; > - } > - > - while (1) { > - OutputFilter *ofilter; > - OutputFilterPriv *ofp; > - OutputStream *ost; > - int output_idx; > - > - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); > - > - // EOF on the whole queue or the control stream > - if (output_idx < 0 || > - (ret < 0 && output_idx == graph->nb_outputs)) > - goto finish; > - > - // EOF for a specific stream > - if (ret < 0) { > - ofilter = graph->outputs[output_idx]; > - ofp = ofp_from_ofilter(ofilter); > - > - // we are finished and no frames were ever seen at this > output, > - // at least initialize the encoder with a dummy frame > - if (!ofp->got_frame) { > - AVFrame *frame = fgp->frame; > - FrameData *fd; > - > - frame->time_base = ofp->tb_out; > - frame->format = ofp->format; > - > - frame->width = ofp->width; > - frame->height = ofp->height; > - frame->sample_aspect_ratio = ofp->sample_aspect_ratio; > - > - frame->sample_rate = ofp->sample_rate; > - if (ofp->ch_layout.nb_channels) { > - ret = av_channel_layout_copy(&frame->ch_layout, > &ofp->ch_layout); > - if (ret < 0) > - return ret; > - } > - > - fd = frame_data(frame); > - if (!fd) > - return AVERROR(ENOMEM); > - > - fd->frame_rate_filter = ofp->fps.framerate; > - > - av_assert0(!frame->buf[0]); > - > - av_log(ofilter->ost, AV_LOG_WARNING, > - "No filtered frames for output stream, trying to " > - "initialize anyway.\n"); > - > - enc_open(ofilter->ost, frame); > - av_frame_unref(frame); > - } > - > - close_output_stream(graph->outputs[output_idx]->ost); > - continue; > - } > - > - // request was fully processed by the filtering thread, > - // return the input stream to read from, if needed > - if (output_idx == graph->nb_outputs) { > - int input_idx = (intptr_t)fgp->frame->opaque - 1; > - av_assert0(input_idx <= graph->nb_inputs); > - > - if (best_ist) { > - *best_ist = (input_idx >= 0 && input_idx < > graph->nb_inputs) ? > - > ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; > - > - if (input_idx < 0 && !got_frames) { > - for (int i = 0; i < graph->nb_outputs; i++) > - graph->outputs[i]->ost->unavailable = 1; > - } > - } > - break; > - } > - > - // got a frame from the filtering thread, send it for encoding > - ofilter = graph->outputs[output_idx]; > - ost = ofilter->ost; > - ofp = ofp_from_ofilter(ofilter); > - > - if (ost->finished) { > - av_frame_unref(fgp->frame); > - tq_receive_finish(fgp->queue_out, output_idx); > - continue; > - } > - > - if (fgp->frame->pts != AV_NOPTS_VALUE) { > - ofilter->last_pts = av_rescale_q(fgp->frame->pts, > - fgp->frame->time_base, > - AV_TIME_BASE_Q); > - } > - > - ret = enc_frame(ost, fgp->frame); > - av_frame_unref(fgp->frame); > - if (ret < 0) > - goto finish; > - > - ofp->got_frame = 1; > - got_frames = 1; > - } > - > -finish: > - if (ret < 0) { > - fgp->eof_in = 1; > - for (int i = 0; i < graph->nb_outputs; i++) > - close_output_stream(graph->outputs[i]->ost); > - } > - > - return ret; > -} > - > -int reap_filters(FilterGraph *fg, int flush) > -{ > - return fg_transcode_step(fg, NULL); > -} > - > void fg_send_command(FilterGraph *fg, double time, const char *target, > const char *command, const char *arg, int > all_filters) > { > FilterGraphPriv *fgp = fgp_from_fg(fg); > AVBufferRef *buf; > FilterCommand *fc; > - int output_idx, ret; > - > - if (!fgp->queue_in) > - return; > > fc = av_mallocz(sizeof(*fc)); > if (!fc) > @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, > const char *target, > fgp->frame->buf[0] = buf; > fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; > > - ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); > - if (ret < 0) { > - av_frame_unref(fgp->frame); > - return; > - } > - > - // wait for the frame to be processed > - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); > - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); > + sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame); > } > diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c > index ef5c2f60e0..067dc65d4e 100644 > --- a/fftools/ffmpeg_mux.c > +++ b/fftools/ffmpeg_mux.c > @@ -23,16 +23,13 @@ > #include "ffmpeg.h" > #include "ffmpeg_mux.h" > #include "ffmpeg_utils.h" > -#include "objpool.h" > #include "sync_queue.h" > -#include "thread_queue.h" > > #include "libavutil/fifo.h" > #include "libavutil/intreadwrite.h" > #include "libavutil/log.h" > #include "libavutil/mem.h" > #include "libavutil/timestamp.h" > -#include "libavutil/thread.h" > > #include "libavcodec/packet.h" > > @@ -41,10 +38,9 @@ > > typedef struct MuxThreadContext { > AVPacket *pkt; > + AVPacket *fix_sub_duration_pkt; > } MuxThreadContext; > > -int want_sdp = 1; > - > static Muxer *mux_from_of(OutputFile *of) > { > return (Muxer*)of; > @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, > OutputStream *ost, AVPacket *pkt, int > return 0; > } > > +static int of_streamcopy(OutputStream *ost, AVPacket *pkt); > + > /* apply the output bitstream filters */ > -static int mux_packet_filter(Muxer *mux, OutputStream *ost, > - AVPacket *pkt, int *stream_eof) > +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt, > + OutputStream *ost, AVPacket *pkt, int > *stream_eof) > { > MuxStream *ms = ms_from_ost(ost); > const char *err_msg; > int ret = 0; > > + if (pkt && !ost->enc) { > + ret = of_streamcopy(ost, pkt); > + if (ret == AVERROR(EAGAIN)) > + return 0; > + else if (ret == AVERROR_EOF) { > + av_packet_unref(pkt); > + pkt = NULL; > + ret = 0; > + } else if (ret < 0) > + goto fail; > + } > + > + // emit heartbeat for -fix_sub_duration; > + // we are only interested in heartbeats on on random access points. > + if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) { > + mt->fix_sub_duration_pkt->opaque = > (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION; > + mt->fix_sub_duration_pkt->pts = pkt->pts; > + mt->fix_sub_duration_pkt->time_base = pkt->time_base; > + > + ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx, > + mt->fix_sub_duration_pkt); > + if (ret < 0) > + goto fail; > + } > + > if (ms->bsf_ctx) { > int bsf_eof = 0; > > @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of) > static void mux_thread_uninit(MuxThreadContext *mt) > { > av_packet_free(&mt->pkt); > + av_packet_free(&mt->fix_sub_duration_pkt); > > memset(mt, 0, sizeof(*mt)); > } > @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt) > if (!mt->pkt) > goto fail; > > + mt->fix_sub_duration_pkt = av_packet_alloc(); > + if (!mt->fix_sub_duration_pkt) > + goto fail; > + > return 0; > > fail: > @@ -316,19 +344,22 @@ void *muxer_thread(void *arg) > OutputStream *ost; > int stream_idx, stream_eof = 0; > > - ret = tq_receive(mux->tq, &stream_idx, mt.pkt); > + ret = sch_mux_receive(mux->sch, of->index, mt.pkt); > + stream_idx = mt.pkt->stream_index; > if (stream_idx < 0) { > av_log(mux, AV_LOG_VERBOSE, "All streams finished\n"); > ret = 0; > break; > } > > - ost = of->streams[stream_idx]; > - ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, > &stream_eof); > + ost = of->streams[mux->sch_stream_idx[stream_idx]]; > + mt.pkt->stream_index = ost->index; > + > + ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, > &stream_eof); > av_packet_unref(mt.pkt); > if (ret == AVERROR_EOF) { > if (stream_eof) { > - tq_receive_finish(mux->tq, stream_idx); > + sch_mux_receive_finish(mux->sch, of->index, stream_idx); > } else { > av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n"); > ret = 0; > @@ -343,243 +374,55 @@ void *muxer_thread(void *arg) > finish: > mux_thread_uninit(&mt); > > - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) > - tq_receive_finish(mux->tq, i); > - > - av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n"); > - > return (void*)(intptr_t)ret; > } > > -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket > *pkt) > -{ > - int ret = 0; > - > - if (!pkt || ost->finished & MUXER_FINISHED) > - goto finish; > - > - ret = tq_send(mux->tq, ost->index, pkt); > - if (ret < 0) > - goto finish; > - > - return 0; > - > -finish: > - if (pkt) > - av_packet_unref(pkt); > - > - ost->finished |= MUXER_FINISHED; > - tq_send_finish(mux->tq, ost->index); > - return ret == AVERROR_EOF ? 0 : ret; > -} > - > -static int queue_packet(OutputStream *ost, AVPacket *pkt) > -{ > - MuxStream *ms = ms_from_ost(ost); > - AVPacket *tmp_pkt = NULL; > - int ret; > - > - if (!av_fifo_can_write(ms->muxing_queue)) { > - size_t cur_size = av_fifo_can_read(ms->muxing_queue); > - size_t pkt_size = pkt ? pkt->size : 0; > - unsigned int are_we_over_size = > - (ms->muxing_queue_data_size + pkt_size) > > ms->muxing_queue_data_threshold; > - size_t limit = are_we_over_size ? ms->max_muxing_queue_size : > SIZE_MAX; > - size_t new_size = FFMIN(2 * cur_size, limit); > - > - if (new_size <= cur_size) { > - av_log(ost, AV_LOG_ERROR, > - "Too many packets buffered for output stream %d:%d.\n", > - ost->file_index, ost->st->index); > - return AVERROR(ENOSPC); > - } > - ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size); > - if (ret < 0) > - return ret; > - } > - > - if (pkt) { > - ret = av_packet_make_refcounted(pkt); > - if (ret < 0) > - return ret; > - > - tmp_pkt = av_packet_alloc(); > - if (!tmp_pkt) > - return AVERROR(ENOMEM); > - > - av_packet_move_ref(tmp_pkt, pkt); > - ms->muxing_queue_data_size += tmp_pkt->size; > - } > - av_fifo_write(ms->muxing_queue, &tmp_pkt, 1); > - > - return 0; > -} > - > -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost) > -{ > - int ret; > - > - if (mux->tq) { > - return thread_submit_packet(mux, ost, pkt); > - } else { > - /* the muxer is not initialized yet, buffer the packet */ > - ret = queue_packet(ost, pkt); > - if (ret < 0) { > - if (pkt) > - av_packet_unref(pkt); > - return ret; > - } > - } > - > - return 0; > -} > - > -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) > -{ > - Muxer *mux = mux_from_of(of); > - int ret = 0; > - > - if (pkt && pkt->dts != AV_NOPTS_VALUE) > - ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, > AV_TIME_BASE_Q); > - > - ret = submit_packet(mux, pkt, ost); > - if (ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the > muxer: %s", > - av_err2str(ret)); > - return ret; > - } > - > - return 0; > -} > - > -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) > +static int of_streamcopy(OutputStream *ost, AVPacket *pkt) > { > OutputFile *of = output_files[ost->file_index]; > MuxStream *ms = ms_from_ost(ost); > + DemuxPktData *pd = pkt->opaque_ref ? > (DemuxPktData*)pkt->opaque_ref->data : NULL; > + int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE; > int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : > of->start_time; > int64_t ts_offset; > - AVPacket *opkt = ms->pkt; > - int ret; > - > - av_packet_unref(opkt); > > if (of->recording_time != INT64_MAX && > dts >= of->recording_time + start_time) > - pkt = NULL; > - > - // EOF: flush output bitstream filters. > - if (!pkt) > - return of_output_packet(of, ost, NULL); > + return AVERROR_EOF; > > if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) && > !ms->copy_initial_nonkeyframes) > - return 0; > + return AVERROR(EAGAIN); > > if (!ms->streamcopy_started) { > if (!ms->copy_prior_start && > (pkt->pts == AV_NOPTS_VALUE ? > dts < ms->ts_copy_start : > pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, > pkt->time_base))) > - return 0; > + return AVERROR(EAGAIN); > > if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time) > - return 0; > + return AVERROR(EAGAIN); > } > > - ret = av_packet_ref(opkt, pkt); > - if (ret < 0) > - return ret; > - > - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base); > + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base); > > if (pkt->pts != AV_NOPTS_VALUE) > - opkt->pts -= ts_offset; > + pkt->pts -= ts_offset; > > if (pkt->dts == AV_NOPTS_VALUE) { > - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base); > + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base); > } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { > - opkt->pts = opkt->dts - ts_offset; > - } > - opkt->dts -= ts_offset; > - > - { > - int ret = trigger_fix_sub_duration_heartbeat(ost, pkt); > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Subtitle heartbeat logic failed in %s! (%s)\n", > - __func__, av_err2str(ret)); > - return ret; > - } > + pkt->pts = pkt->dts - ts_offset; > } > > - ret = of_output_packet(of, ost, opkt); > - if (ret < 0) > - return ret; > + pkt->dts -= ts_offset; > > ms->streamcopy_started = 1; > > return 0; > } > > -static int thread_stop(Muxer *mux) > -{ > - void *ret; > - > - if (!mux || !mux->tq) > - return 0; > - > - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) > - tq_send_finish(mux->tq, i); > - > - pthread_join(mux->thread, &ret); > - > - tq_free(&mux->tq); > - > - return (int)(intptr_t)ret; > -} > - > -static int thread_start(Muxer *mux) > -{ > - AVFormatContext *fc = mux->fc; > - ObjPool *op; > - int ret; > - > - op = objpool_alloc_packets(); > - if (!op) > - return AVERROR(ENOMEM); > - > - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, > pkt_move); > - if (!mux->tq) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux); > - if (ret) { > - tq_free(&mux->tq); > - return AVERROR(ret); > - } > - > - /* flush the muxing queues */ > - for (int i = 0; i < fc->nb_streams; i++) { > - OutputStream *ost = mux->of.streams[i]; > - MuxStream *ms = ms_from_ost(ost); > - AVPacket *pkt; > - > - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { > - ret = thread_submit_packet(mux, ost, pkt); > - if (pkt) { > - ms->muxing_queue_data_size -= pkt->size; > - av_packet_free(&pkt); > - } > - if (ret < 0) > - return ret; > - } > - } > - > - return 0; > -} > - > int print_sdp(const char *filename); > > int print_sdp(const char *filename) > @@ -590,11 +433,6 @@ int print_sdp(const char *filename) > AVIOContext *sdp_pb; > AVFormatContext **avc; > > - for (i = 0; i < nb_output_files; i++) { > - if (!mux_from_of(output_files[i])->header_written) > - return 0; > - } > - > avc = av_malloc_array(nb_output_files, sizeof(*avc)); > if (!avc) > return AVERROR(ENOMEM); > @@ -629,25 +467,17 @@ int print_sdp(const char *filename) > avio_closep(&sdp_pb); > } > > - // SDP successfully written, allow muxer threads to start > - ret = 1; > - > fail: > av_freep(&avc); > return ret; > } > > -int mux_check_init(Muxer *mux) > +int mux_check_init(void *arg) > { > + Muxer *mux = arg; > OutputFile *of = &mux->of; > AVFormatContext *fc = mux->fc; > - int ret, i; > - > - for (i = 0; i < fc->nb_streams; i++) { > - OutputStream *ost = of->streams[i]; > - if (!ost->initialized) > - return 0; > - } > + int ret; > > ret = avformat_write_header(fc, &mux->opts); > if (ret < 0) { > @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux) > mux->header_written = 1; > > av_dump_format(fc, of->index, fc->url, 1); > - nb_output_dumped++; > - > - if (sdp_filename || want_sdp) { > - ret = print_sdp(sdp_filename); > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); > - return ret; > - } else if (ret == 1) { > - /* SDP is written only after all the muxers are ready, so now > we > - * start ALL the threads */ > - for (i = 0; i < nb_output_files; i++) { > - ret = thread_start(mux_from_of(output_files[i])); > - if (ret < 0) > - return ret; > - } > - } > - } else { > - ret = thread_start(mux_from_of(of)); > - if (ret < 0) > - return ret; > - } > + atomic_fetch_add(&nb_output_dumped, 1); > > return 0; > } > @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost) > ost->st->time_base); > } > > - ost->initialized = 1; > + if (ms->sch_idx >= 0) > + return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx); > > - return mux_check_init(mux); > + return 0; > } > > static int check_written(OutputFile *of) > @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of) > AVFormatContext *fc = mux->fc; > int ret, mux_result = 0; > > - if (!mux->tq) { > + if (!mux->header_written) { > av_log(mux, AV_LOG_ERROR, > "Nothing was written into output file, because " > "at least one of its streams received no packets.\n"); > return AVERROR(EINVAL); > } > > - mux_result = thread_stop(mux); > - > ret = av_write_trailer(fc); > if (ret < 0) { > av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", > av_err2str(ret)); > @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post) > ost->logfile = NULL; > } > > - if (ms->muxing_queue) { > - AVPacket *pkt; > - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) > - av_packet_free(&pkt); > - av_fifo_freep2(&ms->muxing_queue); > - } > - > avcodec_parameters_free(&ost->par_in); > > av_bsf_free(&ms->bsf_ctx); > @@ -976,8 +778,6 @@ void of_free(OutputFile **pof) > return; > mux = mux_from_of(of); > > - thread_stop(mux); > - > sq_free(&of->sq_encode); > sq_free(&mux->sq_mux); > > diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h > index eee2b2cb07..5d7cf3fa76 100644 > --- a/fftools/ffmpeg_mux.h > +++ b/fftools/ffmpeg_mux.h > @@ -25,7 +25,6 @@ > #include <stdint.h> > > #include "ffmpeg_sched.h" > -#include "thread_queue.h" > > #include "libavformat/avformat.h" > > @@ -33,7 +32,6 @@ > > #include "libavutil/dict.h" > #include "libavutil/fifo.h" > -#include "libavutil/thread.h" > > typedef struct MuxStream { > OutputStream ost; > @@ -41,9 +39,6 @@ typedef struct MuxStream { > // name used for logging > char log_name[32]; > > - /* the packets are buffered here until the muxer is ready to be > initialized */ > - AVFifo *muxing_queue; > - > AVBSFContext *bsf_ctx; > AVPacket *bsf_pkt; > > @@ -57,17 +52,6 @@ typedef struct MuxStream { > > int64_t max_frames; > > - /* > - * The size of the AVPackets' buffers in queue. > - * Updated when a packet is either pushed or pulled from the queue. > - */ > - size_t muxing_queue_data_size; > - > - int max_muxing_queue_size; > - > - /* Threshold after which max_muxing_queue_size will be in effect */ > - size_t muxing_queue_data_threshold; > - > // timestamp from which the streamcopied streams should start, > // in AV_TIME_BASE_Q; > // everything before it should be discarded > @@ -106,9 +90,6 @@ typedef struct Muxer { > int *sch_stream_idx; > int nb_sch_stream_idx; > > - pthread_t thread; > - ThreadQueue *tq; > - > AVDictionary *opts; > > int thread_queue_size; > @@ -122,10 +103,7 @@ typedef struct Muxer { > AVPacket *sq_pkt; > } Muxer; > > -/* whether we want to print an SDP, set in of_open() */ > -extern int want_sdp; > - > -int mux_check_init(Muxer *mux); > +int mux_check_init(void *arg); > > static MuxStream *ms_from_ost(OutputStream *ost) > { > diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c > index 534b4379c7..6459296ab0 100644 > --- a/fftools/ffmpeg_mux_init.c > +++ b/fftools/ffmpeg_mux_init.c > @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const > OptionsContext *o, > return 0; > } > > -static int new_stream_attachment(Muxer *mux, const OptionsContext *o, > - OutputStream *ost) > -{ > - ost->finished = 1; > - return 0; > -} > - > static int new_stream_subtitle(Muxer *mux, const OptionsContext *o, > OutputStream *ost) > { > @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (!ost->par_in) > return AVERROR(ENOMEM); > > - ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0); > - if (!ms->muxing_queue) > - return AVERROR(ENOMEM); > ms->last_mux_dts = AV_NOPTS_VALUE; > > ost->st = st; > @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (!ost->enc_ctx) > return AVERROR(ENOMEM); > > - ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); > + ret = sch_add_enc(mux->sch, encoder_thread, ost, > + ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : > enc_open); > if (ret < 0) > return ret; > ms->sch_idx_enc = ret; > @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > > sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, > max_muxing_queue_size, > muxing_queue_data_threshold); > - > - ms->max_muxing_queue_size = max_muxing_queue_size; > - ms->muxing_queue_data_threshold = muxing_queue_data_threshold; > } > > MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, > @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (ost->enc_ctx && > av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24) > av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0); > > - ost->last_mux_dts = AV_NOPTS_VALUE; > - > MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i, > ms->copy_initial_nonkeyframes, oc, st); > > @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, > ost); break; > case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, > ost); break; > case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, > ost); break; > - case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, > ost); break; > } > if (ret < 0) > return ret; > @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > MuxStream *ms = ms_from_ost(ost); > enum AVMediaType type = ost->type; > > - ost->sq_idx_encode = -1; > ost->sq_idx_mux = -1; > > nb_interleaved += IS_INTERLEAVED(type); > @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > * - at least one encoded audio/video stream is frame-limited, since > * that has similar semantics to 'shortest' > * - at least one audio encoder requires constant frame sizes > + * > + * Note that encoding sync queues are handled in the scheduler, > because > + * different encoders run in different threads and need external > + * synchronization, while muxer sync queues can be handled inside the > muxer > */ > if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || > nb_audio_fs) { > - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux); > - if (!of->sq_encode) > - return AVERROR(ENOMEM); > + int sq_idx, ret; > + > + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux); > + if (sq_idx < 0) > + return sq_idx; > > for (int i = 0; i < oc->nb_streams; i++) { > OutputStream *ost = of->streams[i]; > @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > if (!IS_AV_ENC(ost, type)) > continue; > > - ost->sq_idx_encode = sq_add_stream(of->sq_encode, > - of->shortest || > ms->max_frames < INT64_MAX); > - if (ost->sq_idx_encode < 0) > - return ost->sq_idx_encode; > - > - if (ms->max_frames != INT64_MAX) > - sq_limit_frames(of->sq_encode, ost->sq_idx_encode, > ms->max_frames); > + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc, > + of->shortest || ms->max_frames < > INT64_MAX, > + ms->max_frames); > + if (ret < 0) > + return ret; > } > } > > @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const > AVDictionary *codec_avopt) > return 0; > } > > -static int init_output_stream_nofilter(OutputStream *ost) > -{ > - int ret = 0; > - > - if (ost->enc_ctx) { > - ret = enc_open(ost, NULL); > - if (ret < 0) > - return ret; > - } else { > - ret = of_stream_init(output_files[ost->file_index], ost); > - if (ret < 0) > - return ret; > - } > - > - return ret; > -} > - > static const char *output_file_item_name(void *obj) > { > const Muxer *mux = obj; > @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > av_strlcat(mux->log_name, "/", sizeof(mux->log_name)); > av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name)); > > - if (strcmp(oc->oformat->name, "rtp")) > - want_sdp = 0; > > of->format = oc->oformat; > if (recording_time != INT64_MAX) > @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > AVFMT_FLAG_BITEXACT); > } > > - err = sch_add_mux(sch, muxer_thread, NULL, mux, > + err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, > !strcmp(oc->oformat->name, "rtp")); > if (err < 0) > return err; > @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > > of->url = filename; > > - /* initialize stream copy and subtitle/data streams. > - * Encoded AVFrame based streams will get initialized when the first > AVFrame > - * is received in do_video_out > - */ > + /* initialize streamcopy streams. */ > for (int i = 0; i < of->nb_streams; i++) { > OutputStream *ost = of->streams[i]; > > - if (ost->filter) > - continue; > - > - err = init_output_stream_nofilter(ost); > - if (err < 0) > - return err; > - } > - > - /* write the header for files with no streams */ > - if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) { > - int ret = mux_check_init(mux); > - if (ret < 0) > - return ret; > + if (!ost->enc) { > + err = of_stream_init(of, ost); > + if (err < 0) > + return err; > + } > } > > return 0; > diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c > index d463306546..6177a96a4e 100644 > --- a/fftools/ffmpeg_opt.c > +++ b/fftools/ffmpeg_opt.c > @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] > = {"top", NULL}; > HWDevice *filter_hw_device; > > char *vstats_filename; > -char *sdp_filename; > > float audio_drift_threshold = 0.1; > float dts_delta_threshold = 10; > @@ -580,9 +579,8 @@ fail: > > static int opt_sdp_file(void *optctx, const char *opt, const char *arg) > { > - av_free(sdp_filename); > - sdp_filename = av_strdup(arg); > - return 0; > + Scheduler *sch = optctx; > + return sch_sdp_filename(sch, arg); > } > > #if CONFIG_VAAPI > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > index 957a410921..bc9b833799 100644 > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > @@ -1,48 +1,40 @@ > 1 > -00:00:00,968 --> 00:00:01,001 > +00:00:00,968 --> 00:00:01,168 > <font face="Monospace">{\an7}(</font> > > 2 > -00:00:01,001 --> 00:00:01,168 > -<font face="Monospace">{\an7}(</font> > - > -3 > 00:00:01,168 --> 00:00:01,368 > <font face="Monospace">{\an7}(<i> inaudibl</i></font> > > -4 > +3 > 00:00:01,368 --> 00:00:01,568 > <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font> > > -5 > +4 > 00:00:01,568 --> 00:00:02,002 > <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > > +5 > +00:00:02,002 --> 00:00:03,103 > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > + > 6 > -00:00:02,002 --> 00:00:03,003 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > - > -7 > -00:00:03,003 --> 00:00:03,103 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > - > -8 > 00:00:03,103 --> 00:00:03,303 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >></font> > > -9 > +7 > 00:00:03,303 --> 00:00:03,503 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety rema</font> > > -10 > +8 > 00:00:03,504 --> 00:00:03,704 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety remains our numb</font> > > -11 > +9 > 00:00:03,704 --> 00:00:04,004 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety remains our number one</font> > > -- > 2.42.0 > > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel > > To unsubscribe, visit link above, or email > ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe". >
Quoting Paul B Mahol (2023-12-06 12:22:48) > On Wed, Dec 6, 2023 at 11:32 AM Anton Khirnov <anton@khirnov.net> wrote: > > > Change the main loop and every component (demuxers, decoders, filters, > > encoders, muxers) to use the previously added transcode scheduler. Every > > instance of every such component was already running in a separate > > thread, but now they can actually run in parallel. > > > > Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by > > JEEB to be more correct and deterministic. > > > > Un-reviewable, please split it. Here https://git.khirnov.net/libav.git/log/?h=ffmpeg_threading_split
diff --git a/Changelog b/Changelog index f00bc27ca4..67ef92eb02 100644 --- a/Changelog +++ b/Changelog @@ -7,6 +7,8 @@ version <next>: - EVC encoding using external library libxeve - QOA decoder and demuxer - aap filter +- demuxing, decoding, filtering, encoding, and muxing in the + ffmpeg CLI now all run in parallel version 6.1: - libaribcaption decoder diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index b8a97258a0..30b594fd97 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps { static BenchmarkTimeStamps get_benchmark_time_stamps(void); static int64_t getmaxrss(void); -unsigned nb_output_dumped = 0; +atomic_uint nb_output_dumped = 0; static BenchmarkTimeStamps current_time; AVIOContext *progress_avio = NULL; @@ -138,30 +138,6 @@ static struct termios oldtty; static int restore_tty; #endif -/* sub2video hack: - Convert subtitles to video with alpha to insert them in filter graphs. - This is a temporary solution until libavfilter gets real subtitles support. - */ - -static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb) -{ - /* When a frame is read from a file, examine all sub2video streams in - the same file and send the sub2video frame again. Otherwise, decoded - video frames could be accumulating in the filter graph while a filter - (possibly overlay) is desperately waiting for a subtitle frame. */ - for (int i = 0; i < infile->nb_streams; i++) { - InputStream *ist = infile->streams[i]; - - if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) - continue; - - for (int j = 0; j < ist->nb_filters; j++) - ifilter_sub2video_heartbeat(ist->filters[j], pts, tb); - } -} - -/* end of sub2video hack */ - static void term_exit_sigsafe(void) { #if HAVE_TERMIOS_H @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...) } } -void close_output_stream(OutputStream *ost) -{ - OutputFile *of = output_files[ost->file_index]; - ost->finished |= ENCODER_FINISHED; - - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); -} - -static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time) +static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts) { AVBPrint buf, buf_script; int64_t total_size = of_filesize(output_files[0]); int vid; double bitrate; double speed; - int64_t pts = AV_NOPTS_VALUE; static int64_t last_time = -1; static int first_report = 1; uint64_t nb_frames_dup = 0, nb_frames_drop = 0; @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti last_time = cur_time; } if (((cur_time - last_time) < stats_period && !first_report) || - (first_report && nb_output_dumped < nb_output_files)) + (first_report && atomic_load(&nb_output_dumped) < nb_output_files)) return; last_time = cur_time; } @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC); av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC); for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1; + const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1; if (vid && ost->type == AVMEDIA_TYPE_VIDEO) { av_bprintf(&buf, "q=%2.1f ", q); @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti if (is_last_report) av_bprintf(&buf, "L"); - nb_frames_dup = ost->filter->nb_frames_dup; - nb_frames_drop = ost->filter->nb_frames_drop; + nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup); + nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop); vid = 1; } - /* compute min output value */ - if (ost->last_mux_dts != AV_NOPTS_VALUE) { - if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts) - pts = ost->last_mux_dts; - if (copy_ts) { - if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) - copy_ts_first_pts = pts; - if (copy_ts_first_pts != AV_NOPTS_VALUE) - pts -= copy_ts_first_pts; - } - } + } + + if (copy_ts) { + if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1) + copy_ts_first_pts = pts; + if (copy_ts_first_pts != AV_NOPTS_VALUE) + pts -= copy_ts_first_pts; } us = FFABS64U(pts) % AV_TIME_BASE; @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy) return 0; } -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt) -{ - OutputFile *of = output_files[ost->file_index]; - int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base, - AV_TIME_BASE_Q); - - if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY)) - // we are only interested in heartbeats on streams configured, and - // only on random access points. - return 0; - - for (int i = 0; i < of->nb_streams; i++) { - OutputStream *iter_ost = of->streams[i]; - InputStream *ist = iter_ost->ist; - int ret = AVERROR_BUG; - - if (iter_ost == ost || !ist || !ist->decoding_needed || - ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE) - // We wish to skip the stream that causes the heartbeat, - // output streams without an input stream, streams not decoded - // (as fix_sub_duration is only done for decoded subtitles) as - // well as non-subtitle streams. - continue; - - if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0) - return ret; - } - - return 0; -} - -/* pkt = NULL means EOF (needed to flush decoder buffers) */ -static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof) -{ - InputFile *f = input_files[ist->file_index]; - int64_t dts_est = AV_NOPTS_VALUE; - int ret = 0; - int eof_reached = 0; - - if (ist->decoding_needed) { - ret = dec_packet(ist, pkt, no_eof); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } - if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) - eof_reached = 1; - - if (pkt && pkt->opaque_ref) { - DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data; - dts_est = pd->dts_est; - } - - if (f->recording_time != INT64_MAX) { - int64_t start_time = 0; - if (copy_ts) { - start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; - start_time += start_at_zero ? 0 : f->start_time_effective; - } - if (dts_est >= f->recording_time + start_time) - pkt = NULL; - } - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (ost->enc || (!pkt && no_eof)) - continue; - - ret = of_streamcopy(ost, pkt, dts_est); - if (ret < 0) - return ret; - } - - return !eof_reached; -} - static void print_stream_maps(void) { av_log(NULL, AV_LOG_INFO, "Stream mapping:\n"); @@ -934,43 +821,6 @@ static void print_stream_maps(void) } } -/** - * Select the output stream to process. - * - * @retval 0 an output stream was selected - * @retval AVERROR(EAGAIN) need to wait until more input is available - * @retval AVERROR_EOF no more streams need output - */ -static int choose_output(OutputStream **post) -{ - int64_t opts_min = INT64_MAX; - OutputStream *ost_min = NULL; - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - int64_t opts; - - if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) { - opts = ost->filter->last_pts; - } else { - opts = ost->last_mux_dts == AV_NOPTS_VALUE ? - INT64_MIN : ost->last_mux_dts; - } - - if (!ost->initialized && !ost->finished) { - ost_min = ost; - break; - } - if (!ost->finished && opts < opts_min) { - opts_min = opts; - ost_min = ost; - } - } - if (!ost_min) - return AVERROR_EOF; - *post = ost_min; - return ost_min->unavailable ? AVERROR(EAGAIN) : 0; -} - static void set_tty_echo(int on) { #if HAVE_TERMIOS_H @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t cur_time) return 0; } -static void reset_eagain(void) -{ - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) - ost->unavailable = 0; -} - -static void decode_flush(InputFile *ifile) -{ - for (int i = 0; i < ifile->nb_streams; i++) { - InputStream *ist = ifile->streams[i]; - - if (ist->discard || !ist->decoding_needed) - continue; - - dec_packet(ist, NULL, 1); - } -} - -/* - * Return - * - 0 -- one packet was read and processed - * - AVERROR(EAGAIN) -- no packets were available for selected file, - * this function should be called again - * - AVERROR_EOF -- this function should not be called again - */ -static int process_input(int file_index, AVPacket *pkt) -{ - InputFile *ifile = input_files[file_index]; - InputStream *ist; - int ret, i; - - ret = ifile_get_packet(ifile, pkt); - - if (ret == 1) { - /* the input file is looped: flush the decoders */ - decode_flush(ifile); - return AVERROR(EAGAIN); - } - if (ret < 0) { - if (ret != AVERROR_EOF) { - av_log(ifile, AV_LOG_ERROR, - "Error retrieving a packet from demuxer: %s\n", av_err2str(ret)); - if (exit_on_error) - return ret; - } - - for (i = 0; i < ifile->nb_streams; i++) { - ist = ifile->streams[i]; - if (!ist->discard) { - ret = process_input_packet(ist, NULL, 0); - if (ret>0) - return 0; - else if (ret < 0) - return ret; - } - - /* mark all outputs that don't go through lavfi as finished */ - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - OutputFile *of = output_files[ost->file_index]; - - ret = of_output_packet(of, ost, NULL); - if (ret < 0) - return ret; - } - } - - ifile->eof_reached = 1; - return AVERROR(EAGAIN); - } - - reset_eagain(); - - ist = ifile->streams[pkt->stream_index]; - - sub2video_heartbeat(ifile, pkt->pts, pkt->time_base); - - ret = process_input_packet(ist, pkt, 0); - - av_packet_unref(pkt); - - return ret < 0 ? ret : 0; -} - -/** - * Run a single step of transcoding. - * - * @return 0 for success, <0 for error - */ -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) -{ - InputStream *ist = NULL; - int ret; - - if (ost->filter) { - if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0) - return ret; - if (!ist) - return 0; - } else { - ist = ost->ist; - av_assert0(ist); - } - - ret = process_input(ist->file_index, demux_pkt); - if (ret == AVERROR(EAGAIN)) { - return 0; - } - - if (ret < 0) - return ret == AVERROR_EOF ? 0 : ret; - - // process_input() above might have caused output to become available - // in multiple filtergraphs, so we process all of them - for (int i = 0; i < nb_filtergraphs; i++) { - ret = reap_filters(filtergraphs[i], 0); - if (ret < 0) - return ret; - } - - return 0; -} - /* * The following code is the main loop of the file converter */ -static int transcode(Scheduler *sch, int *err_rate_exceeded) +static int transcode(Scheduler *sch) { int ret = 0, i; - InputStream *ist; - int64_t timer_start; - AVPacket *demux_pkt = NULL; + int64_t timer_start, transcode_ts = 0; print_stream_maps(); - *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); - demux_pkt = av_packet_alloc(); - if (!demux_pkt) { - ret = AVERROR(ENOMEM); - goto fail; - } + ret = sch_start(sch); + if (ret < 0) + return ret; if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) timer_start = av_gettime_relative(); - while (!received_sigterm) { - OutputStream *ost; + while (!sch_wait(sch, stats_period, &transcode_ts)) { int64_t cur_time= av_gettime_relative(); /* if 'q' pressed, exits */ @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) if (check_keyboard_interaction(cur_time) < 0) break; - ret = choose_output(&ost); - if (ret == AVERROR(EAGAIN)) { - reset_eagain(); - av_usleep(10000); - ret = 0; - continue; - } else if (ret < 0) { - av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n"); - ret = 0; - break; - } - - ret = transcode_step(ost, demux_pkt); - if (ret < 0 && ret != AVERROR_EOF) { - av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); - break; - } - /* dump report by using the output first video and audio streams */ - print_report(0, timer_start, cur_time); + print_report(0, timer_start, cur_time, transcode_ts); } - /* at the end of stream, we must flush the decoder buffers */ - for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) { - float err_rate; - - if (!input_files[ist->file_index]->eof_reached) { - int err = process_input_packet(ist, NULL, 0); - ret = err_merge(ret, err); - } - - err_rate = (ist->frames_decoded || ist->decode_errors) ? - ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f; - if (err_rate > max_error_rate) { - av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n", - err_rate, max_error_rate); - *err_rate_exceeded = 1; - } else if (err_rate) - av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate); - } - ret = err_merge(ret, enc_flush()); - - term_exit(); + ret = sch_stop(sch); /* write the trailer if needed */ for (i = 0; i < nb_output_files; i++) { @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded) ret = err_merge(ret, err); } - /* dump report by using the first video and audio streams */ - print_report(1, timer_start, av_gettime_relative()); + term_exit(); -fail: - av_packet_free(&demux_pkt); + /* dump report by using the first video and audio streams */ + print_report(1, timer_start, av_gettime_relative(), transcode_ts); return ret; } @@ -1308,7 +990,7 @@ int main(int argc, char **argv) { Scheduler *sch = NULL; - int ret, err_rate_exceeded; + int ret; BenchmarkTimeStamps ti; init_dynload(); @@ -1350,7 +1032,7 @@ int main(int argc, char **argv) } current_time = ti = get_benchmark_time_stamps(); - ret = transcode(sch, &err_rate_exceeded); + ret = transcode(sch); if (ret >= 0 && do_benchmark) { int64_t utime, stime, rtime; current_time = get_benchmark_time_stamps(); @@ -1362,8 +1044,8 @@ int main(int argc, char **argv) utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0); } - ret = received_nb_signals ? 255 : - err_rate_exceeded ? 69 : ret; + ret = received_nb_signals ? 255 : + (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret; finish: if (ret == AVERROR_EXIT) diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index a89038b765..ba82b7490d 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -61,6 +61,8 @@ #define FFMPEG_OPT_TOP 1 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1 +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D') + enum VideoSyncMethod { VSYNC_AUTO = -1, VSYNC_PASSTHROUGH, @@ -82,13 +84,16 @@ enum HWAccelID { }; enum FrameOpaque { - FRAME_OPAQUE_REAP_FILTERS = 1, - FRAME_OPAQUE_CHOOSE_INPUT, - FRAME_OPAQUE_SUB_HEARTBEAT, + FRAME_OPAQUE_SUB_HEARTBEAT = 1, FRAME_OPAQUE_EOF, FRAME_OPAQUE_SEND_COMMAND, }; +enum PacketOpaque { + PKT_OPAQUE_SUB_HEARTBEAT = 1, + PKT_OPAQUE_FIX_SUB_DURATION, +}; + typedef struct HWDevice { const char *name; enum AVHWDeviceType type; @@ -309,11 +314,8 @@ typedef struct OutputFilter { enum AVMediaType type; - /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */ - int64_t last_pts; - - uint64_t nb_frames_dup; - uint64_t nb_frames_drop; + atomic_uint_least64_t nb_frames_dup; + atomic_uint_least64_t nb_frames_drop; } OutputFilter; typedef struct FilterGraph { @@ -426,11 +428,6 @@ typedef struct InputFile { float readrate; int accurate_seek; - - /* when looping the input file, this queue is used by decoders to report - * the last frame timestamp back to the demuxer thread */ - AVThreadMessageQueue *audio_ts_queue; - int audio_ts_queue_size; } InputFile; enum forced_keyframes_const { @@ -532,8 +529,6 @@ typedef struct OutputStream { InputStream *ist; AVStream *st; /* stream in the output file */ - /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */ - int64_t last_mux_dts; AVRational enc_timebase; @@ -578,13 +573,6 @@ typedef struct OutputStream { AVDictionary *sws_dict; AVDictionary *swr_opts; char *apad; - OSTFinished finished; /* no more packets should be written for this stream */ - int unavailable; /* true if the steram is unavailable (possibly temporarily) */ - - // init_output_stream() has been called for this stream - // The encoder and the bitstream filters have been initialized and the stream - // parameters are set in the AVStream. - int initialized; const char *attachment_filename; @@ -598,9 +586,8 @@ typedef struct OutputStream { uint64_t samples_encoded; /* packet quality factor */ - int quality; + atomic_int quality; - int sq_idx_encode; int sq_idx_mux; EncStats enc_stats_pre; @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs; extern int nb_filtergraphs; extern char *vstats_filename; -extern char *sdp_filename; extern float dts_delta_threshold; extern float dts_error_threshold; @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb; extern const OptionDef options[]; extern HWDevice *filter_hw_device; -extern unsigned nb_output_dumped; +extern atomic_uint nb_output_dumped; extern int ignore_unknown_streams; extern int copy_unknown_streams; @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame); const FrameData *frame_data_c(AVFrame *frame); -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference); -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb); - /** * Set up fallback filtering parameters from a decoder context. They will only * be used if no frames are ever sent on this input, otherwise the actual @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); -/** - * Perform a step of transcoding for the specified filter graph. - * - * @param[in] graph filter graph to consider - * @param[out] best_ist input stream where a frame would allow to continue - * @return 0 for success, <0 for error - */ -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist); - void fg_send_command(FilterGraph *fg, double time, const char *target, const char *command, const char *arg, int all_filters); -/** - * Get and encode new output from specified filtergraph, without causing - * activity. - * - * @return 0 for success, <0 for severe errors - */ -int reap_filters(FilterGraph *fg, int flush); - int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); -/** - * Submit a packet for decoding - * - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and - * mark all downstreams as finished. - * - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush - * decoders and await further input. - */ -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); - int enc_alloc(Encoder **penc, const AVCodec *codec, Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); -int enc_open(OutputStream *ost, const AVFrame *frame); -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub); -int enc_frame(OutputStream *ost, AVFrame *frame); -int enc_flush(void); +int enc_open(void *opaque, const AVFrame *frame); /* * Initialize muxing state for the given stream, should be called @@ -840,30 +791,11 @@ void of_free(OutputFile **pof); void of_enc_stats_close(void); -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt); - -/** - * @param dts predicted packet dts in AV_TIME_BASE_Q - */ -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); - int64_t of_filesize(OutputFile *of); int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); -/** - * Get next input packet from the demuxer. - * - * @param pkt the packet is written here when this function returns 0 - * @return - * - 0 when a packet has been read successfully - * - 1 when stream end was reached, but the stream is looped; - * caller should flush decoders and read from this demuxer again - * - a negative error code on failure - */ -int ifile_get_packet(InputFile *f, AVPacket *pkt); - int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev); * pass NULL to start iteration */ OutputStream *ost_iter(OutputStream *prev); -void close_output_stream(OutputStream *ost); -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt); -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts); void update_benchmark(const char *fmt, ...); #define SPECIFIER_OPT_FMT_str "%s" diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index 90ea0d6d93..5dde82a276 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -54,24 +54,6 @@ struct Decoder { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending coded packets from the main thread to - * the decoder thread. - * - * An empty packet is sent to flush the decoder without terminating - * decoding. - */ - ThreadQueue *queue_in; - /** - * Queue for sending decoded frames from the decoder thread - * to the main thread. - * - * An empty frame is sent to signal that a single packet has been fully - * processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -80,24 +62,6 @@ typedef struct DecThreadContext { AVPacket *pkt; } DecThreadContext; -static int dec_thread_stop(Decoder *d) -{ - void *ret; - - if (!d->queue_in) - return 0; - - tq_send_finish(d->queue_in, 0); - tq_receive_finish(d->queue_out, 0); - - pthread_join(d->thread, &ret); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - - return (intptr_t)ret; -} - void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec) if (!dec) return; - dec_thread_stop(dec); - av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -148,25 +110,6 @@ fail: return AVERROR(ENOMEM); } -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) -{ - int i, ret = 0; - - for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, - i < ist->nb_filters - 1 || - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; - } - } - return ret; -} - static AVRational audio_samplerate_update(void *logctx, Decoder *d, const AVFrame *frame) { @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - ret = send_frame_to_filters(ist, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) - return ret; + av_frame_unref(frame); - subtitle = (AVSubtitle*)frame->buf[0]->data; - if (!subtitle->num_rects) - return 0; - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE) - continue; - - ret = enc_subtitle(output_files[ost->file_index], ost, subtitle); - if (ret < 0) - return ret; - } - - return 0; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) { Decoder *d = ist->decoder; int ret = AVERROR_BUG; @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts) static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, AVFrame *frame) { - Decoder *d = ist->decoder; + Decoder *d = ist->decoder; AVPacket *flush_pkt = NULL; AVSubtitle subtitle; int got_output; int ret; + if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) { + frame->pts = pkt->pts; + frame->time_base = pkt->time_base; + frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT; + + ret = sch_dec_send(d->sch, d->sch_idx, frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) { + return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base, + AV_TIME_BASE_Q)); + } + if (!pkt) { flush_pkt = av_packet_alloc(); if (!flush_pkt) @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, ist->frames_decoded++; - // XXX the queue for transferring data back to the main thread runs + // XXX the queue for transferring data to consumers runs // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that // inside the frame // eventually, subtitles should be switched to use AVFrames natively @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, frame->width = ist->dec_ctx->width; frame->height = ist->dec_ctx->height; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - av_frame_unref(frame); - - return ret; -} - -static int send_filter_eof(InputStream *ist) -{ - Decoder *d = ist->decoder; - int i, ret; - - for (i = 0; i < ist->nb_filters; i++) { - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : - d->last_frame_pts + d->last_frame_duration_est; - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb); - if (ret < 0) - return ret; - } - return 0; + return process_subtitle(ist, frame); } static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame) ist->frames_decoded++; - ret = tq_send(d->queue_out, 0, frame); - if (ret < 0) - return ret; + ret = sch_dec_send(d->sch, d->sch_idx, frame); + if (ret < 0) { + av_frame_unref(frame); + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; + } } } @@ -679,7 +603,6 @@ fail: void *decoder_thread(void *arg) { InputStream *ist = arg; - InputFile *ifile = input_files[ist->file_index]; Decoder *d = ist->decoder; DecThreadContext dt; int ret = 0, input_status = 0; @@ -691,19 +614,31 @@ void *decoder_thread(void *arg) dec_thread_set_name(ist); while (!input_status) { - int dummy, flush_buffers; + int flush_buffers, have_data; - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); - flush_buffers = input_status >= 0 && !dt.pkt->buf; - if (!dt.pkt->buf) + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); + have_data = input_status >= 0 && + (dt.pkt->buf || dt.pkt->side_data_elems || + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT || + (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION); + flush_buffers = input_status >= 0 && !have_data; + if (!have_data) av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", flush_buffers ? "flush" : "EOF"); - ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame); + ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame); av_packet_unref(dt.pkt); av_frame_unref(dt.frame); + // AVERROR_EOF - EOF from the decoder + // AVERROR_EXIT - EOF from the scheduler + // we treat them differently when flushing + if (ret == AVERROR_EXIT) { + ret = AVERROR_EOF; + flush_buffers = 0; + } + if (ret == AVERROR_EOF) { av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", flush_buffers ? "resetting" : "finishing"); @@ -711,11 +646,10 @@ void *decoder_thread(void *arg) if (!flush_buffers) break; - /* report last frame duration to the demuxer thread */ + /* report last frame duration to the scheduler */ if (ist->dec->type == AVMEDIA_TYPE_AUDIO) { - Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est, - .tb = d->last_frame_tb }; - av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0); + dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est; + dt.pkt->time_base = d->last_frame_tb; } avcodec_flush_buffers(ist->dec_ctx); @@ -724,149 +658,47 @@ void *decoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the entire packet was processed - ret = tq_send(d->queue_out, 0, dt.frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination if (ret == AVERROR_EOF) ret = 0; + // on success send EOF timestamp to our downstreams + if (ret >= 0) { + float err_rate; + + av_frame_unref(dt.frame); + + dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF; + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : + d->last_frame_pts + d->last_frame_duration_est; + dt.frame->time_base = d->last_frame_tb; + + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_FATAL, + "Error signalling EOF timestamp: %s\n", av_err2str(ret)); + goto finish; + } + ret = 0; + + err_rate = (ist->frames_decoded || ist->decode_errors) ? + ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f; + if (err_rate > max_error_rate) { + av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n", + err_rate, max_error_rate); + ret = FFMPEG_ERROR_RATE_EXCEEDED; + } else if (err_rate) + av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate); + } + finish: - tq_receive_finish(d->queue_in, 0); - tq_send_finish (d->queue_out, 0); - - // make sure the demuxer does not get stuck waiting for audio durations - // that will never arrive - if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO) - av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF); - dec_thread_uninit(&dt); - av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n"); - return (void*)(intptr_t)ret; } -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) -{ - Decoder *d = ist->decoder; - int ret = 0, thread_ret; - - // thread already joined - if (!d->queue_in) - return AVERROR_EOF; - - // send the packet/flush request/EOF to the decoder thread - if (pkt || no_eof) { - av_packet_unref(d->pkt); - - if (pkt) { - ret = av_packet_ref(d->pkt, pkt); - if (ret < 0) - goto finish; - } - - ret = tq_send(d->queue_in, 0, d->pkt); - if (ret < 0) - goto finish; - } else - tq_send_finish(d->queue_in, 0); - - // retrieve all decoded data for the packet - while (1) { - int dummy; - - ret = tq_receive(d->queue_out, &dummy, d->frame); - if (ret < 0) - goto finish; - - // packet fully processed - if (!d->frame->buf[0]) - return 0; - - // process the decoded frame - if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) { - ret = process_subtitle(ist, d->frame); - } else { - ret = send_frame_to_filters(ist, d->frame); - } - av_frame_unref(d->frame); - if (ret < 0) - goto finish; - } - -finish: - thread_ret = dec_thread_stop(d); - if (thread_ret < 0) { - av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n", - av_err2str(thread_ret)); - ret = err_merge(ret, thread_ret); - } - // non-EOF errors here are all fatal - if (ret < 0 && ret != AVERROR_EOF) - return ret; - - // signal EOF to our downstreams - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } - - return AVERROR_EOF; -} - -static int dec_thread_start(InputStream *ist) -{ - Decoder *d = ist->decoder; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->queue_in = tq_alloc(1, 1, op, pkt_move); - if (!d->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_frames(); - if (!op) - goto fail; - - d->queue_out = tq_alloc(1, 4, op, frame_move); - if (!d->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); - if (ret) { - ret = AVERROR(ret); - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - return ret; -} - static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) { InputStream *ist = s->opaque; @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) if (ret < 0) return ret; - ret = dec_thread_start(ist); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", - av_err2str(ret)); - return ret; - } - return 0; } diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 2234dbe076..91cd7a1125 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -22,8 +22,6 @@ #include "ffmpeg.h" #include "ffmpeg_sched.h" #include "ffmpeg_utils.h" -#include "objpool.h" -#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -35,7 +33,6 @@ #include "libavutil/pixdesc.h" #include "libavutil/time.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -66,7 +63,11 @@ typedef struct DemuxStream { double ts_scale; + // scheduler returned EOF for this stream + int finished; + int streamcopy_needed; + int have_sub2video; int wrap_correction_done; int saw_first_ts; @@ -101,6 +102,7 @@ typedef struct Demuxer { /* number of times input stream should be looped */ int loop; + int have_audio_dec; /* duration of the looped segment of the input file */ Timestamp duration; /* pts with the smallest/largest values ever seen */ @@ -113,11 +115,12 @@ typedef struct Demuxer { double readrate_initial_burst; Scheduler *sch; - ThreadQueue *thread_queue; - int thread_queue_size; - pthread_t thread; + + AVPacket *pkt_heartbeat; int read_started; + int nb_streams_used; + int nb_streams_finished; } Demuxer; static DemuxStream *ds_from_ist(InputStream *ist) @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const AVPacket *pkt) d->nb_streams_warn = pkt->stream_index + 1; } -static int seek_to_start(Demuxer *d) +static int seek_to_start(Demuxer *d, Timestamp end_pts) { InputFile *ifile = &d->f; AVFormatContext *is = ifile->ctx; @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d) if (ret < 0) return ret; - if (ifile->audio_ts_queue_size) { - int got_ts = 0; - - while (got_ts < ifile->audio_ts_queue_size) { - Timestamp ts; - ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0); - if (ret < 0) - return ret; - got_ts++; - - if (d->max_pts.ts == AV_NOPTS_VALUE || - av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0) - d->max_pts = ts; - } - } + if (end_pts.ts != AV_NOPTS_VALUE && + (d->max_pts.ts == AV_NOPTS_VALUE || + av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0)) + d->max_pts = end_pts; if (d->max_pts.ts != AV_NOPTS_VALUE) { int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : d->min_pts.ts; @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base); if (pkt->pts != AV_NOPTS_VALUE) { // audio decoders take precedence for estimating total file duration - int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration; + int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration; pkt->pts += duration; @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -static int input_packet_process(Demuxer *d, AVPacket *pkt) +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags) { InputFile *f = &d->f; InputStream *ist = f->streams[pkt->stream_index]; @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) if (ret < 0) return ret; + if (f->recording_time != INT64_MAX) { + int64_t start_time = 0; + if (copy_ts) { + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; + start_time += start_at_zero ? 0 : f->start_time_effective; + } + if (ds->dts >= f->recording_time + start_time) + *send_flags |= DEMUX_SEND_STREAMCOPY_EOF; + } + ds->data_size += pkt->size; ds->nb_packets++; @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } + pkt->stream_index = ds->sch_idx_stream; + return 0; } @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d) } } +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags, + const char *pkt_desc) +{ + int ret; + + ret = sch_demux_send(d->sch, d->f.index, pkt, flags); + if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + + av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n"); + ds->finished = 1; + + if (++d->nb_streams_finished == d->nb_streams_used) { + av_log(d, AV_LOG_VERBOSE, "All consumers are done\n"); + return AVERROR_EOF; + } + } else if (ret < 0) { + if (ret != AVERROR_EXIT) + av_log(d, AV_LOG_ERROR, + "Unable to send %s packet to consumers: %s\n", + pkt_desc, av_err2str(ret)); + return ret; + } + + return 0; +} + +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags) +{ + InputFile *f = &d->f; + int ret; + + // send heartbeat for sub2video streams + if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) { + for (int i = 0; i < f->nb_streams; i++) { + DemuxStream *ds1 = ds_from_ist(f->streams[i]); + + if (ds1->finished || !ds1->have_sub2video) + continue; + + d->pkt_heartbeat->pts = pkt->pts; + d->pkt_heartbeat->time_base = pkt->time_base; + d->pkt_heartbeat->stream_index = ds1->sch_idx_stream; + d->pkt_heartbeat->opaque = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT; + + ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat"); + if (ret < 0) + return ret; + } + } + + ret = do_send(d, ds, pkt, flags, "demuxed"); + if (ret < 0) + return ret; + + + return 0; +} + static void discard_unused_programs(InputFile *ifile) { for (int j = 0; j < ifile->ctx->nb_programs; j++) { @@ -527,9 +590,13 @@ static void *input_thread(void *arg) discard_unused_programs(f); + d->read_started = 1; d->wallclock_start = av_gettime_relative(); while (1) { + DemuxStream *ds; + unsigned send_flags = 0; + ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -538,11 +605,13 @@ static void *input_thread(void *arg) } if (ret < 0) { if (d->loop) { - /* signal looping to the consumer thread */ + /* signal looping to our consumers */ pkt->stream_index = -1; - ret = tq_send(d->thread_queue, 0, pkt); + + ret = sch_demux_send(d->sch, f->index, pkt, 0); if (ret >= 0) - ret = seek_to_start(d); + ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts, + .tb = pkt->time_base }); if (ret >= 0) continue; @@ -551,9 +620,11 @@ static void *input_thread(void *arg) if (ret == AVERROR_EOF) av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); - else + else { av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", av_err2str(ret)); + ret = exit_on_error ? ret : 0; + } break; } @@ -565,8 +636,9 @@ static void *input_thread(void *arg) /* the following test is needed in case new streams appear dynamically in stream : we ignore them */ - if (pkt->stream_index >= f->nb_streams || - f->streams[pkt->stream_index]->discard) { + ds = pkt->stream_index < f->nb_streams ? + ds_from_ist(f->streams[pkt->stream_index]) : NULL; + if (!ds || ds->ist.discard || ds->finished) { report_new_stream(d, pkt); av_packet_unref(pkt); continue; @@ -583,122 +655,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, pkt); + ret = input_packet_process(d, pkt, &send_flags); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = tq_send(d->thread_queue, 0, pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(f, AV_LOG_ERROR, - "Unable to send packet to main thread: %s\n", - av_err2str(ret)); + ret = demux_send(d, ds, pkt, send_flags); + if (ret < 0) break; - } } + // EOF/EXIT is normal termination + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) + ret = 0; + finish: - av_assert0(ret < 0); - tq_send_finish(d->thread_queue, 0); - av_packet_free(&pkt); - av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); - - return NULL; -} - -static void thread_stop(Demuxer *d) -{ - InputFile *f = &d->f; - - if (!d->thread_queue) - return; - - tq_receive_finish(d->thread_queue, 0); - - pthread_join(d->thread, NULL); - - tq_free(&d->thread_queue); - - av_thread_message_queue_free(&f->audio_ts_queue); -} - -static int thread_start(Demuxer *d) -{ - int ret; - InputFile *f = &d->f; - ObjPool *op; - - if (d->thread_queue_size <= 0) - d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); - if (!d->thread_queue) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - if (d->loop) { - int nb_audio_dec = 0; - - for (int i = 0; i < f->nb_streams; i++) { - InputStream *ist = f->streams[i]; - nb_audio_dec += !!(ist->decoding_needed && - ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO); - } - - if (nb_audio_dec) { - ret = av_thread_message_queue_alloc(&f->audio_ts_queue, - nb_audio_dec, sizeof(Timestamp)); - if (ret < 0) - goto fail; - f->audio_ts_queue_size = nb_audio_dec; - } - } - - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); - ret = AVERROR(ret); - goto fail; - } - - d->read_started = 1; - - return 0; -fail: - tq_free(&d->thread_queue); - return ret; -} - -int ifile_get_packet(InputFile *f, AVPacket *pkt) -{ - Demuxer *d = demuxer_from_ifile(f); - int ret, dummy; - - if (!d->thread_queue) { - ret = thread_start(d); - if (ret < 0) - return ret; - } - - ret = tq_receive(d->thread_queue, &dummy, pkt); - if (ret < 0) - return ret; - - if (pkt->stream_index == -1) { - av_assert0(!pkt->data && !pkt->side_data_elems); - return 1; - } - - return 0; + return (void*)(intptr_t)ret; } static void demux_final_stats(Demuxer *d) @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf) if (!f) return; - thread_stop(d); - if (d->read_started) demux_final_stats(d); @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf) avformat_close_input(&f->ctx); + av_packet_free(&d->pkt_heartbeat); + av_freep(pf); } @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int decoding_needed) ds->sch_idx_stream = ret; } - ist->discard = 0; + if (ist->discard) { + ist->discard = 0; + d->nb_streams_used++; + } + ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int decoding_needed) ret = dec_open(ist, d->sch, ds->sch_idx_dec); if (ret < 0) return ret; + + d->have_audio_dec |= is_audio; } return 0; @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost) int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) { + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); DemuxStream *ds = ds_from_ist(ist); int ret; @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) if (ret < 0) return ret; + if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) { + if (!d->pkt_heartbeat) { + d->pkt_heartbeat = av_packet_alloc(); + if (!d->pkt_heartbeat) + return AVERROR(ENOMEM); + } + ds->have_sub2video = 1; + } + return ds->sch_idx_dec; } @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) "since neither -readrate nor -re were given\n"); } - d->thread_queue_size = o->thread_queue_size; - /* Add all the streams from the given input file to the demuxer */ for (int i = 0; i < ic->nb_streams; i++) { ret = ist_add(o, d, ic->streams[i]); diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index 9871381c0e..9383b167f7 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -41,12 +41,6 @@ #include "libavformat/avformat.h" struct Encoder { - AVFrame *sq_frame; - - // packet for receiving encoded output - AVPacket *pkt; - AVFrame *sub_frame; - // combined size of all the packets received from the encoder uint64_t data_size; @@ -54,25 +48,9 @@ struct Encoder { uint64_t packets_encoded; int opened; - int finished; Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending frames from the main thread to - * the encoder thread. - */ - ThreadQueue *queue_in; - /** - * Queue for sending encoded packets from the encoder thread - * to the main thread. - * - * An empty packet is sent to signal that a previously sent - * frame has been fully processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -81,24 +59,6 @@ typedef struct EncoderThread { AVPacket *pkt; } EncoderThread; -static int enc_thread_stop(Encoder *e) -{ - void *ret; - - if (!e->queue_in) - return 0; - - tq_send_finish(e->queue_in, 0); - tq_receive_finish(e->queue_out, 0); - - pthread_join(e->thread, &ret); - - tq_free(&e->queue_in); - tq_free(&e->queue_out); - - return (int)(intptr_t)ret; -} - void enc_free(Encoder **penc) { Encoder *enc = *penc; @@ -106,13 +66,6 @@ void enc_free(Encoder **penc) if (!enc) return; - enc_thread_stop(enc); - - av_frame_free(&enc->sq_frame); - av_frame_free(&enc->sub_frame); - - av_packet_free(&enc->pkt); - av_freep(penc); } @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec, if (!enc) return AVERROR(ENOMEM); - if (codec->type == AVMEDIA_TYPE_SUBTITLE) { - enc->sub_frame = av_frame_alloc(); - if (!enc->sub_frame) - goto fail; - } - - enc->pkt = av_packet_alloc(); - if (!enc->pkt) - goto fail; - enc->sch = sch; enc->sch_idx = sch_idx; *penc = enc; return 0; -fail: - enc_free(&enc); - return AVERROR(ENOMEM); } static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref) @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } -static int enc_thread_start(OutputStream *ost) -{ - Encoder *e = ost->enc; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_frames(); - if (!op) - return AVERROR(ENOMEM); - - e->queue_in = tq_alloc(1, 1, op, frame_move); - if (!e->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_packets(); - if (!op) - goto fail; - - e->queue_out = tq_alloc(1, 4, op, pkt_move); - if (!e->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&e->thread, NULL, encoder_thread, ost); - if (ret) { - ret = AVERROR(ret); - av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&e->queue_in); - tq_free(&e->queue_out); - return ret; -} - -int enc_open(OutputStream *ost, const AVFrame *frame) +int enc_open(void *opaque, const AVFrame *frame) { + OutputStream *ost = opaque; InputStream *ist = ost->ist; Encoder *e = ost->enc; AVCodecContext *enc_ctx = ost->enc_ctx; @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame) const AVCodec *enc = enc_ctx->codec; OutputFile *of = output_files[ost->file_index]; FrameData *fd; + int frame_samples = 0; int ret; if (e->opened) @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame) e->opened = 1; - if (ost->sq_idx_encode >= 0) { - e->sq_frame = av_frame_alloc(); - if (!e->sq_frame) - return AVERROR(ENOMEM); - } - - if (ost->enc_ctx->frame_size) { - av_assert0(ost->sq_idx_encode >= 0); - sq_frame_samples(output_files[ost->file_index]->sq_encode, - ost->sq_idx_encode, ost->enc_ctx->frame_size); - } + if (ost->enc_ctx->frame_size) + frame_samples = ost->enc_ctx->frame_size; ret = check_avoptions(ost->encoder_opts); if (ret < 0) @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame) if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0) ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1}); - ret = enc_thread_start(ost); - if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", - av_err2str(ret)); - return ret; - } - ret = of_stream_init(of, ost); if (ret < 0) return ret; - return 0; + return frame_samples; } static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb) @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle * av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n"); return exit_on_error ? AVERROR(EINVAL) : 0; } - if (ost->finished || - (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) + if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) return 0; enc = ost->enc_ctx; @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle * } pkt->dts = pkt->pts; - ret = tq_send(e->queue_out, 0, pkt); + ret = sch_enc_send(e->sch, e->sch_idx, pkt); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ int64_t frame_number; double ti1, bitrate, avg_bitrate; double psnr_val = -1; + int quality; - ost->quality = sd ? AV_RL32(sd) : -1; + quality = sd ? AV_RL32(sd) : -1; pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE; + atomic_store(&ost->quality, quality); + if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) { // FIXME the scaling assumes 8bit double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0); @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ frame_number = e->packets_encoded; if (vstats_version <= 1) { fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number, - ost->quality / (float)FF_QP2LAMBDA); + quality / (float)FF_QP2LAMBDA); } else { fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number, - ost->quality / (float)FF_QP2LAMBDA); + quality / (float)FF_QP2LAMBDA); } if (psnr_val >= 0) @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base)); } - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! (%s)\n", - __func__, av_err2str(ret)); - return ret; - } - e->data_size += pkt->size; e->packets_encoded++; - ret = tq_send(e->queue_out, 0, pkt); + ret = sch_enc_send(e->sch, e->sch_idx, pkt); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, av_assert0(0); } -static int submit_encode_frame(OutputFile *of, OutputStream *ost, - AVFrame *frame, AVPacket *pkt) -{ - Encoder *e = ost->enc; - int ret; - - if (ost->sq_idx_encode < 0) - return encode_frame(of, ost, frame, pkt); - - if (frame) { - ret = av_frame_ref(e->sq_frame, frame); - if (ret < 0) - return ret; - frame = e->sq_frame; - } - - ret = sq_send(of->sq_encode, ost->sq_idx_encode, - SQFRAME(frame)); - if (ret < 0) { - if (frame) - av_frame_unref(frame); - if (ret != AVERROR_EOF) - return ret; - } - - while (1) { - AVFrame *enc_frame = e->sq_frame; - - ret = sq_receive(of->sq_encode, ost->sq_idx_encode, - SQFRAME(enc_frame)); - if (ret == AVERROR_EOF) { - enc_frame = NULL; - } else if (ret < 0) { - return (ret == AVERROR(EAGAIN)) ? 0 : ret; - } - - ret = encode_frame(of, ost, enc_frame, pkt); - if (enc_frame) - av_frame_unref(enc_frame); - if (ret < 0) - return ret; - } -} - static int do_audio_out(OutputFile *of, OutputStream *ost, AVFrame *frame, AVPacket *pkt) { @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream *ost, if (!check_recording_time(ost, frame->pts, frame->time_base)) return AVERROR_EOF; - return submit_encode_frame(of, ost, frame, pkt); + return encode_frame(of, ost, frame, pkt); } static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf, @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream *ost, } #endif - return submit_encode_frame(of, ost, in_picture, pkt); + return encode_frame(of, ost, in_picture, pkt); } static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) enum AVMediaType type = ost->type; if (type == AVMEDIA_TYPE_SUBTITLE) { + const AVSubtitle *subtitle = frame && frame->buf[0] ? + (AVSubtitle*)frame->buf[0]->data : NULL; + // no flushing for subtitles - return frame ? - do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0; + return subtitle && subtitle->num_rects ? + do_subtitle_out(of, ost, subtitle, pkt) : 0; } if (frame) { @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) do_audio_out(of, ost, frame, pkt); } - return submit_encode_frame(of, ost, NULL, pkt); + return encode_frame(of, ost, NULL, pkt); } static void enc_thread_set_name(const OutputStream *ost) @@ -1009,24 +845,50 @@ fail: void *encoder_thread(void *arg) { OutputStream *ost = arg; - OutputFile *of = output_files[ost->file_index]; Encoder *e = ost->enc; EncoderThread et; int ret = 0, input_status = 0; + int name_set = 0; ret = enc_thread_init(&et); if (ret < 0) goto finish; - enc_thread_set_name(ost); + /* Open the subtitle encoders immediately. AVFrame-based encoders + * are opened through a callback from the scheduler once they get + * their first frame + * + * N.B.: because the callback is called from a different thread, + * enc_ctx MUST NOT be accessed before sch_enc_receive() returns + * for the first time for audio/video. */ + if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) { + ret = enc_open(ost, NULL); + if (ret < 0) + goto finish; + } while (!input_status) { - int dummy; - - input_status = tq_receive(e->queue_in, &dummy, et.frame); - if (input_status < 0) + input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame); + if (input_status == AVERROR_EOF) { av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); + if (!e->opened) { + av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n"); + ret = AVERROR(EINVAL); + goto finish; + } + } else if (input_status < 0) { + ret = input_status; + av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n", + av_err2str(ret)); + goto finish; + } + + if (!name_set) { + enc_thread_set_name(ost); + name_set = 1; + } + ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt); av_packet_unref(et.pkt); @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the frame was encoded - ret = tq_send(e->queue_out, 0, et.pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ost, AV_LOG_ERROR, - "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg) ret = 0; finish: - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); - - tq_receive_finish(e->queue_in, 0); - tq_send_finish (e->queue_out, 0); - enc_thread_uninit(&et); - av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); - return (void*)(intptr_t)ret; } - -int enc_frame(OutputStream *ost, AVFrame *frame) -{ - OutputFile *of = output_files[ost->file_index]; - Encoder *e = ost->enc; - int ret, thread_ret; - - ret = enc_open(ost, frame); - if (ret < 0) - return ret; - - if (!e->queue_in) - return AVERROR_EOF; - - // send the frame/EOF to the encoder thread - if (frame) { - ret = tq_send(e->queue_in, 0, frame); - if (ret < 0) - goto finish; - } else - tq_send_finish(e->queue_in, 0); - - // retrieve all encoded data for the frame - while (1) { - int dummy; - - ret = tq_receive(e->queue_out, &dummy, e->pkt); - if (ret < 0) - break; - - // frame fully encoded - if (!e->pkt->data && !e->pkt->side_data_elems) - return 0; - - // process the encoded packet - ret = of_output_packet(of, ost, e->pkt); - if (ret < 0) - goto finish; - } - -finish: - thread_ret = enc_thread_stop(e); - if (thread_ret < 0) { - av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", - av_err2str(thread_ret)); - ret = err_merge(ret, thread_ret); - } - - if (ret < 0 && ret != AVERROR_EOF) - return ret; - - // signal EOF to the muxer - return of_output_packet(of, ost, NULL); -} - -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) -{ - Encoder *e = ost->enc; - AVFrame *f = e->sub_frame; - int ret; - - // XXX the queue for transferring data to the encoder thread runs - // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put - // that inside the frame - // eventually, subtitles should be switched to use AVFrames natively - ret = subtitle_wrap_frame(f, sub, 1); - if (ret < 0) - return ret; - - ret = enc_frame(ost, f); - av_frame_unref(f); - - return ret; -} - -int enc_flush(void) -{ - int ret = 0; - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - OutputFile *of = output_files[ost->file_index]; - if (ost->sq_idx_encode >= 0) - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); - } - - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { - Encoder *e = ost->enc; - AVCodecContext *enc = ost->enc_ctx; - int err; - - if (!enc || !e->opened || - (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)) - continue; - - err = enc_frame(ost, NULL); - if (err != AVERROR_EOF && ret < 0) - ret = err_merge(ret, err); - - av_assert0(!e->queue_in); - } - - return ret; -} diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index 635b1b0b6e..ada235b084 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -21,8 +21,6 @@ #include <stdint.h> #include "ffmpeg.h" -#include "ffmpeg_utils.h" -#include "thread_queue.h" #include "libavfilter/avfilter.h" #include "libavfilter/buffersink.h" @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv { // true when the filtergraph contains only meta filters // that do not modify the frame data int is_meta; + // source filters are present in the graph + int have_sources; int disable_conversions; - int nb_inputs_bound; - int nb_outputs_bound; + unsigned nb_outputs_done; const char *graph_desc; @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending frames from the main thread to the filtergraph. Has - * nb_inputs+1 streams - the first nb_inputs stream correspond to - * filtergraph inputs. Frames on those streams may have their opaque set to - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the - * EOF event for the correspondint stream. Will be immediately followed by - * this stream being send-closed. - * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of - * a subtitle heartbeat event. Will only be sent for sub2video streams. - * - * The last stream is "control" - the main thread sends empty AVFrames with - * opaque set to - * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available - * from filtergraph outputs. These frames are sent to corresponding - * streams in queue_out. Finally an empty frame is sent to the control - * stream in queue_out. - * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are - * available the terminating empty frame's opaque will contain the index+1 - * of the filtergraph input to which more input frames should be supplied. - */ - ThreadQueue *queue_in; - /** - * Queue for sending frames from the filtergraph back to the main thread. - * Has nb_outputs+1 streams - the first nb_outputs stream correspond to - * filtergraph outputs. - * - * The last stream is "control" - see documentation for queue_in for more - * details. - */ - ThreadQueue *queue_out; - // submitting frames to filter thread returned EOF - // this only happens on thread exit, so is not per-input - int eof_in; } FilterGraphPriv; static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) @@ -123,6 +87,9 @@ typedef struct FilterGraphThread { // The output index is stored in frame opaque. AVFifo *frame_queue_out; + // index of the next input to request from the scheduler + unsigned next_in; + // set to 1 after at least one frame passed through this output int got_frame; // EOF status of each input/output, as received by the thread @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv { int64_t ts_offset; int64_t next_pts; FPSConvContext fps; - - // set to 1 after at least one frame passed through this output - int got_frame; } OutputFilterPriv; static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg) static void *filter_thread(void *arg); -// start the filtering thread once all inputs and outputs are bound -static int fg_thread_try_start(FilterGraphPriv *fgp) -{ - FilterGraph *fg = &fgp->fg; - ObjPool *op; - int ret = 0; - - if (fgp->nb_inputs_bound < fg->nb_inputs || - fgp->nb_outputs_bound < fg->nb_outputs) - return 0; - - op = objpool_alloc_frames(); - if (!op) - return AVERROR(ENOMEM); - - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); - if (!fgp->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - // at least one output is mandatory - op = objpool_alloc_frames(); - if (!op) - goto fail; - - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); - if (!fgp->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); - if (ret) { - ret = AVERROR(ret); - av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n", - fg->index, av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return ret; -} - static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in) { AVFilterContext *ctx = inout->filter_ctx; @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) ofilter->graph = fg; ofp->format = -1; ofp->index = fg->nb_outputs - 1; - ofilter->last_pts = AV_NOPTS_VALUE; return ofilter; } @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) return AVERROR(ENOMEM); } - fgp->nb_inputs_bound++; - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); - - return fg_thread_try_start(fgp); + return 0; } static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, if (ret < 0) return ret; - fgp->nb_outputs_bound++; - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); - - return fg_thread_try_start(fgp); + return 0; } static InputFilter *ifilter_alloc(FilterGraph *fg) @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) return ifilter; } -static int fg_thread_stop(FilterGraphPriv *fgp) -{ - void *ret; - - if (!fgp->queue_in) - return 0; - - for (int i = 0; i <= fgp->fg.nb_inputs; i++) { - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; - - if (ifp) - ifp->eof = 1; - - tq_send_finish(fgp->queue_in, i); - } - - for (int i = 0; i <= fgp->fg.nb_outputs; i++) - tq_receive_finish(fgp->queue_out, i); - - pthread_join(fgp->thread, &ret); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return (int)(intptr_t)ret; -} - void fg_free(FilterGraph **pfg) { FilterGraph *fg = *pfg; @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg) return; fgp = fgp_from_fg(fg); - fg_thread_stop(fgp); - avfilter_graph_free(&fg->graph); for (int j = 0; j < fg->nb_inputs; j++) { InputFilter *ifilter = fg->inputs[j]; @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch) if (ret < 0) goto fail; + for (unsigned i = 0; i < graph->nb_filters; i++) { + const AVFilter *f = graph->filters[i]->filter; + if (!avfilter_filter_pad_count(f, 0) && + !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) { + fgp->have_sources = 1; + break; + } + } + for (AVFilterInOut *cur = inputs; cur; cur = cur->next) { InputFilter *const ifilter = ifilter_alloc(fg); InputFilterPriv *ifp; @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) AVBufferRef *hw_device; AVFilterInOut *inputs, *outputs, *cur; int ret, i, simple = filtergraph_is_simple(fg); + int have_input_eof = 0; const char *graph_desc = fgp->graph_desc; cleanup_filtergraph(fg); @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt) ret = av_buffersrc_add_frame(ifp->filter, NULL); if (ret < 0) goto fail; + have_input_eof = 1; } } - return 0; + if (have_input_eof) { + // make sure the EOF propagates to the end of the graph + ret = avfilter_graph_request_oldest(fg->graph); + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) + goto fail; + } + return 0; fail: cleanup_filtergraph(fg); return ret; @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame, fps->frames_prev_hist[2]); if (!*nb_frames && fps->last_dropped) { - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); fps->last_dropped++; } @@ -2260,21 +2153,23 @@ finish: fps->frames_prev_hist[0] = *nb_frames_prev; if (*nb_frames_prev == 0 && fps->last_dropped) { - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); av_log(ost, AV_LOG_VERBOSE, "*** dropping frame %"PRId64" at ts %"PRId64"\n", fps->frame_number, fps->last_frame->pts); } if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) { + uint64_t nb_frames_dup; if (*nb_frames > dts_error_threshold * 30) { av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1); - ofilter->nb_frames_drop++; + atomic_fetch_add(&ofilter->nb_frames_drop, 1); *nb_frames = 0; return; } - ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev); + nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup, + *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev)); av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1); - if (ofilter->nb_frames_dup > fps->dup_warning) { + if (nb_frames_dup > fps->dup_warning) { av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning); fps->dup_warning *= 10; } @@ -2284,8 +2179,57 @@ finish: fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY); } +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt) +{ + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); + int ret; + + // we are finished and no frames were ever seen at this output, + // at least initialize the encoder with a dummy frame + if (!fgt->got_frame) { + AVFrame *frame = fgt->frame; + FrameData *fd; + + frame->time_base = ofp->tb_out; + frame->format = ofp->format; + + frame->width = ofp->width; + frame->height = ofp->height; + frame->sample_aspect_ratio = ofp->sample_aspect_ratio; + + frame->sample_rate = ofp->sample_rate; + if (ofp->ch_layout.nb_channels) { + ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); + if (ret < 0) + return ret; + } + + fd = frame_data(frame); + if (!fd) + return AVERROR(ENOMEM); + + fd->frame_rate_filter = ofp->fps.framerate; + + av_assert0(!frame->buf[0]); + + av_log(ofp->ofilter.ost, AV_LOG_WARNING, + "No filtered frames for output stream, trying to " + "initialize anyway.\n"); + + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame); + if (ret < 0) { + av_frame_unref(frame); + return ret; + } + } + + fgt->eof_out[ofp->index] = 1; + + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL); +} + static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); AVFrame *frame_prev = ofp->fps.last_frame; @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, frame_out = frame; } - if (buffer) { - AVFrame *f = av_frame_alloc(); - - if (!f) { - av_frame_unref(frame_out); - return AVERROR(ENOMEM); - } - - av_frame_move_ref(f, frame_out); - f->opaque = (void*)(intptr_t)ofp->index; - - ret = av_fifo_write(fgt->frame_queue_out, &f, 1); - if (ret < 0) { - av_frame_free(&f); - return AVERROR(ENOMEM); - } - } else { - // return the frame to the main thread - ret = tq_send(fgp->queue_out, ofp->index, frame_out); + { + // send the frame to consumers + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out); if (ret < 0) { av_frame_unref(frame_out); - fgt->eof_out[ofp->index] = 1; + + if (!fgt->eof_out[ofp->index]) { + fgt->eof_out[ofp->index] = 1; + fgp->nb_outputs_done++; + } + return ret == AVERROR_EOF ? 0 : ret; } } @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, av_frame_move_ref(frame_prev, frame); } - if (!frame) { - tq_send_finish(fgp->queue_out, ofp->index); - fgt->eof_out[ofp->index] = 1; - } + if (!frame) + return close_output(ofp, fgt); return 0; } static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); OutputStream *ost = ofp->ofilter.ost; @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, ret = av_buffersink_get_frame_flags(filter, frame, AV_BUFFERSINK_FLAG_NO_REQUEST); - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { - ret = fg_output_frame(ofp, fgt, NULL, buffer); + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) { + ret = fg_output_frame(ofp, fgt, NULL); return (ret < 0) ? ret : 1; } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { return 1; @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, fd->frame_rate_filter = ofp->fps.framerate; } - ret = fg_output_frame(ofp, fgt, frame, buffer); + ret = fg_output_frame(ofp, fgt, frame); av_frame_unref(frame); if (ret < 0) return ret; @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, return 0; } -/* retrieve all frames available at filtergraph outputs and either send them to - * the main thread (buffer=0) or buffer them for later (buffer=1) */ +/* retrieve all frames available at filtergraph outputs + * and send them to consumers */ static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(fg); - int ret = 0; + int did_step = 0; - if (!fg->graph) - return 0; - - // process buffered frames - if (!buffer) { - AVFrame *f; - - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { - int out_idx = (intptr_t)f->opaque; - f->opaque = NULL; - ret = tq_send(fgp->queue_out, out_idx, f); - av_frame_free(&f); - if (ret < 0 && ret != AVERROR_EOF) - return ret; + // graph not configured, just select the input to request + if (!fg->graph) { + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); + if (ifp->format < 0 && !fgt->eof_in[i]) { + fgt->next_in = i; + return 0; + } } + + // This state - graph is not configured, but all inputs are either + // initialized or EOF - should be unreachable because sending EOF to a + // filter without even a fallback format should fail + av_assert0(0); + return AVERROR_BUG; } - /* Reap all buffers present in the buffer sinks */ - for (int i = 0; i < fg->nb_outputs; i++) { - OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); - int ret = 0; + while (fgp->nb_outputs_done < fg->nb_outputs) { + int ret; - while (!ret) { - ret = fg_output_step(ofp, fgt, frame, buffer); - if (ret < 0) - return ret; + ret = avfilter_graph_request_oldest(fg->graph); + if (ret == AVERROR(EAGAIN)) { + fgt->next_in = choose_input(fg, fgt); + break; + } else if (ret < 0) { + if (ret == AVERROR_EOF) + av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n"); + else + av_log(fg, AV_LOG_ERROR, + "Error requesting a frame from the filtergraph: %s\n", + av_err2str(ret)); + return ret; } - } + fgt->next_in = fg->nb_inputs; - return 0; + // return after one iteration, so that scheduler can rate-control us + if (did_step && fgp->have_sources) + return 0; + + /* Reap all buffers present in the buffer sinks */ + for (int i = 0; i < fg->nb_outputs; i++) { + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); + + ret = 0; + while (!ret) { + ret = fg_output_step(ofp, fgt, frame); + if (ret < 0) + return ret; + } + } + did_step = 1; + }; + + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0; } static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter, InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; + if (fgt->eof_in[ifp->index]) + return 0; + fgt->eof_in[ifp->index] = 1; if (ifp->filter) { @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, return ret; } - ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; + ret = fg->graph ? read_frames(fg, fgt, tmp) : 0; av_frame_free(&tmp); if (ret < 0) return ret; @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, return 0; } -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, - AVFrame *frame) -{ - const enum FrameOpaque msg = (intptr_t)frame->opaque; - FilterGraph *fg = &fgp->fg; - int graph_eof = 0; - int ret; - - frame->opaque = NULL; - av_assert0(msg > 0); - av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); - - if (!fg->graph) { - // graph not configured yet, ignore all messages other than choosing - // the input to read from - if (msg != FRAME_OPAQUE_CHOOSE_INPUT) { - av_frame_unref(frame); - goto done; - } - - for (int i = 0; i < fg->nb_inputs; i++) { - InputFilter *ifilter = fg->inputs[i]; - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - if (ifp->format < 0 && !fgt->eof_in[i]) { - frame->opaque = (void*)(intptr_t)(i + 1); - goto done; - } - } - - // This state - graph is not configured, but all inputs are either - // initialized or EOF - should be unreachable because sending EOF to a - // filter without even a fallback format should fail - av_assert0(0); - return AVERROR_BUG; - } - - if (msg == FRAME_OPAQUE_SEND_COMMAND) { - FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; - send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters); - av_frame_unref(frame); - goto done; - } - - if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { - ret = avfilter_graph_request_oldest(fg->graph); - - graph_eof = ret == AVERROR_EOF; - - if (ret == AVERROR(EAGAIN)) { - frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); - goto done; - } else if (ret < 0 && !graph_eof) - return ret; - } - - ret = read_frames(fg, fgt, frame, 0); - if (ret < 0) { - av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n"); - return ret; - } - - if (graph_eof) - return AVERROR_EOF; - - // signal to the main thread that we are done processing the message -done: - ret = tq_send(fgp->queue_out, fg->nb_outputs, frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); - return ret; - } - - return 0; -} - static void fg_thread_set_name(const FilterGraph *fg) { char name[16]; @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg) InputFilter *ifilter; InputFilterPriv *ifp; enum FrameOpaque o; - int input_idx, eof_frame; + unsigned input_idx = fgt.next_in; - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - if (input_idx < 0 || - (input_idx == fg->nb_inputs && input_status < 0)) { + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx, + &input_idx, fgt.frame); + if (input_status == AVERROR_EOF) { av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); break; + } else if (input_status == AVERROR(EAGAIN)) { + // should only happen when we didn't request any input + av_assert0(input_idx == fg->nb_inputs); + goto read_frames; } + av_assert0(input_status >= 0); + + o = (intptr_t)fgt.frame->opaque; o = (intptr_t)fgt.frame->opaque; // message on the control stream if (input_idx == fg->nb_inputs) { - ret = msg_process(fgp, &fgt, fgt.frame); - if (ret < 0) - goto finish; + FilterCommand *fc; + av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]); + + fc = (FilterCommand*)fgt.frame->buf[0]->data; + send_command(fg, fc->time, fc->target, fc->command, fc->arg, + fc->all_filters); + av_frame_unref(fgt.frame); continue; } // we received an input frame or EOF ifilter = fg->inputs[input_idx]; ifp = ifp_from_ifilter(ifilter); - eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; + if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT; ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL); - } else if (input_status >= 0 && fgt.frame->buf[0]) { + } else if (fgt.frame->buf[0]) { ret = send_frame(fg, &fgt, ifilter, fgt.frame); } else { - int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE; - AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 }; - ret = send_eof(&fgt, ifilter, pts, tb); + av_assert1(o == FRAME_OPAQUE_EOF); + ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base); } av_frame_unref(fgt.frame); if (ret < 0) + goto finish; + +read_frames: + // retrieve all newly avalable frames + ret = read_frames(fg, &fgt, fgt.frame); + if (ret == AVERROR_EOF) { + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n"); break; - - if (eof_frame) { - // an EOF frame is immediately followed by sender closing - // the corresponding stream, so retrieve that event - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index); - } - - // signal to the main thread that we are done - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); - if (ret < 0) { - if (ret == AVERROR_EOF) - break; - - av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); + } else if (ret < 0) { + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n", + av_err2str(ret)); goto finish; } } + for (unsigned i = 0; i < fg->nb_outputs; i++) { + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); + + if (fgt.eof_out[i]) + continue; + + ret = fg_output_frame(ofp, &fgt, NULL); + if (ret < 0) + goto finish; + } + finish: // EOF is normal termination if (ret == AVERROR_EOF) ret = 0; - for (int i = 0; i <= fg->nb_inputs; i++) - tq_receive_finish(fgp->queue_in, i); - for (int i = 0; i <= fg->nb_outputs; i++) - tq_send_finish(fgp->queue_out, i); - fg_thread_uninit(&fgt); - av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); - return (void*)(intptr_t)ret; } -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, - AVFrame *frame, enum FrameOpaque type) -{ - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - int output_idx, ret; - - if (ifp->eof) { - av_frame_unref(frame); - return AVERROR_EOF; - } - - frame->opaque = (void*)(intptr_t)type; - - ret = tq_send(fgp->queue_in, ifp->index, frame); - if (ret < 0) { - ifp->eof = 1; - av_frame_unref(frame); - return ret; - } - - if (type == FRAME_OPAQUE_EOF) - tq_send_finish(fgp->queue_in, ifp->index); - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); - - return ret; -} - -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - if (keep_reference) { - ret = av_frame_ref(fgp->frame, frame); - if (ret < 0) - return ret; - } else - av_frame_move_ref(fgp->frame, frame); - - return thread_send_frame(fgp, ifilter, fgp->frame, 0); -} - -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - fgp->frame->pts = pts; - fgp->frame->time_base = tb; - - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); - - return ret == AVERROR_EOF ? 0 : ret; -} - -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - - fgp->frame->pts = pts; - fgp->frame->time_base = tb; - - thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT); -} - -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) -{ - FilterGraphPriv *fgp = fgp_from_fg(graph); - int ret, got_frames = 0; - - if (fgp->eof_in) - return AVERROR_EOF; - - // signal to the filtering thread to return all frames it can - av_assert0(!fgp->frame->buf[0]); - fgp->frame->opaque = (void*)(intptr_t)(best_ist ? - FRAME_OPAQUE_CHOOSE_INPUT : - FRAME_OPAQUE_REAP_FILTERS); - - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); - if (ret < 0) { - fgp->eof_in = 1; - goto finish; - } - - while (1) { - OutputFilter *ofilter; - OutputFilterPriv *ofp; - OutputStream *ost; - int output_idx; - - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - - // EOF on the whole queue or the control stream - if (output_idx < 0 || - (ret < 0 && output_idx == graph->nb_outputs)) - goto finish; - - // EOF for a specific stream - if (ret < 0) { - ofilter = graph->outputs[output_idx]; - ofp = ofp_from_ofilter(ofilter); - - // we are finished and no frames were ever seen at this output, - // at least initialize the encoder with a dummy frame - if (!ofp->got_frame) { - AVFrame *frame = fgp->frame; - FrameData *fd; - - frame->time_base = ofp->tb_out; - frame->format = ofp->format; - - frame->width = ofp->width; - frame->height = ofp->height; - frame->sample_aspect_ratio = ofp->sample_aspect_ratio; - - frame->sample_rate = ofp->sample_rate; - if (ofp->ch_layout.nb_channels) { - ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); - if (ret < 0) - return ret; - } - - fd = frame_data(frame); - if (!fd) - return AVERROR(ENOMEM); - - fd->frame_rate_filter = ofp->fps.framerate; - - av_assert0(!frame->buf[0]); - - av_log(ofilter->ost, AV_LOG_WARNING, - "No filtered frames for output stream, trying to " - "initialize anyway.\n"); - - enc_open(ofilter->ost, frame); - av_frame_unref(frame); - } - - close_output_stream(graph->outputs[output_idx]->ost); - continue; - } - - // request was fully processed by the filtering thread, - // return the input stream to read from, if needed - if (output_idx == graph->nb_outputs) { - int input_idx = (intptr_t)fgp->frame->opaque - 1; - av_assert0(input_idx <= graph->nb_inputs); - - if (best_ist) { - *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ? - ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; - - if (input_idx < 0 && !got_frames) { - for (int i = 0; i < graph->nb_outputs; i++) - graph->outputs[i]->ost->unavailable = 1; - } - } - break; - } - - // got a frame from the filtering thread, send it for encoding - ofilter = graph->outputs[output_idx]; - ost = ofilter->ost; - ofp = ofp_from_ofilter(ofilter); - - if (ost->finished) { - av_frame_unref(fgp->frame); - tq_receive_finish(fgp->queue_out, output_idx); - continue; - } - - if (fgp->frame->pts != AV_NOPTS_VALUE) { - ofilter->last_pts = av_rescale_q(fgp->frame->pts, - fgp->frame->time_base, - AV_TIME_BASE_Q); - } - - ret = enc_frame(ost, fgp->frame); - av_frame_unref(fgp->frame); - if (ret < 0) - goto finish; - - ofp->got_frame = 1; - got_frames = 1; - } - -finish: - if (ret < 0) { - fgp->eof_in = 1; - for (int i = 0; i < graph->nb_outputs; i++) - close_output_stream(graph->outputs[i]->ost); - } - - return ret; -} - -int reap_filters(FilterGraph *fg, int flush) -{ - return fg_transcode_step(fg, NULL); -} - void fg_send_command(FilterGraph *fg, double time, const char *target, const char *command, const char *arg, int all_filters) { FilterGraphPriv *fgp = fgp_from_fg(fg); AVBufferRef *buf; FilterCommand *fc; - int output_idx, ret; - - if (!fgp->queue_in) - return; fc = av_mallocz(sizeof(*fc)); if (!fc) @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, fgp->frame->buf[0] = buf; fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; - ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); - if (ret < 0) { - av_frame_unref(fgp->frame); - return; - } - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); + sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame); } diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index ef5c2f60e0..067dc65d4e 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -23,16 +23,13 @@ #include "ffmpeg.h" #include "ffmpeg_mux.h" #include "ffmpeg_utils.h" -#include "objpool.h" #include "sync_queue.h" -#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -41,10 +38,9 @@ typedef struct MuxThreadContext { AVPacket *pkt; + AVPacket *fix_sub_duration_pkt; } MuxThreadContext; -int want_sdp = 1; - static Muxer *mux_from_of(OutputFile *of) { return (Muxer*)of; @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int return 0; } +static int of_streamcopy(OutputStream *ost, AVPacket *pkt); + /* apply the output bitstream filters */ -static int mux_packet_filter(Muxer *mux, OutputStream *ost, - AVPacket *pkt, int *stream_eof) +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt, + OutputStream *ost, AVPacket *pkt, int *stream_eof) { MuxStream *ms = ms_from_ost(ost); const char *err_msg; int ret = 0; + if (pkt && !ost->enc) { + ret = of_streamcopy(ost, pkt); + if (ret == AVERROR(EAGAIN)) + return 0; + else if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + pkt = NULL; + ret = 0; + } else if (ret < 0) + goto fail; + } + + // emit heartbeat for -fix_sub_duration; + // we are only interested in heartbeats on on random access points. + if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) { + mt->fix_sub_duration_pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION; + mt->fix_sub_duration_pkt->pts = pkt->pts; + mt->fix_sub_duration_pkt->time_base = pkt->time_base; + + ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx, + mt->fix_sub_duration_pkt); + if (ret < 0) + goto fail; + } + if (ms->bsf_ctx) { int bsf_eof = 0; @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of) static void mux_thread_uninit(MuxThreadContext *mt) { av_packet_free(&mt->pkt); + av_packet_free(&mt->fix_sub_duration_pkt); memset(mt, 0, sizeof(*mt)); } @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt) if (!mt->pkt) goto fail; + mt->fix_sub_duration_pkt = av_packet_alloc(); + if (!mt->fix_sub_duration_pkt) + goto fail; + return 0; fail: @@ -316,19 +344,22 @@ void *muxer_thread(void *arg) OutputStream *ost; int stream_idx, stream_eof = 0; - ret = tq_receive(mux->tq, &stream_idx, mt.pkt); + ret = sch_mux_receive(mux->sch, of->index, mt.pkt); + stream_idx = mt.pkt->stream_index; if (stream_idx < 0) { av_log(mux, AV_LOG_VERBOSE, "All streams finished\n"); ret = 0; break; } - ost = of->streams[stream_idx]; - ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof); + ost = of->streams[mux->sch_stream_idx[stream_idx]]; + mt.pkt->stream_index = ost->index; + + ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, &stream_eof); av_packet_unref(mt.pkt); if (ret == AVERROR_EOF) { if (stream_eof) { - tq_receive_finish(mux->tq, stream_idx); + sch_mux_receive_finish(mux->sch, of->index, stream_idx); } else { av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n"); ret = 0; @@ -343,243 +374,55 @@ void *muxer_thread(void *arg) finish: mux_thread_uninit(&mt); - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_receive_finish(mux->tq, i); - - av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n"); - return (void*)(intptr_t)ret; } -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt) -{ - int ret = 0; - - if (!pkt || ost->finished & MUXER_FINISHED) - goto finish; - - ret = tq_send(mux->tq, ost->index, pkt); - if (ret < 0) - goto finish; - - return 0; - -finish: - if (pkt) - av_packet_unref(pkt); - - ost->finished |= MUXER_FINISHED; - tq_send_finish(mux->tq, ost->index); - return ret == AVERROR_EOF ? 0 : ret; -} - -static int queue_packet(OutputStream *ost, AVPacket *pkt) -{ - MuxStream *ms = ms_from_ost(ost); - AVPacket *tmp_pkt = NULL; - int ret; - - if (!av_fifo_can_write(ms->muxing_queue)) { - size_t cur_size = av_fifo_can_read(ms->muxing_queue); - size_t pkt_size = pkt ? pkt->size : 0; - unsigned int are_we_over_size = - (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold; - size_t limit = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX; - size_t new_size = FFMIN(2 * cur_size, limit); - - if (new_size <= cur_size) { - av_log(ost, AV_LOG_ERROR, - "Too many packets buffered for output stream %d:%d.\n", - ost->file_index, ost->st->index); - return AVERROR(ENOSPC); - } - ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size); - if (ret < 0) - return ret; - } - - if (pkt) { - ret = av_packet_make_refcounted(pkt); - if (ret < 0) - return ret; - - tmp_pkt = av_packet_alloc(); - if (!tmp_pkt) - return AVERROR(ENOMEM); - - av_packet_move_ref(tmp_pkt, pkt); - ms->muxing_queue_data_size += tmp_pkt->size; - } - av_fifo_write(ms->muxing_queue, &tmp_pkt, 1); - - return 0; -} - -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost) -{ - int ret; - - if (mux->tq) { - return thread_submit_packet(mux, ost, pkt); - } else { - /* the muxer is not initialized yet, buffer the packet */ - ret = queue_packet(ost, pkt); - if (ret < 0) { - if (pkt) - av_packet_unref(pkt); - return ret; - } - } - - return 0; -} - -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) -{ - Muxer *mux = mux_from_of(of); - int ret = 0; - - if (pkt && pkt->dts != AV_NOPTS_VALUE) - ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q); - - ret = submit_packet(mux, pkt, ost); - if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s", - av_err2str(ret)); - return ret; - } - - return 0; -} - -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) +static int of_streamcopy(OutputStream *ost, AVPacket *pkt) { OutputFile *of = output_files[ost->file_index]; MuxStream *ms = ms_from_ost(ost); + DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL; + int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE; int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time; int64_t ts_offset; - AVPacket *opkt = ms->pkt; - int ret; - - av_packet_unref(opkt); if (of->recording_time != INT64_MAX && dts >= of->recording_time + start_time) - pkt = NULL; - - // EOF: flush output bitstream filters. - if (!pkt) - return of_output_packet(of, ost, NULL); + return AVERROR_EOF; if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) && !ms->copy_initial_nonkeyframes) - return 0; + return AVERROR(EAGAIN); if (!ms->streamcopy_started) { if (!ms->copy_prior_start && (pkt->pts == AV_NOPTS_VALUE ? dts < ms->ts_copy_start : pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base))) - return 0; + return AVERROR(EAGAIN); if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time) - return 0; + return AVERROR(EAGAIN); } - ret = av_packet_ref(opkt, pkt); - if (ret < 0) - return ret; - - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base); + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base); if (pkt->pts != AV_NOPTS_VALUE) - opkt->pts -= ts_offset; + pkt->pts -= ts_offset; if (pkt->dts == AV_NOPTS_VALUE) { - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base); + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base); } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { - opkt->pts = opkt->dts - ts_offset; - } - opkt->dts -= ts_offset; - - { - int ret = trigger_fix_sub_duration_heartbeat(ost, pkt); - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! (%s)\n", - __func__, av_err2str(ret)); - return ret; - } + pkt->pts = pkt->dts - ts_offset; } - ret = of_output_packet(of, ost, opkt); - if (ret < 0) - return ret; + pkt->dts -= ts_offset; ms->streamcopy_started = 1; return 0; } -static int thread_stop(Muxer *mux) -{ - void *ret; - - if (!mux || !mux->tq) - return 0; - - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_send_finish(mux->tq, i); - - pthread_join(mux->thread, &ret); - - tq_free(&mux->tq); - - return (int)(intptr_t)ret; -} - -static int thread_start(Muxer *mux) -{ - AVFormatContext *fc = mux->fc; - ObjPool *op; - int ret; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); - if (!mux->tq) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux); - if (ret) { - tq_free(&mux->tq); - return AVERROR(ret); - } - - /* flush the muxing queues */ - for (int i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = mux->of.streams[i]; - MuxStream *ms = ms_from_ost(ost); - AVPacket *pkt; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = thread_submit_packet(mux, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); - } - if (ret < 0) - return ret; - } - } - - return 0; -} - int print_sdp(const char *filename); int print_sdp(const char *filename) @@ -590,11 +433,6 @@ int print_sdp(const char *filename) AVIOContext *sdp_pb; AVFormatContext **avc; - for (i = 0; i < nb_output_files; i++) { - if (!mux_from_of(output_files[i])->header_written) - return 0; - } - avc = av_malloc_array(nb_output_files, sizeof(*avc)); if (!avc) return AVERROR(ENOMEM); @@ -629,25 +467,17 @@ int print_sdp(const char *filename) avio_closep(&sdp_pb); } - // SDP successfully written, allow muxer threads to start - ret = 1; - fail: av_freep(&avc); return ret; } -int mux_check_init(Muxer *mux) +int mux_check_init(void *arg) { + Muxer *mux = arg; OutputFile *of = &mux->of; AVFormatContext *fc = mux->fc; - int ret, i; - - for (i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = of->streams[i]; - if (!ost->initialized) - return 0; - } + int ret; ret = avformat_write_header(fc, &mux->opts); if (ret < 0) { @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux) mux->header_written = 1; av_dump_format(fc, of->index, fc->url, 1); - nb_output_dumped++; - - if (sdp_filename || want_sdp) { - ret = print_sdp(sdp_filename); - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); - return ret; - } else if (ret == 1) { - /* SDP is written only after all the muxers are ready, so now we - * start ALL the threads */ - for (i = 0; i < nb_output_files; i++) { - ret = thread_start(mux_from_of(output_files[i])); - if (ret < 0) - return ret; - } - } - } else { - ret = thread_start(mux_from_of(of)); - if (ret < 0) - return ret; - } + atomic_fetch_add(&nb_output_dumped, 1); return 0; } @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost) ost->st->time_base); } - ost->initialized = 1; + if (ms->sch_idx >= 0) + return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx); - return mux_check_init(mux); + return 0; } static int check_written(OutputFile *of) @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = mux->fc; int ret, mux_result = 0; - if (!mux->tq) { + if (!mux->header_written) { av_log(mux, AV_LOG_ERROR, "Nothing was written into output file, because " "at least one of its streams received no packets.\n"); return AVERROR(EINVAL); } - mux_result = thread_stop(mux); - ret = av_write_trailer(fc); if (ret < 0) { av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret)); @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post) ost->logfile = NULL; } - if (ms->muxing_queue) { - AVPacket *pkt; - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) - av_packet_free(&pkt); - av_fifo_freep2(&ms->muxing_queue); - } - avcodec_parameters_free(&ost->par_in); av_bsf_free(&ms->bsf_ctx); @@ -976,8 +778,6 @@ void of_free(OutputFile **pof) return; mux = mux_from_of(of); - thread_stop(mux); - sq_free(&of->sq_encode); sq_free(&mux->sq_mux); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index eee2b2cb07..5d7cf3fa76 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -25,7 +25,6 @@ #include <stdint.h> #include "ffmpeg_sched.h" -#include "thread_queue.h" #include "libavformat/avformat.h" @@ -33,7 +32,6 @@ #include "libavutil/dict.h" #include "libavutil/fifo.h" -#include "libavutil/thread.h" typedef struct MuxStream { OutputStream ost; @@ -41,9 +39,6 @@ typedef struct MuxStream { // name used for logging char log_name[32]; - /* the packets are buffered here until the muxer is ready to be initialized */ - AVFifo *muxing_queue; - AVBSFContext *bsf_ctx; AVPacket *bsf_pkt; @@ -57,17 +52,6 @@ typedef struct MuxStream { int64_t max_frames; - /* - * The size of the AVPackets' buffers in queue. - * Updated when a packet is either pushed or pulled from the queue. - */ - size_t muxing_queue_data_size; - - int max_muxing_queue_size; - - /* Threshold after which max_muxing_queue_size will be in effect */ - size_t muxing_queue_data_threshold; - // timestamp from which the streamcopied streams should start, // in AV_TIME_BASE_Q; // everything before it should be discarded @@ -106,9 +90,6 @@ typedef struct Muxer { int *sch_stream_idx; int nb_sch_stream_idx; - pthread_t thread; - ThreadQueue *tq; - AVDictionary *opts; int thread_queue_size; @@ -122,10 +103,7 @@ typedef struct Muxer { AVPacket *sq_pkt; } Muxer; -/* whether we want to print an SDP, set in of_open() */ -extern int want_sdp; - -int mux_check_init(Muxer *mux); +int mux_check_init(void *arg); static MuxStream *ms_from_ost(OutputStream *ost) { diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 534b4379c7..6459296ab0 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const OptionsContext *o, return 0; } -static int new_stream_attachment(Muxer *mux, const OptionsContext *o, - OutputStream *ost) -{ - ost->finished = 1; - return 0; -} - static int new_stream_subtitle(Muxer *mux, const OptionsContext *o, OutputStream *ost) { @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->par_in) return AVERROR(ENOMEM); - ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0); - if (!ms->muxing_queue) - return AVERROR(ENOMEM); ms->last_mux_dts = AV_NOPTS_VALUE; ost->st = st; @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (!ost->enc_ctx) return AVERROR(ENOMEM); - ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); + ret = sch_add_enc(mux->sch, encoder_thread, ost, + ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open); if (ret < 0) return ret; ms->sch_idx_enc = ret; @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, max_muxing_queue_size, muxing_queue_data_threshold); - - ms->max_muxing_queue_size = max_muxing_queue_size; - ms->muxing_queue_data_threshold = muxing_queue_data_threshold; } MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24) av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0); - ost->last_mux_dts = AV_NOPTS_VALUE; - MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i, ms->copy_initial_nonkeyframes, oc, st); @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, ost); break; case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, ost); break; case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, ost); break; - case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break; } if (ret < 0) return ret; @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u MuxStream *ms = ms_from_ost(ost); enum AVMediaType type = ost->type; - ost->sq_idx_encode = -1; ost->sq_idx_mux = -1; nb_interleaved += IS_INTERLEAVED(type); @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u * - at least one encoded audio/video stream is frame-limited, since * that has similar semantics to 'shortest' * - at least one audio encoder requires constant frame sizes + * + * Note that encoding sync queues are handled in the scheduler, because + * different encoders run in different threads and need external + * synchronization, while muxer sync queues can be handled inside the muxer */ if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) { - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux); - if (!of->sq_encode) - return AVERROR(ENOMEM); + int sq_idx, ret; + + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux); + if (sq_idx < 0) + return sq_idx; for (int i = 0; i < oc->nb_streams; i++) { OutputStream *ost = of->streams[i]; @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u if (!IS_AV_ENC(ost, type)) continue; - ost->sq_idx_encode = sq_add_stream(of->sq_encode, - of->shortest || ms->max_frames < INT64_MAX); - if (ost->sq_idx_encode < 0) - return ost->sq_idx_encode; - - if (ms->max_frames != INT64_MAX) - sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames); + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc, + of->shortest || ms->max_frames < INT64_MAX, + ms->max_frames); + if (ret < 0) + return ret; } } @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt) return 0; } -static int init_output_stream_nofilter(OutputStream *ost) -{ - int ret = 0; - - if (ost->enc_ctx) { - ret = enc_open(ost, NULL); - if (ret < 0) - return ret; - } else { - ret = of_stream_init(output_files[ost->file_index], ost); - if (ret < 0) - return ret; - } - - return ret; -} - static const char *output_file_item_name(void *obj) { const Muxer *mux = obj; @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) av_strlcat(mux->log_name, "/", sizeof(mux->log_name)); av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name)); - if (strcmp(oc->oformat->name, "rtp")) - want_sdp = 0; of->format = oc->oformat; if (recording_time != INT64_MAX) @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) AVFMT_FLAG_BITEXACT); } - err = sch_add_mux(sch, muxer_thread, NULL, mux, + err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, !strcmp(oc->oformat->name, "rtp")); if (err < 0) return err; @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) of->url = filename; - /* initialize stream copy and subtitle/data streams. - * Encoded AVFrame based streams will get initialized when the first AVFrame - * is received in do_video_out - */ + /* initialize streamcopy streams. */ for (int i = 0; i < of->nb_streams; i++) { OutputStream *ost = of->streams[i]; - if (ost->filter) - continue; - - err = init_output_stream_nofilter(ost); - if (err < 0) - return err; - } - - /* write the header for files with no streams */ - if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) { - int ret = mux_check_init(mux); - if (ret < 0) - return ret; + if (!ost->enc) { + err = of_stream_init(of, ost); + if (err < 0) + return err; + } } return 0; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index d463306546..6177a96a4e 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL}; HWDevice *filter_hw_device; char *vstats_filename; -char *sdp_filename; float audio_drift_threshold = 0.1; float dts_delta_threshold = 10; @@ -580,9 +579,8 @@ fail: static int opt_sdp_file(void *optctx, const char *opt, const char *arg) { - av_free(sdp_filename); - sdp_filename = av_strdup(arg); - return 0; + Scheduler *sch = optctx; + return sch_sdp_filename(sch, arg); } #if CONFIG_VAAPI diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat index 957a410921..bc9b833799 100644 --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat @@ -1,48 +1,40 @@ 1 -00:00:00,968 --> 00:00:01,001 +00:00:00,968 --> 00:00:01,168 <font face="Monospace">{\an7}(</font> 2 -00:00:01,001 --> 00:00:01,168 -<font face="Monospace">{\an7}(</font> - -3 00:00:01,168 --> 00:00:01,368 <font face="Monospace">{\an7}(<i> inaudibl</i></font> -4 +3 00:00:01,368 --> 00:00:01,568 <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font> -5 +4 00:00:01,568 --> 00:00:02,002 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> +5 +00:00:02,002 --> 00:00:03,103 +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> + 6 -00:00:02,002 --> 00:00:03,003 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> - -7 -00:00:03,003 --> 00:00:03,103 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> - -8 00:00:03,103 --> 00:00:03,303 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) >></font> -9 +7 00:00:03,303 --> 00:00:03,503 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) >> Safety rema</font> -10 +8 00:00:03,504 --> 00:00:03,704 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) >> Safety remains our numb</font> -11 +9 00:00:03,704 --> 00:00:04,004 -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) >> Safety remains our number one</font>