@@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps {
static BenchmarkTimeStamps get_benchmark_time_stamps(void);
static int64_t getmaxrss(void);
-unsigned nb_output_dumped = 0;
+atomic_uint nb_output_dumped = 0;
static BenchmarkTimeStamps current_time;
AVIOContext *progress_avio = NULL;
@@ -138,30 +138,6 @@ static struct termios oldtty;
static int restore_tty;
#endif
-/* sub2video hack:
- Convert subtitles to video with alpha to insert them in filter graphs.
- This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
- /* When a frame is read from a file, examine all sub2video streams in
- the same file and send the sub2video frame again. Otherwise, decoded
- video frames could be accumulating in the filter graph while a filter
- (possibly overlay) is desperately waiting for a subtitle frame. */
- for (int i = 0; i < infile->nb_streams; i++) {
- InputStream *ist = infile->streams[i];
-
- if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- for (int j = 0; j < ist->nb_filters; j++)
- ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
- }
-}
-
-/* end of sub2video hack */
-
static void term_exit_sigsafe(void)
{
#if HAVE_TERMIOS_H
@@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...)
}
}
-void close_output_stream(OutputStream *ost)
-{
- OutputFile *of = output_files[ost->file_index];
- ost->finished |= ENCODER_FINISHED;
-
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-}
-
-static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
+static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
{
AVBPrint buf, buf_script;
int64_t total_size = of_filesize(output_files[0]);
int vid;
double bitrate;
double speed;
- int64_t pts = AV_NOPTS_VALUE;
static int64_t last_time = -1;
static int first_report = 1;
uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
@@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
last_time = cur_time;
}
if (((cur_time - last_time) < stats_period && !first_report) ||
- (first_report && nb_output_dumped < nb_output_files))
+ (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
return;
last_time = cur_time;
}
@@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
+ const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
av_bprintf(&buf, "q=%2.1f ", q);
@@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
if (is_last_report)
av_bprintf(&buf, "L");
- nb_frames_dup = ost->filter->nb_frames_dup;
- nb_frames_drop = ost->filter->nb_frames_drop;
+ nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup);
+ nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
vid = 1;
}
- /* compute min output value */
- if (ost->last_mux_dts != AV_NOPTS_VALUE) {
- if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
- pts = ost->last_mux_dts;
- if (copy_ts) {
- if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
- copy_ts_first_pts = pts;
- if (copy_ts_first_pts != AV_NOPTS_VALUE)
- pts -= copy_ts_first_pts;
- }
- }
+ }
+
+ if (copy_ts) {
+ if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
+ copy_ts_first_pts = pts;
+ if (copy_ts_first_pts != AV_NOPTS_VALUE)
+ pts -= copy_ts_first_pts;
}
us = FFABS64U(pts) % AV_TIME_BASE;
@@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
return 0;
}
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
-{
- OutputFile *of = output_files[ost->file_index];
- int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
- AV_TIME_BASE_Q);
-
- if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY))
- // we are only interested in heartbeats on streams configured, and
- // only on random access points.
- return 0;
-
- for (int i = 0; i < of->nb_streams; i++) {
- OutputStream *iter_ost = of->streams[i];
- InputStream *ist = iter_ost->ist;
- int ret = AVERROR_BUG;
-
- if (iter_ost == ost || !ist || !ist->decoding_needed ||
- ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
- // We wish to skip the stream that causes the heartbeat,
- // output streams without an input stream, streams not decoded
- // (as fix_sub_duration is only done for decoded subtitles) as
- // well as non-subtitle streams.
- continue;
-
- if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
- return ret;
- }
-
- return 0;
-}
-
-/* pkt = NULL means EOF (needed to flush decoder buffers) */
-static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
- InputFile *f = input_files[ist->file_index];
- int64_t dts_est = AV_NOPTS_VALUE;
- int ret = 0;
- int eof_reached = 0;
-
- if (ist->decoding_needed) {
- ret = dec_packet(ist, pkt, no_eof);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
- }
- if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
- eof_reached = 1;
-
- if (pkt && pkt->opaque_ref) {
- DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
- dts_est = pd->dts_est;
- }
-
- if (f->recording_time != INT64_MAX) {
- int64_t start_time = 0;
- if (copy_ts) {
- start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
- start_time += start_at_zero ? 0 : f->start_time_effective;
- }
- if (dts_est >= f->recording_time + start_time)
- pkt = NULL;
- }
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (ost->enc || (!pkt && no_eof))
- continue;
-
- ret = of_streamcopy(ost, pkt, dts_est);
- if (ret < 0)
- return ret;
- }
-
- return !eof_reached;
-}
-
static void print_stream_maps(void)
{
av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -934,43 +821,6 @@ static void print_stream_maps(void)
}
}
-/**
- * Select the output stream to process.
- *
- * @retval 0 an output stream was selected
- * @retval AVERROR(EAGAIN) need to wait until more input is available
- * @retval AVERROR_EOF no more streams need output
- */
-static int choose_output(OutputStream **post)
-{
- int64_t opts_min = INT64_MAX;
- OutputStream *ost_min = NULL;
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- int64_t opts;
-
- if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
- opts = ost->filter->last_pts;
- } else {
- opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
- INT64_MIN : ost->last_mux_dts;
- }
-
- if (!ost->initialized && !ost->finished) {
- ost_min = ost;
- break;
- }
- if (!ost->finished && opts < opts_min) {
- opts_min = opts;
- ost_min = ost;
- }
- }
- if (!ost_min)
- return AVERROR_EOF;
- *post = ost_min;
- return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
-}
-
static void set_tty_echo(int on)
{
#if HAVE_TERMIOS_H
@@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t cur_time)
return 0;
}
-static void reset_eagain(void)
-{
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
- ost->unavailable = 0;
-}
-
-static void decode_flush(InputFile *ifile)
-{
- for (int i = 0; i < ifile->nb_streams; i++) {
- InputStream *ist = ifile->streams[i];
-
- if (ist->discard || !ist->decoding_needed)
- continue;
-
- dec_packet(ist, NULL, 1);
- }
-}
-
-/*
- * Return
- * - 0 -- one packet was read and processed
- * - AVERROR(EAGAIN) -- no packets were available for selected file,
- * this function should be called again
- * - AVERROR_EOF -- this function should not be called again
- */
-static int process_input(int file_index, AVPacket *pkt)
-{
- InputFile *ifile = input_files[file_index];
- InputStream *ist;
- int ret, i;
-
- ret = ifile_get_packet(ifile, pkt);
-
- if (ret == 1) {
- /* the input file is looped: flush the decoders */
- decode_flush(ifile);
- return AVERROR(EAGAIN);
- }
- if (ret < 0) {
- if (ret != AVERROR_EOF) {
- av_log(ifile, AV_LOG_ERROR,
- "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
- if (exit_on_error)
- return ret;
- }
-
- for (i = 0; i < ifile->nb_streams; i++) {
- ist = ifile->streams[i];
- if (!ist->discard) {
- ret = process_input_packet(ist, NULL, 0);
- if (ret>0)
- return 0;
- else if (ret < 0)
- return ret;
- }
-
- /* mark all outputs that don't go through lavfi as finished */
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- OutputFile *of = output_files[ost->file_index];
-
- ret = of_output_packet(of, ost, NULL);
- if (ret < 0)
- return ret;
- }
- }
-
- ifile->eof_reached = 1;
- return AVERROR(EAGAIN);
- }
-
- reset_eagain();
-
- ist = ifile->streams[pkt->stream_index];
-
- sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
- ret = process_input_packet(ist, pkt, 0);
-
- av_packet_unref(pkt);
-
- return ret < 0 ? ret : 0;
-}
-
-/**
- * Run a single step of transcoding.
- *
- * @return 0 for success, <0 for error
- */
-static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
-{
- InputStream *ist = NULL;
- int ret;
-
- if (ost->filter) {
- if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
- return ret;
- if (!ist)
- return 0;
- } else {
- ist = ost->ist;
- av_assert0(ist);
- }
-
- ret = process_input(ist->file_index, demux_pkt);
- if (ret == AVERROR(EAGAIN)) {
- return 0;
- }
-
- if (ret < 0)
- return ret == AVERROR_EOF ? 0 : ret;
-
- // process_input() above might have caused output to become available
- // in multiple filtergraphs, so we process all of them
- for (int i = 0; i < nb_filtergraphs; i++) {
- ret = reap_filters(filtergraphs[i], 0);
- if (ret < 0)
- return ret;
- }
-
- return 0;
-}
-
/*
* The following code is the main loop of the file converter
*/
-static int transcode(Scheduler *sch, int *err_rate_exceeded)
+static int transcode(Scheduler *sch)
{
int ret = 0, i;
- InputStream *ist;
- int64_t timer_start;
- AVPacket *demux_pkt = NULL;
+ int64_t timer_start, transcode_ts = 0;
print_stream_maps();
- *err_rate_exceeded = 0;
atomic_store(&transcode_init_done, 1);
- demux_pkt = av_packet_alloc();
- if (!demux_pkt) {
- ret = AVERROR(ENOMEM);
- goto fail;
- }
+ ret = sch_start(sch);
+ if (ret < 0)
+ return ret;
if (stdin_interaction) {
av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
@@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
timer_start = av_gettime_relative();
- while (!received_sigterm) {
- OutputStream *ost;
+ while (!sch_wait(sch, stats_period, &transcode_ts)) {
int64_t cur_time= av_gettime_relative();
/* if 'q' pressed, exits */
@@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
if (check_keyboard_interaction(cur_time) < 0)
break;
- ret = choose_output(&ost);
- if (ret == AVERROR(EAGAIN)) {
- reset_eagain();
- av_usleep(10000);
- ret = 0;
- continue;
- } else if (ret < 0) {
- av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
- ret = 0;
- break;
- }
-
- ret = transcode_step(ost, demux_pkt);
- if (ret < 0 && ret != AVERROR_EOF) {
- av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
- break;
- }
-
/* dump report by using the output first video and audio streams */
- print_report(0, timer_start, cur_time);
+ print_report(0, timer_start, cur_time, transcode_ts);
}
- /* at the end of stream, we must flush the decoder buffers */
- for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
- float err_rate;
-
- if (!input_files[ist->file_index]->eof_reached) {
- int err = process_input_packet(ist, NULL, 0);
- ret = err_merge(ret, err);
- }
-
- err_rate = (ist->frames_decoded || ist->decode_errors) ?
- ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
- if (err_rate > max_error_rate) {
- av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
- err_rate, max_error_rate);
- *err_rate_exceeded = 1;
- } else if (err_rate)
- av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
- }
- ret = err_merge(ret, enc_flush());
-
- term_exit();
+ ret = sch_stop(sch);
/* write the trailer if needed */
for (i = 0; i < nb_output_files; i++) {
@@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
ret = err_merge(ret, err);
}
- /* dump report by using the first video and audio streams */
- print_report(1, timer_start, av_gettime_relative());
+ term_exit();
-fail:
- av_packet_free(&demux_pkt);
+ /* dump report by using the first video and audio streams */
+ print_report(1, timer_start, av_gettime_relative(), transcode_ts);
return ret;
}
@@ -1308,7 +990,7 @@ int main(int argc, char **argv)
{
Scheduler *sch = NULL;
- int ret, err_rate_exceeded;
+ int ret;
BenchmarkTimeStamps ti;
init_dynload();
@@ -1350,7 +1032,7 @@ int main(int argc, char **argv)
}
current_time = ti = get_benchmark_time_stamps();
- ret = transcode(sch, &err_rate_exceeded);
+ ret = transcode(sch);
if (ret >= 0 && do_benchmark) {
int64_t utime, stime, rtime;
current_time = get_benchmark_time_stamps();
@@ -1362,8 +1044,8 @@ int main(int argc, char **argv)
utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
}
- ret = received_nb_signals ? 255 :
- err_rate_exceeded ? 69 : ret;
+ ret = received_nb_signals ? 255 :
+ (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret;
finish:
if (ret == AVERROR_EXIT)
@@ -61,6 +61,8 @@
#define FFMPEG_OPT_TOP 1
#define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
enum VideoSyncMethod {
VSYNC_AUTO = -1,
VSYNC_PASSTHROUGH,
@@ -82,13 +84,16 @@ enum HWAccelID {
};
enum FrameOpaque {
- FRAME_OPAQUE_REAP_FILTERS = 1,
- FRAME_OPAQUE_CHOOSE_INPUT,
- FRAME_OPAQUE_SUB_HEARTBEAT,
+ FRAME_OPAQUE_SUB_HEARTBEAT = 1,
FRAME_OPAQUE_EOF,
FRAME_OPAQUE_SEND_COMMAND,
};
+enum PacketOpaque {
+ PKT_OPAQUE_SUB_HEARTBEAT = 1,
+ PKT_OPAQUE_FIX_SUB_DURATION,
+};
+
typedef struct HWDevice {
const char *name;
enum AVHWDeviceType type;
@@ -309,11 +314,8 @@ typedef struct OutputFilter {
enum AVMediaType type;
- /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
- int64_t last_pts;
-
- uint64_t nb_frames_dup;
- uint64_t nb_frames_drop;
+ atomic_uint_least64_t nb_frames_dup;
+ atomic_uint_least64_t nb_frames_drop;
} OutputFilter;
typedef struct FilterGraph {
@@ -426,11 +428,6 @@ typedef struct InputFile {
float readrate;
int accurate_seek;
-
- /* when looping the input file, this queue is used by decoders to report
- * the last frame timestamp back to the demuxer thread */
- AVThreadMessageQueue *audio_ts_queue;
- int audio_ts_queue_size;
} InputFile;
enum forced_keyframes_const {
@@ -532,8 +529,6 @@ typedef struct OutputStream {
InputStream *ist;
AVStream *st; /* stream in the output file */
- /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
- int64_t last_mux_dts;
AVRational enc_timebase;
@@ -578,13 +573,6 @@ typedef struct OutputStream {
AVDictionary *sws_dict;
AVDictionary *swr_opts;
char *apad;
- OSTFinished finished; /* no more packets should be written for this stream */
- int unavailable; /* true if the steram is unavailable (possibly temporarily) */
-
- // init_output_stream() has been called for this stream
- // The encoder and the bitstream filters have been initialized and the stream
- // parameters are set in the AVStream.
- int initialized;
const char *attachment_filename;
@@ -598,9 +586,8 @@ typedef struct OutputStream {
uint64_t samples_encoded;
/* packet quality factor */
- int quality;
+ atomic_int quality;
- int sq_idx_encode;
int sq_idx_mux;
EncStats enc_stats_pre;
@@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs;
extern int nb_filtergraphs;
extern char *vstats_filename;
-extern char *sdp_filename;
extern float dts_delta_threshold;
extern float dts_error_threshold;
@@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb;
extern const OptionDef options[];
extern HWDevice *filter_hw_device;
-extern unsigned nb_output_dumped;
+extern atomic_uint nb_output_dumped;
extern int ignore_unknown_streams;
extern int copy_unknown_streams;
@@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame);
const FrameData *frame_data_c(AVFrame *frame);
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
/**
* Set up fallback filtering parameters from a decoder context. They will only
* be used if no frames are ever sent on this input, otherwise the actual
@@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in] graph filter graph to consider
- * @param[out] best_ist input stream where a frame would allow to continue
- * @return 0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters);
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return 0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
@@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
-int enc_open(OutputStream *ost, const AVFrame *frame);
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub);
-int enc_frame(OutputStream *ost, AVFrame *frame);
-int enc_flush(void);
+int enc_open(void *opaque, const AVFrame *frame);
/*
* Initialize muxing state for the given stream, should be called
@@ -840,30 +791,11 @@ void of_free(OutputFile **pof);
void of_enc_stats_close(void);
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
int64_t of_filesize(OutputFile *of);
int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void ifile_close(InputFile **f);
-/**
- * Get next input packet from the demuxer.
- *
- * @param pkt the packet is written here when this function returns 0
- * @return
- * - 0 when a packet has been read successfully
- * - 1 when stream end was reached, but the stream is looped;
- * caller should flush decoders and read from this demuxer again
- * - a negative error code on failure
- */
-int ifile_get_packet(InputFile *f, AVPacket *pkt);
-
int ist_output_add(InputStream *ist, OutputStream *ost);
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
@@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev);
* pass NULL to start iteration */
OutputStream *ost_iter(OutputStream *prev);
-void close_output_stream(OutputStream *ost);
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt);
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
void update_benchmark(const char *fmt, ...);
#define SPECIFIER_OPT_FMT_str "%s"
@@ -54,24 +54,6 @@ struct Decoder {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending coded packets from the main thread to
- * the decoder thread.
- *
- * An empty packet is sent to flush the decoder without terminating
- * decoding.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending decoded frames from the decoder thread
- * to the main thread.
- *
- * An empty frame is sent to signal that a single packet has been fully
- * processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@ typedef struct DecThreadContext {
AVPacket *pkt;
} DecThreadContext;
-static int dec_thread_stop(Decoder *d)
-{
- void *ret;
-
- if (!d->queue_in)
- return 0;
-
- tq_send_finish(d->queue_in, 0);
- tq_receive_finish(d->queue_out, 0);
-
- pthread_join(d->thread, &ret);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
-
- return (intptr_t)ret;
-}
-
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
- dec_thread_stop(dec);
-
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -148,25 +110,6 @@ fail:
return AVERROR(ENOMEM);
}
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
- int i, ret = 0;
-
- for (i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_send_frame(ist->filters[i], decoded_frame,
- i < ist->nb_filters - 1 ||
- ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
- if (ret == AVERROR_EOF)
- ret = 0; /* ignore */
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Failed to inject frame into filter network: %s\n", av_err2str(ret));
- break;
- }
- }
- return ret;
-}
-
static AVRational audio_samplerate_update(void *logctx, Decoder *d,
const AVFrame *frame)
{
@@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
- ret = send_frame_to_filters(ist, frame);
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
- return ret;
+ av_frame_unref(frame);
- subtitle = (AVSubtitle*)frame->buf[0]->data;
- if (!subtitle->num_rects)
- return 0;
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
- if (ret < 0)
- return ret;
- }
-
- return 0;
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
+static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
Decoder *d = ist->decoder;
int ret = AVERROR_BUG;
@@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
AVFrame *frame)
{
- Decoder *d = ist->decoder;
+ Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
int ret;
+ if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+ frame->pts = pkt->pts;
+ frame->time_base = pkt->time_base;
+ frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
+ return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
+ AV_TIME_BASE_Q));
+ }
+
if (!pkt) {
flush_pkt = av_packet_alloc();
if (!flush_pkt)
@@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
ist->frames_decoded++;
- // XXX the queue for transferring data back to the main thread runs
+ // XXX the queue for transferring data to consumers runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
@@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
frame->width = ist->dec_ctx->width;
frame->height = ist->dec_ctx->height;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- av_frame_unref(frame);
-
- return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- int i, ret;
-
- for (i = 0; i < ist->nb_filters; i++) {
- int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
- d->last_frame_pts + d->last_frame_duration_est;
- ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
- if (ret < 0)
- return ret;
- }
- return 0;
+ return process_subtitle(ist, frame);
}
static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
ist->frames_decoded++;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- return ret;
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ }
}
}
@@ -679,7 +603,6 @@ fail:
void *decoder_thread(void *arg)
{
InputStream *ist = arg;
- InputFile *ifile = input_files[ist->file_index];
Decoder *d = ist->decoder;
DecThreadContext dt;
int ret = 0, input_status = 0;
@@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
dec_thread_set_name(ist);
while (!input_status) {
- int dummy, flush_buffers;
+ int flush_buffers, have_data;
- input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
- flush_buffers = input_status >= 0 && !dt.pkt->buf;
- if (!dt.pkt->buf)
+ input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+ have_data = input_status >= 0 &&
+ (dt.pkt->buf || dt.pkt->side_data_elems ||
+ (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
+ (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
+ flush_buffers = input_status >= 0 && !have_data;
+ if (!have_data)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
flush_buffers ? "flush" : "EOF");
- ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+ ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
+ // AVERROR_EOF - EOF from the decoder
+ // AVERROR_EXIT - EOF from the scheduler
+ // we treat them differently when flushing
+ if (ret == AVERROR_EXIT) {
+ ret = AVERROR_EOF;
+ flush_buffers = 0;
+ }
+
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
@@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
if (!flush_buffers)
break;
- /* report last frame duration to the demuxer thread */
+ /* report last frame duration to the scheduler */
if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
- Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
- .tb = d->last_frame_tb };
- av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+ dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est;
+ dt.pkt->time_base = d->last_frame_tb;
}
avcodec_flush_buffers(ist->dec_ctx);
@@ -724,149 +658,47 @@ void *decoder_thread(void *arg)
av_err2str(ret));
break;
}
-
- // signal to the consumer thread that the entire packet was processed
- ret = tq_send(d->queue_out, 0, dt.frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
- break;
- }
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
+ // on success send EOF timestamp to our downstreams
+ if (ret >= 0) {
+ float err_rate;
+
+ av_frame_unref(dt.frame);
+
+ dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+ dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+ d->last_frame_pts + d->last_frame_duration_est;
+ dt.frame->time_base = d->last_frame_tb;
+
+ ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+ if (ret < 0 && ret != AVERROR_EOF) {
+ av_log(NULL, AV_LOG_FATAL,
+ "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+ goto finish;
+ }
+ ret = 0;
+
+ err_rate = (ist->frames_decoded || ist->decode_errors) ?
+ ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+ if (err_rate > max_error_rate) {
+ av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+ err_rate, max_error_rate);
+ ret = FFMPEG_ERROR_RATE_EXCEEDED;
+ } else if (err_rate)
+ av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+ }
+
finish:
- tq_receive_finish(d->queue_in, 0);
- tq_send_finish (d->queue_out, 0);
-
- // make sure the demuxer does not get stuck waiting for audio durations
- // that will never arrive
- if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
- av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
dec_thread_uninit(&dt);
- av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
return (void*)(intptr_t)ret;
}
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
- Decoder *d = ist->decoder;
- int ret = 0, thread_ret;
-
- // thread already joined
- if (!d->queue_in)
- return AVERROR_EOF;
-
- // send the packet/flush request/EOF to the decoder thread
- if (pkt || no_eof) {
- av_packet_unref(d->pkt);
-
- if (pkt) {
- ret = av_packet_ref(d->pkt, pkt);
- if (ret < 0)
- goto finish;
- }
-
- ret = tq_send(d->queue_in, 0, d->pkt);
- if (ret < 0)
- goto finish;
- } else
- tq_send_finish(d->queue_in, 0);
-
- // retrieve all decoded data for the packet
- while (1) {
- int dummy;
-
- ret = tq_receive(d->queue_out, &dummy, d->frame);
- if (ret < 0)
- goto finish;
-
- // packet fully processed
- if (!d->frame->buf[0])
- return 0;
-
- // process the decoded frame
- if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
- ret = process_subtitle(ist, d->frame);
- } else {
- ret = send_frame_to_filters(ist, d->frame);
- }
- av_frame_unref(d->frame);
- if (ret < 0)
- goto finish;
- }
-
-finish:
- thread_ret = dec_thread_stop(d);
- if (thread_ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
- av_err2str(thread_ret));
- ret = err_merge(ret, thread_ret);
- }
- // non-EOF errors here are all fatal
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
-
- // signal EOF to our downstreams
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- return ret;
- }
-
- return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->queue_in = tq_alloc(1, 1, op, pkt_move);
- if (!d->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- d->queue_out = tq_alloc(1, 4, op, frame_move);
- if (!d->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
- return ret;
-}
-
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
if (ret < 0)
return ret;
- ret = dec_thread_start(ist);
- if (ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
return 0;
}
@@ -22,8 +22,6 @@
#include "ffmpeg.h"
#include "ffmpeg_sched.h"
#include "ffmpeg_utils.h"
-#include "objpool.h"
-#include "thread_queue.h"
#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
@@ -35,7 +33,6 @@
#include "libavutil/pixdesc.h"
#include "libavutil/time.h"
#include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -66,7 +63,11 @@ typedef struct DemuxStream {
double ts_scale;
+ // scheduler returned EOF for this stream
+ int finished;
+
int streamcopy_needed;
+ int have_sub2video;
int wrap_correction_done;
int saw_first_ts;
@@ -101,6 +102,7 @@ typedef struct Demuxer {
/* number of times input stream should be looped */
int loop;
+ int have_audio_dec;
/* duration of the looped segment of the input file */
Timestamp duration;
/* pts with the smallest/largest values ever seen */
@@ -113,11 +115,12 @@ typedef struct Demuxer {
double readrate_initial_burst;
Scheduler *sch;
- ThreadQueue *thread_queue;
- int thread_queue_size;
- pthread_t thread;
+
+ AVPacket *pkt_heartbeat;
int read_started;
+ int nb_streams_used;
+ int nb_streams_finished;
} Demuxer;
static DemuxStream *ds_from_ist(InputStream *ist)
@@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const AVPacket *pkt)
d->nb_streams_warn = pkt->stream_index + 1;
}
-static int seek_to_start(Demuxer *d)
+static int seek_to_start(Demuxer *d, Timestamp end_pts)
{
InputFile *ifile = &d->f;
AVFormatContext *is = ifile->ctx;
@@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d)
if (ret < 0)
return ret;
- if (ifile->audio_ts_queue_size) {
- int got_ts = 0;
-
- while (got_ts < ifile->audio_ts_queue_size) {
- Timestamp ts;
- ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0);
- if (ret < 0)
- return ret;
- got_ts++;
-
- if (d->max_pts.ts == AV_NOPTS_VALUE ||
- av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0)
- d->max_pts = ts;
- }
- }
+ if (end_pts.ts != AV_NOPTS_VALUE &&
+ (d->max_pts.ts == AV_NOPTS_VALUE ||
+ av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0))
+ d->max_pts = end_pts;
if (d->max_pts.ts != AV_NOPTS_VALUE) {
int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : d->min_pts.ts;
@@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base);
if (pkt->pts != AV_NOPTS_VALUE) {
// audio decoders take precedence for estimating total file duration
- int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration;
+ int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
pkt->pts += duration;
@@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
return 0;
}
-static int input_packet_process(Demuxer *d, AVPacket *pkt)
+static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags)
{
InputFile *f = &d->f;
InputStream *ist = f->streams[pkt->stream_index];
@@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt)
if (ret < 0)
return ret;
+ if (f->recording_time != INT64_MAX) {
+ int64_t start_time = 0;
+ if (copy_ts) {
+ start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+ start_time += start_at_zero ? 0 : f->start_time_effective;
+ }
+ if (ds->dts >= f->recording_time + start_time)
+ *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
+ }
+
ds->data_size += pkt->size;
ds->nb_packets++;
@@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt)
av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
}
+ pkt->stream_index = ds->sch_idx_stream;
+
return 0;
}
@@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d)
}
}
+static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags,
+ const char *pkt_desc)
+{
+ int ret;
+
+ ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
+ if (ret == AVERROR_EOF) {
+ av_packet_unref(pkt);
+
+ av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n");
+ ds->finished = 1;
+
+ if (++d->nb_streams_finished == d->nb_streams_used) {
+ av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
+ return AVERROR_EOF;
+ }
+ } else if (ret < 0) {
+ if (ret != AVERROR_EXIT)
+ av_log(d, AV_LOG_ERROR,
+ "Unable to send %s packet to consumers: %s\n",
+ pkt_desc, av_err2str(ret));
+ return ret;
+ }
+
+ return 0;
+}
+
+static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags)
+{
+ InputFile *f = &d->f;
+ int ret;
+
+ // send heartbeat for sub2video streams
+ if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
+ for (int i = 0; i < f->nb_streams; i++) {
+ DemuxStream *ds1 = ds_from_ist(f->streams[i]);
+
+ if (ds1->finished || !ds1->have_sub2video)
+ continue;
+
+ d->pkt_heartbeat->pts = pkt->pts;
+ d->pkt_heartbeat->time_base = pkt->time_base;
+ d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
+ d->pkt_heartbeat->opaque = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
+
+ ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
+ if (ret < 0)
+ return ret;
+ }
+ }
+
+ ret = do_send(d, ds, pkt, flags, "demuxed");
+ if (ret < 0)
+ return ret;
+
+
+ return 0;
+}
+
static void discard_unused_programs(InputFile *ifile)
{
for (int j = 0; j < ifile->ctx->nb_programs; j++) {
@@ -527,9 +590,13 @@ static void *input_thread(void *arg)
discard_unused_programs(f);
+ d->read_started = 1;
d->wallclock_start = av_gettime_relative();
while (1) {
+ DemuxStream *ds;
+ unsigned send_flags = 0;
+
ret = av_read_frame(f->ctx, pkt);
if (ret == AVERROR(EAGAIN)) {
@@ -538,11 +605,13 @@ static void *input_thread(void *arg)
}
if (ret < 0) {
if (d->loop) {
- /* signal looping to the consumer thread */
+ /* signal looping to our consumers */
pkt->stream_index = -1;
- ret = tq_send(d->thread_queue, 0, pkt);
+
+ ret = sch_demux_send(d->sch, f->index, pkt, 0);
if (ret >= 0)
- ret = seek_to_start(d);
+ ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
+ .tb = pkt->time_base });
if (ret >= 0)
continue;
@@ -551,9 +620,11 @@ static void *input_thread(void *arg)
if (ret == AVERROR_EOF)
av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
- else
+ else {
av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
av_err2str(ret));
+ ret = exit_on_error ? ret : 0;
+ }
break;
}
@@ -565,8 +636,9 @@ static void *input_thread(void *arg)
/* the following test is needed in case new streams appear
dynamically in stream : we ignore them */
- if (pkt->stream_index >= f->nb_streams ||
- f->streams[pkt->stream_index]->discard) {
+ ds = pkt->stream_index < f->nb_streams ?
+ ds_from_ist(f->streams[pkt->stream_index]) : NULL;
+ if (!ds || ds->ist.discard || ds->finished) {
report_new_stream(d, pkt);
av_packet_unref(pkt);
continue;
@@ -583,122 +655,26 @@ static void *input_thread(void *arg)
}
}
- ret = input_packet_process(d, pkt);
+ ret = input_packet_process(d, pkt, &send_flags);
if (ret < 0)
break;
if (f->readrate)
readrate_sleep(d);
- ret = tq_send(d->thread_queue, 0, pkt);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(f, AV_LOG_ERROR,
- "Unable to send packet to main thread: %s\n",
- av_err2str(ret));
+ ret = demux_send(d, ds, pkt, send_flags);
+ if (ret < 0)
break;
- }
}
+ // EOF/EXIT is normal termination
+ if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
+ ret = 0;
+
finish:
- av_assert0(ret < 0);
- tq_send_finish(d->thread_queue, 0);
-
av_packet_free(&pkt);
- av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
-
- return NULL;
-}
-
-static void thread_stop(Demuxer *d)
-{
- InputFile *f = &d->f;
-
- if (!d->thread_queue)
- return;
-
- tq_receive_finish(d->thread_queue, 0);
-
- pthread_join(d->thread, NULL);
-
- tq_free(&d->thread_queue);
-
- av_thread_message_queue_free(&f->audio_ts_queue);
-}
-
-static int thread_start(Demuxer *d)
-{
- int ret;
- InputFile *f = &d->f;
- ObjPool *op;
-
- if (d->thread_queue_size <= 0)
- d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
- if (!d->thread_queue) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- if (d->loop) {
- int nb_audio_dec = 0;
-
- for (int i = 0; i < f->nb_streams; i++) {
- InputStream *ist = f->streams[i];
- nb_audio_dec += !!(ist->decoding_needed &&
- ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO);
- }
-
- if (nb_audio_dec) {
- ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
- nb_audio_dec, sizeof(Timestamp));
- if (ret < 0)
- goto fail;
- f->audio_ts_queue_size = nb_audio_dec;
- }
- }
-
- if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
- av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
- ret = AVERROR(ret);
- goto fail;
- }
-
- d->read_started = 1;
-
- return 0;
-fail:
- tq_free(&d->thread_queue);
- return ret;
-}
-
-int ifile_get_packet(InputFile *f, AVPacket *pkt)
-{
- Demuxer *d = demuxer_from_ifile(f);
- int ret, dummy;
-
- if (!d->thread_queue) {
- ret = thread_start(d);
- if (ret < 0)
- return ret;
- }
-
- ret = tq_receive(d->thread_queue, &dummy, pkt);
- if (ret < 0)
- return ret;
-
- if (pkt->stream_index == -1) {
- av_assert0(!pkt->data && !pkt->side_data_elems);
- return 1;
- }
-
- return 0;
+ return (void*)(intptr_t)ret;
}
static void demux_final_stats(Demuxer *d)
@@ -769,8 +745,6 @@ void ifile_close(InputFile **pf)
if (!f)
return;
- thread_stop(d);
-
if (d->read_started)
demux_final_stats(d);
@@ -780,6 +754,8 @@ void ifile_close(InputFile **pf)
avformat_close_input(&f->ctx);
+ av_packet_free(&d->pkt_heartbeat);
+
av_freep(pf);
}
@@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int decoding_needed)
ds->sch_idx_stream = ret;
}
- ist->discard = 0;
+ if (ist->discard) {
+ ist->discard = 0;
+ d->nb_streams_used++;
+ }
+
ist->st->discard = ist->user_set_discard;
ist->decoding_needed |= decoding_needed;
ds->streamcopy_needed |= !decoding_needed;
@@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int decoding_needed)
ret = dec_open(ist, d->sch, ds->sch_idx_dec);
if (ret < 0)
return ret;
+
+ d->have_audio_dec |= is_audio;
}
return 0;
@@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
{
+ Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]);
DemuxStream *ds = ds_from_ist(ist);
int ret;
@@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
if (ret < 0)
return ret;
+ if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
+ if (!d->pkt_heartbeat) {
+ d->pkt_heartbeat = av_packet_alloc();
+ if (!d->pkt_heartbeat)
+ return AVERROR(ENOMEM);
+ }
+ ds->have_sub2video = 1;
+ }
+
return ds->sch_idx_dec;
}
@@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
"since neither -readrate nor -re were given\n");
}
- d->thread_queue_size = o->thread_queue_size;
-
/* Add all the streams from the given input file to the demuxer */
for (int i = 0; i < ic->nb_streams; i++) {
ret = ist_add(o, d, ic->streams[i]);
@@ -41,12 +41,6 @@
#include "libavformat/avformat.h"
struct Encoder {
- AVFrame *sq_frame;
-
- // packet for receiving encoded output
- AVPacket *pkt;
- AVFrame *sub_frame;
-
// combined size of all the packets received from the encoder
uint64_t data_size;
@@ -54,25 +48,9 @@ struct Encoder {
uint64_t packets_encoded;
int opened;
- int finished;
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending frames from the main thread to
- * the encoder thread.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending encoded packets from the encoder thread
- * to the main thread.
- *
- * An empty packet is sent to signal that a previously sent
- * frame has been fully processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -81,24 +59,6 @@ typedef struct EncoderThread {
AVPacket *pkt;
} EncoderThread;
-static int enc_thread_stop(Encoder *e)
-{
- void *ret;
-
- if (!e->queue_in)
- return 0;
-
- tq_send_finish(e->queue_in, 0);
- tq_receive_finish(e->queue_out, 0);
-
- pthread_join(e->thread, &ret);
-
- tq_free(&e->queue_in);
- tq_free(&e->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void enc_free(Encoder **penc)
{
Encoder *enc = *penc;
@@ -106,13 +66,6 @@ void enc_free(Encoder **penc)
if (!enc)
return;
- enc_thread_stop(enc);
-
- av_frame_free(&enc->sq_frame);
- av_frame_free(&enc->sub_frame);
-
- av_packet_free(&enc->pkt);
-
av_freep(penc);
}
@@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec,
if (!enc)
return AVERROR(ENOMEM);
- if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
- enc->sub_frame = av_frame_alloc();
- if (!enc->sub_frame)
- goto fail;
- }
-
- enc->pkt = av_packet_alloc();
- if (!enc->pkt)
- goto fail;
-
enc->sch = sch;
enc->sch_idx = sch_idx;
*penc = enc;
return 0;
-fail:
- enc_free(&enc);
- return AVERROR(ENOMEM);
}
static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref)
@@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
return 0;
}
-static int enc_thread_start(OutputStream *ost)
-{
- Encoder *e = ost->enc;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- e->queue_in = tq_alloc(1, 1, op, frame_move);
- if (!e->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_packets();
- if (!op)
- goto fail;
-
- e->queue_out = tq_alloc(1, 4, op, pkt_move);
- if (!e->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&e->queue_in);
- tq_free(&e->queue_out);
- return ret;
-}
-
-int enc_open(OutputStream *ost, const AVFrame *frame)
+int enc_open(void *opaque, const AVFrame *frame)
{
+ OutputStream *ost = opaque;
InputStream *ist = ost->ist;
Encoder *e = ost->enc;
AVCodecContext *enc_ctx = ost->enc_ctx;
@@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
const AVCodec *enc = enc_ctx->codec;
OutputFile *of = output_files[ost->file_index];
FrameData *fd;
+ int frame_samples = 0;
int ret;
if (e->opened)
@@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
e->opened = 1;
- if (ost->sq_idx_encode >= 0) {
- e->sq_frame = av_frame_alloc();
- if (!e->sq_frame)
- return AVERROR(ENOMEM);
- }
-
- if (ost->enc_ctx->frame_size) {
- av_assert0(ost->sq_idx_encode >= 0);
- sq_frame_samples(output_files[ost->file_index]->sq_encode,
- ost->sq_idx_encode, ost->enc_ctx->frame_size);
- }
+ if (ost->enc_ctx->frame_size)
+ frame_samples = ost->enc_ctx->frame_size;
ret = check_avoptions(ost->encoder_opts);
if (ret < 0)
@@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame)
if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
- ret = enc_thread_start(ost);
- if (ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
ret = of_stream_init(of, ost);
if (ret < 0)
return ret;
- return 0;
+ return frame_samples;
}
static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
@@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
return exit_on_error ? AVERROR(EINVAL) : 0;
}
- if (ost->finished ||
- (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
+ if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
return 0;
enc = ost->enc_ctx;
@@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
}
pkt->dts = pkt->pts;
- ret = tq_send(e->queue_out, 0, pkt);
+ ret = sch_enc_send(e->sch, e->sch_idx, pkt);
if (ret < 0) {
av_packet_unref(pkt);
return ret;
@@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
int64_t frame_number;
double ti1, bitrate, avg_bitrate;
double psnr_val = -1;
+ int quality;
- ost->quality = sd ? AV_RL32(sd) : -1;
+ quality = sd ? AV_RL32(sd) : -1;
pict_type = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
+ atomic_store(&ost->quality, quality);
+
if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
// FIXME the scaling assumes 8bit
double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0);
@@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
frame_number = e->packets_encoded;
if (vstats_version <= 1) {
fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
- ost->quality / (float)FF_QP2LAMBDA);
+ quality / (float)FF_QP2LAMBDA);
} else {
fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
- ost->quality / (float)FF_QP2LAMBDA);
+ quality / (float)FF_QP2LAMBDA);
}
if (psnr_val >= 0)
@@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
}
- if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Subtitle heartbeat logic failed in %s! (%s)\n",
- __func__, av_err2str(ret));
- return ret;
- }
-
e->data_size += pkt->size;
e->packets_encoded++;
- ret = tq_send(e->queue_out, 0, pkt);
+ ret = sch_enc_send(e->sch, e->sch_idx, pkt);
if (ret < 0) {
av_packet_unref(pkt);
return ret;
@@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
av_assert0(0);
}
-static int submit_encode_frame(OutputFile *of, OutputStream *ost,
- AVFrame *frame, AVPacket *pkt)
-{
- Encoder *e = ost->enc;
- int ret;
-
- if (ost->sq_idx_encode < 0)
- return encode_frame(of, ost, frame, pkt);
-
- if (frame) {
- ret = av_frame_ref(e->sq_frame, frame);
- if (ret < 0)
- return ret;
- frame = e->sq_frame;
- }
-
- ret = sq_send(of->sq_encode, ost->sq_idx_encode,
- SQFRAME(frame));
- if (ret < 0) {
- if (frame)
- av_frame_unref(frame);
- if (ret != AVERROR_EOF)
- return ret;
- }
-
- while (1) {
- AVFrame *enc_frame = e->sq_frame;
-
- ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
- SQFRAME(enc_frame));
- if (ret == AVERROR_EOF) {
- enc_frame = NULL;
- } else if (ret < 0) {
- return (ret == AVERROR(EAGAIN)) ? 0 : ret;
- }
-
- ret = encode_frame(of, ost, enc_frame, pkt);
- if (enc_frame)
- av_frame_unref(enc_frame);
- if (ret < 0)
- return ret;
- }
-}
-
static int do_audio_out(OutputFile *of, OutputStream *ost,
AVFrame *frame, AVPacket *pkt)
{
@@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream *ost,
if (!check_recording_time(ost, frame->pts, frame->time_base))
return AVERROR_EOF;
- return submit_encode_frame(of, ost, frame, pkt);
+ return encode_frame(of, ost, frame, pkt);
}
static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream *ost,
}
#endif
- return submit_encode_frame(of, ost, in_picture, pkt);
+ return encode_frame(of, ost, in_picture, pkt);
}
static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
@@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
enum AVMediaType type = ost->type;
if (type == AVMEDIA_TYPE_SUBTITLE) {
+ const AVSubtitle *subtitle = frame && frame->buf[0] ?
+ (AVSubtitle*)frame->buf[0]->data : NULL;
+
// no flushing for subtitles
- return frame ?
- do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+ return subtitle && subtitle->num_rects ?
+ do_subtitle_out(of, ost, subtitle, pkt) : 0;
}
if (frame) {
@@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
do_audio_out(of, ost, frame, pkt);
}
- return submit_encode_frame(of, ost, NULL, pkt);
+ return encode_frame(of, ost, NULL, pkt);
}
static void enc_thread_set_name(const OutputStream *ost)
@@ -1009,24 +845,50 @@ fail:
void *encoder_thread(void *arg)
{
OutputStream *ost = arg;
- OutputFile *of = output_files[ost->file_index];
Encoder *e = ost->enc;
EncoderThread et;
int ret = 0, input_status = 0;
+ int name_set = 0;
ret = enc_thread_init(&et);
if (ret < 0)
goto finish;
- enc_thread_set_name(ost);
+ /* Open the subtitle encoders immediately. AVFrame-based encoders
+ * are opened through a callback from the scheduler once they get
+ * their first frame
+ *
+ * N.B.: because the callback is called from a different thread,
+ * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
+ * for the first time for audio/video. */
+ if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
+ ret = enc_open(ost, NULL);
+ if (ret < 0)
+ goto finish;
+ }
while (!input_status) {
- int dummy;
-
- input_status = tq_receive(e->queue_in, &dummy, et.frame);
- if (input_status < 0)
+ input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
+ if (input_status == AVERROR_EOF) {
av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
+ if (!e->opened) {
+ av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
+ ret = AVERROR(EINVAL);
+ goto finish;
+ }
+ } else if (input_status < 0) {
+ ret = input_status;
+ av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
+ av_err2str(ret));
+ goto finish;
+ }
+
+ if (!name_set) {
+ enc_thread_set_name(ost);
+ name_set = 1;
+ }
+
ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
av_packet_unref(et.pkt);
@@ -1040,15 +902,6 @@ void *encoder_thread(void *arg)
av_err2str(ret));
break;
}
-
- // signal to the consumer thread that the frame was encoded
- ret = tq_send(e->queue_out, 0, et.pkt);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ost, AV_LOG_ERROR,
- "Error communicating with the main thread\n");
- break;
- }
}
// EOF is normal thread termination
@@ -1056,118 +909,7 @@ void *encoder_thread(void *arg)
ret = 0;
finish:
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-
- tq_receive_finish(e->queue_in, 0);
- tq_send_finish (e->queue_out, 0);
-
enc_thread_uninit(&et);
- av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
-
return (void*)(intptr_t)ret;
}
-
-int enc_frame(OutputStream *ost, AVFrame *frame)
-{
- OutputFile *of = output_files[ost->file_index];
- Encoder *e = ost->enc;
- int ret, thread_ret;
-
- ret = enc_open(ost, frame);
- if (ret < 0)
- return ret;
-
- if (!e->queue_in)
- return AVERROR_EOF;
-
- // send the frame/EOF to the encoder thread
- if (frame) {
- ret = tq_send(e->queue_in, 0, frame);
- if (ret < 0)
- goto finish;
- } else
- tq_send_finish(e->queue_in, 0);
-
- // retrieve all encoded data for the frame
- while (1) {
- int dummy;
-
- ret = tq_receive(e->queue_out, &dummy, e->pkt);
- if (ret < 0)
- break;
-
- // frame fully encoded
- if (!e->pkt->data && !e->pkt->side_data_elems)
- return 0;
-
- // process the encoded packet
- ret = of_output_packet(of, ost, e->pkt);
- if (ret < 0)
- goto finish;
- }
-
-finish:
- thread_ret = enc_thread_stop(e);
- if (thread_ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
- av_err2str(thread_ret));
- ret = err_merge(ret, thread_ret);
- }
-
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
-
- // signal EOF to the muxer
- return of_output_packet(of, ost, NULL);
-}
-
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
-{
- Encoder *e = ost->enc;
- AVFrame *f = e->sub_frame;
- int ret;
-
- // XXX the queue for transferring data to the encoder thread runs
- // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
- // that inside the frame
- // eventually, subtitles should be switched to use AVFrames natively
- ret = subtitle_wrap_frame(f, sub, 1);
- if (ret < 0)
- return ret;
-
- ret = enc_frame(ost, f);
- av_frame_unref(f);
-
- return ret;
-}
-
-int enc_flush(void)
-{
- int ret = 0;
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- OutputFile *of = output_files[ost->file_index];
- if (ost->sq_idx_encode >= 0)
- sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
- }
-
- for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
- Encoder *e = ost->enc;
- AVCodecContext *enc = ost->enc_ctx;
- int err;
-
- if (!enc || !e->opened ||
- (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
- continue;
-
- err = enc_frame(ost, NULL);
- if (err != AVERROR_EOF && ret < 0)
- ret = err_merge(ret, err);
-
- av_assert0(!e->queue_in);
- }
-
- return ret;
-}
@@ -21,8 +21,6 @@
#include <stdint.h>
#include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
#include "libavfilter/avfilter.h"
#include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@ typedef struct FilterGraphPriv {
// true when the filtergraph contains only meta filters
// that do not modify the frame data
int is_meta;
+ // source filters are present in the graph
+ int have_sources;
int disable_conversions;
- int nb_inputs_bound;
- int nb_outputs_bound;
+ unsigned nb_outputs_done;
const char *graph_desc;
@@ -67,41 +66,6 @@ typedef struct FilterGraphPriv {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending frames from the main thread to the filtergraph. Has
- * nb_inputs+1 streams - the first nb_inputs stream correspond to
- * filtergraph inputs. Frames on those streams may have their opaque set to
- * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
- * EOF event for the correspondint stream. Will be immediately followed by
- * this stream being send-closed.
- * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
- * a subtitle heartbeat event. Will only be sent for sub2video streams.
- *
- * The last stream is "control" - the main thread sends empty AVFrames with
- * opaque set to
- * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
- * from filtergraph outputs. These frames are sent to corresponding
- * streams in queue_out. Finally an empty frame is sent to the control
- * stream in queue_out.
- * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
- * available the terminating empty frame's opaque will contain the index+1
- * of the filtergraph input to which more input frames should be supplied.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending frames from the filtergraph back to the main thread.
- * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
- * filtergraph outputs.
- *
- * The last stream is "control" - see documentation for queue_in for more
- * details.
- */
- ThreadQueue *queue_out;
- // submitting frames to filter thread returned EOF
- // this only happens on thread exit, so is not per-input
- int eof_in;
} FilterGraphPriv;
static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@ typedef struct FilterGraphThread {
// The output index is stored in frame opaque.
AVFifo *frame_queue_out;
+ // index of the next input to request from the scheduler
+ unsigned next_in;
+ // set to 1 after at least one frame passed through this output
int got_frame;
// EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@ typedef struct OutputFilterPriv {
int64_t ts_offset;
int64_t next_pts;
FPSConvContext fps;
-
- // set to 1 after at least one frame passed through this output
- int got_frame;
} OutputFilterPriv;
static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
static void *filter_thread(void *arg);
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
- FilterGraph *fg = &fgp->fg;
- ObjPool *op;
- int ret = 0;
-
- if (fgp->nb_inputs_bound < fg->nb_inputs ||
- fgp->nb_outputs_bound < fg->nb_outputs)
- return 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
- if (!fgp->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- // at least one output is mandatory
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
- if (!fgp->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
- if (ret) {
- ret = AVERROR(ret);
- av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
- fg->index, av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return ret;
-}
-
static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
{
AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
ofilter->graph = fg;
ofp->format = -1;
ofp->index = fg->nb_outputs - 1;
- ofilter->last_pts = AV_NOPTS_VALUE;
return ofilter;
}
@@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
return AVERROR(ENOMEM);
}
- fgp->nb_inputs_bound++;
- av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
if (ret < 0)
return ret;
- fgp->nb_outputs_bound++;
- av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
return ifilter;
}
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
- void *ret;
-
- if (!fgp->queue_in)
- return 0;
-
- for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
- InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
- ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
- if (ifp)
- ifp->eof = 1;
-
- tq_send_finish(fgp->queue_in, i);
- }
-
- for (int i = 0; i <= fgp->fg.nb_outputs; i++)
- tq_receive_finish(fgp->queue_out, i);
-
- pthread_join(fgp->thread, &ret);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void fg_free(FilterGraph **pfg)
{
FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg)
return;
fgp = fgp_from_fg(fg);
- fg_thread_stop(fgp);
-
avfilter_graph_free(&fg->graph);
for (int j = 0; j < fg->nb_inputs; j++) {
InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
if (ret < 0)
goto fail;
+ for (unsigned i = 0; i < graph->nb_filters; i++) {
+ const AVFilter *f = graph->filters[i]->filter;
+ if (!avfilter_filter_pad_count(f, 0) &&
+ !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+ fgp->have_sources = 1;
+ break;
+ }
+ }
+
for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
InputFilter *const ifilter = ifilter_alloc(fg);
InputFilterPriv *ifp;
@@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
AVBufferRef *hw_device;
AVFilterInOut *inputs, *outputs, *cur;
int ret, i, simple = filtergraph_is_simple(fg);
+ int have_input_eof = 0;
const char *graph_desc = fgp->graph_desc;
cleanup_filtergraph(fg);
@@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
ret = av_buffersrc_add_frame(ifp->filter, NULL);
if (ret < 0)
goto fail;
+ have_input_eof = 1;
}
}
- return 0;
+ if (have_input_eof) {
+ // make sure the EOF propagates to the end of the graph
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+ goto fail;
+ }
+ return 0;
fail:
cleanup_filtergraph(fg);
return ret;
@@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
fps->frames_prev_hist[2]);
if (!*nb_frames && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
fps->last_dropped++;
}
@@ -2260,21 +2153,23 @@ finish:
fps->frames_prev_hist[0] = *nb_frames_prev;
if (*nb_frames_prev == 0 && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
av_log(ost, AV_LOG_VERBOSE,
"*** dropping frame %"PRId64" at ts %"PRId64"\n",
fps->frame_number, fps->last_frame->pts);
}
if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+ uint64_t nb_frames_dup;
if (*nb_frames > dts_error_threshold * 30) {
av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
*nb_frames = 0;
return;
}
- ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+ nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+ *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
- if (ofilter->nb_frames_dup > fps->dup_warning) {
+ if (nb_frames_dup > fps->dup_warning) {
av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
fps->dup_warning *= 10;
}
@@ -2284,8 +2179,57 @@ finish:
fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
}
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+ int ret;
+
+ // we are finished and no frames were ever seen at this output,
+ // at least initialize the encoder with a dummy frame
+ if (!fgt->got_frame) {
+ AVFrame *frame = fgt->frame;
+ FrameData *fd;
+
+ frame->time_base = ofp->tb_out;
+ frame->format = ofp->format;
+
+ frame->width = ofp->width;
+ frame->height = ofp->height;
+ frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+ frame->sample_rate = ofp->sample_rate;
+ if (ofp->ch_layout.nb_channels) {
+ ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+ if (ret < 0)
+ return ret;
+ }
+
+ fd = frame_data(frame);
+ if (!fd)
+ return AVERROR(ENOMEM);
+
+ fd->frame_rate_filter = ofp->fps.framerate;
+
+ av_assert0(!frame->buf[0]);
+
+ av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+ "No filtered frames for output stream, trying to "
+ "initialize anyway.\n");
+
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret;
+ }
+ }
+
+ fgt->eof_out[ofp->index] = 1;
+
+ return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
AVFrame *frame_prev = ofp->fps.last_frame;
@@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
frame_out = frame;
}
- if (buffer) {
- AVFrame *f = av_frame_alloc();
-
- if (!f) {
- av_frame_unref(frame_out);
- return AVERROR(ENOMEM);
- }
-
- av_frame_move_ref(f, frame_out);
- f->opaque = (void*)(intptr_t)ofp->index;
-
- ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
- if (ret < 0) {
- av_frame_free(&f);
- return AVERROR(ENOMEM);
- }
- } else {
- // return the frame to the main thread
- ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+ {
+ // send the frame to consumers
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
if (ret < 0) {
av_frame_unref(frame_out);
- fgt->eof_out[ofp->index] = 1;
+
+ if (!fgt->eof_out[ofp->index]) {
+ fgt->eof_out[ofp->index] = 1;
+ fgp->nb_outputs_done++;
+ }
+
return ret == AVERROR_EOF ? 0 : ret;
}
}
@@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
av_frame_move_ref(frame_prev, frame);
}
- if (!frame) {
- tq_send_finish(fgp->queue_out, ofp->index);
- fgt->eof_out[ofp->index] = 1;
- }
+ if (!frame)
+ return close_output(ofp, fgt);
return 0;
}
static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
OutputStream *ost = ofp->ofilter.ost;
@@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
ret = av_buffersink_get_frame_flags(filter, frame,
AV_BUFFERSINK_FLAG_NO_REQUEST);
- if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
- ret = fg_output_frame(ofp, fgt, NULL, buffer);
+ if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+ ret = fg_output_frame(ofp, fgt, NULL);
return (ret < 0) ? ret : 1;
} else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 1;
@@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
fd->frame_rate_filter = ofp->fps.framerate;
}
- ret = fg_output_frame(ofp, fgt, frame, buffer);
+ ret = fg_output_frame(ofp, fgt, frame);
av_frame_unref(frame);
if (ret < 0)
return ret;
@@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
return 0;
}
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
- int ret = 0;
+ int did_step = 0;
- if (!fg->graph)
- return 0;
-
- // process buffered frames
- if (!buffer) {
- AVFrame *f;
-
- while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
- int out_idx = (intptr_t)f->opaque;
- f->opaque = NULL;
- ret = tq_send(fgp->queue_out, out_idx, f);
- av_frame_free(&f);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
+ // graph not configured, just select the input to request
+ if (!fg->graph) {
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+ if (ifp->format < 0 && !fgt->eof_in[i]) {
+ fgt->next_in = i;
+ return 0;
+ }
}
+
+ // This state - graph is not configured, but all inputs are either
+ // initialized or EOF - should be unreachable because sending EOF to a
+ // filter without even a fallback format should fail
+ av_assert0(0);
+ return AVERROR_BUG;
}
- /* Reap all buffers present in the buffer sinks */
- for (int i = 0; i < fg->nb_outputs; i++) {
- OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
- int ret = 0;
+ while (1) {
+ int ret;
- while (!ret) {
- ret = fg_output_step(ofp, fgt, frame, buffer);
- if (ret < 0)
- return ret;
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret == AVERROR(EAGAIN)) {
+ fgt->next_in = choose_input(fg, fgt);
+ break;
+ } else if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+ else
+ av_log(fg, AV_LOG_ERROR,
+ "Error requesting a frame from the filtergraph: %s\n",
+ av_err2str(ret));
+ return ret;
}
- }
+ fgt->next_in = fg->nb_inputs;
- return 0;
+ // return after one iteration, so that scheduler can rate-control us
+ if (did_step && fgp->have_sources)
+ return 0;
+
+ /* Reap all buffers present in the buffer sinks */
+ for (int i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+ ret = 0;
+ while (!ret) {
+ ret = fg_output_step(ofp, fgt, frame);
+ if (ret < 0)
+ return ret;
+ }
+ }
+ did_step = 1;
+ };
+
+ return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
}
static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
+ if (fgt->eof_in[ifp->index])
+ return 0;
+
fgt->eof_in[ifp->index] = 1;
if (ifp->filter) {
@@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return ret;
}
- ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+ ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
av_frame_free(&tmp);
if (ret < 0)
return ret;
@@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return 0;
}
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
- AVFrame *frame)
-{
- const enum FrameOpaque msg = (intptr_t)frame->opaque;
- FilterGraph *fg = &fgp->fg;
- int graph_eof = 0;
- int ret;
-
- frame->opaque = NULL;
- av_assert0(msg > 0);
- av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
- if (!fg->graph) {
- // graph not configured yet, ignore all messages other than choosing
- // the input to read from
- if (msg != FRAME_OPAQUE_CHOOSE_INPUT) {
- av_frame_unref(frame);
- goto done;
- }
-
- for (int i = 0; i < fg->nb_inputs; i++) {
- InputFilter *ifilter = fg->inputs[i];
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- if (ifp->format < 0 && !fgt->eof_in[i]) {
- frame->opaque = (void*)(intptr_t)(i + 1);
- goto done;
- }
- }
-
- // This state - graph is not configured, but all inputs are either
- // initialized or EOF - should be unreachable because sending EOF to a
- // filter without even a fallback format should fail
- av_assert0(0);
- return AVERROR_BUG;
- }
-
- if (msg == FRAME_OPAQUE_SEND_COMMAND) {
- FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
- send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
- av_frame_unref(frame);
- goto done;
- }
-
- if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
- ret = avfilter_graph_request_oldest(fg->graph);
-
- graph_eof = ret == AVERROR_EOF;
-
- if (ret == AVERROR(EAGAIN)) {
- frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
- goto done;
- } else if (ret < 0 && !graph_eof)
- return ret;
- }
-
- ret = read_frames(fg, fgt, frame, 0);
- if (ret < 0) {
- av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
- return ret;
- }
-
- if (graph_eof)
- return AVERROR_EOF;
-
- // signal to the main thread that we are done processing the message
-done:
- ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
- return ret;
- }
-
- return 0;
-}
-
static void fg_thread_set_name(const FilterGraph *fg)
{
char name[16];
@@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg)
InputFilter *ifilter;
InputFilterPriv *ifp;
enum FrameOpaque o;
- int input_idx, eof_frame;
+ unsigned input_idx = fgt.next_in;
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- if (input_idx < 0 ||
- (input_idx == fg->nb_inputs && input_status < 0)) {
+ input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+ &input_idx, fgt.frame);
+ if (input_status == AVERROR_EOF) {
av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
break;
+ } else if (input_status == AVERROR(EAGAIN)) {
+ // should only happen when we didn't request any input
+ av_assert0(input_idx == fg->nb_inputs);
+ goto read_frames;
}
+ av_assert0(input_status >= 0);
+
+ o = (intptr_t)fgt.frame->opaque;
o = (intptr_t)fgt.frame->opaque;
// message on the control stream
if (input_idx == fg->nb_inputs) {
- ret = msg_process(fgp, &fgt, fgt.frame);
- if (ret < 0)
- goto finish;
+ FilterCommand *fc;
+ av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+
+ fc = (FilterCommand*)fgt.frame->buf[0]->data;
+ send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+ fc->all_filters);
+ av_frame_unref(fgt.frame);
continue;
}
// we received an input frame or EOF
ifilter = fg->inputs[input_idx];
ifp = ifp_from_ifilter(ifilter);
- eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
- } else if (input_status >= 0 && fgt.frame->buf[0]) {
+ } else if (fgt.frame->buf[0]) {
ret = send_frame(fg, &fgt, ifilter, fgt.frame);
} else {
- int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
- AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
- ret = send_eof(&fgt, ifilter, pts, tb);
+ av_assert1(o == FRAME_OPAQUE_EOF);
+ ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
}
av_frame_unref(fgt.frame);
if (ret < 0)
+ goto finish;
+
+read_frames:
+ // retrieve all newly avalable frames
+ ret = read_frames(fg, &fgt, fgt.frame);
+ if (ret == AVERROR_EOF) {
+ av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
break;
-
- if (eof_frame) {
- // an EOF frame is immediately followed by sender closing
- // the corresponding stream, so retrieve that event
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
- }
-
- // signal to the main thread that we are done
- ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
- if (ret < 0) {
- if (ret == AVERROR_EOF)
- break;
-
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ } else if (ret < 0) {
+ av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+ av_err2str(ret));
goto finish;
}
}
+ for (unsigned i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+ if (fgt.eof_out[i])
+ continue;
+
+ ret = fg_output_frame(ofp, &fgt, NULL);
+ if (ret < 0)
+ goto finish;
+ }
+
finish:
// EOF is normal termination
if (ret == AVERROR_EOF)
ret = 0;
- for (int i = 0; i <= fg->nb_inputs; i++)
- tq_receive_finish(fgp->queue_in, i);
- for (int i = 0; i <= fg->nb_outputs; i++)
- tq_send_finish(fgp->queue_out, i);
-
fg_thread_uninit(&fgt);
- av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
return (void*)(intptr_t)ret;
}
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
- AVFrame *frame, enum FrameOpaque type)
-{
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- int output_idx, ret;
-
- if (ifp->eof) {
- av_frame_unref(frame);
- return AVERROR_EOF;
- }
-
- frame->opaque = (void*)(intptr_t)type;
-
- ret = tq_send(fgp->queue_in, ifp->index, frame);
- if (ret < 0) {
- ifp->eof = 1;
- av_frame_unref(frame);
- return ret;
- }
-
- if (type == FRAME_OPAQUE_EOF)
- tq_send_finish(fgp->queue_in, ifp->index);
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
- return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- if (keep_reference) {
- ret = av_frame_ref(fgp->frame, frame);
- if (ret < 0)
- return ret;
- } else
- av_frame_move_ref(fgp->frame, frame);
-
- return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
- FilterGraphPriv *fgp = fgp_from_fg(graph);
- int ret, got_frames = 0;
-
- if (fgp->eof_in)
- return AVERROR_EOF;
-
- // signal to the filtering thread to return all frames it can
- av_assert0(!fgp->frame->buf[0]);
- fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
- FRAME_OPAQUE_CHOOSE_INPUT :
- FRAME_OPAQUE_REAP_FILTERS);
-
- ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
- if (ret < 0) {
- fgp->eof_in = 1;
- goto finish;
- }
-
- while (1) {
- OutputFilter *ofilter;
- OutputFilterPriv *ofp;
- OutputStream *ost;
- int output_idx;
-
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
- // EOF on the whole queue or the control stream
- if (output_idx < 0 ||
- (ret < 0 && output_idx == graph->nb_outputs))
- goto finish;
-
- // EOF for a specific stream
- if (ret < 0) {
- ofilter = graph->outputs[output_idx];
- ofp = ofp_from_ofilter(ofilter);
-
- // we are finished and no frames were ever seen at this output,
- // at least initialize the encoder with a dummy frame
- if (!ofp->got_frame) {
- AVFrame *frame = fgp->frame;
- FrameData *fd;
-
- frame->time_base = ofp->tb_out;
- frame->format = ofp->format;
-
- frame->width = ofp->width;
- frame->height = ofp->height;
- frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
- frame->sample_rate = ofp->sample_rate;
- if (ofp->ch_layout.nb_channels) {
- ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
- if (ret < 0)
- return ret;
- }
-
- fd = frame_data(frame);
- if (!fd)
- return AVERROR(ENOMEM);
-
- fd->frame_rate_filter = ofp->fps.framerate;
-
- av_assert0(!frame->buf[0]);
-
- av_log(ofilter->ost, AV_LOG_WARNING,
- "No filtered frames for output stream, trying to "
- "initialize anyway.\n");
-
- enc_open(ofilter->ost, frame);
- av_frame_unref(frame);
- }
-
- close_output_stream(graph->outputs[output_idx]->ost);
- continue;
- }
-
- // request was fully processed by the filtering thread,
- // return the input stream to read from, if needed
- if (output_idx == graph->nb_outputs) {
- int input_idx = (intptr_t)fgp->frame->opaque - 1;
- av_assert0(input_idx <= graph->nb_inputs);
-
- if (best_ist) {
- *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
- ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
- if (input_idx < 0 && !got_frames) {
- for (int i = 0; i < graph->nb_outputs; i++)
- graph->outputs[i]->ost->unavailable = 1;
- }
- }
- break;
- }
-
- // got a frame from the filtering thread, send it for encoding
- ofilter = graph->outputs[output_idx];
- ost = ofilter->ost;
- ofp = ofp_from_ofilter(ofilter);
-
- if (ost->finished) {
- av_frame_unref(fgp->frame);
- tq_receive_finish(fgp->queue_out, output_idx);
- continue;
- }
-
- if (fgp->frame->pts != AV_NOPTS_VALUE) {
- ofilter->last_pts = av_rescale_q(fgp->frame->pts,
- fgp->frame->time_base,
- AV_TIME_BASE_Q);
- }
-
- ret = enc_frame(ost, fgp->frame);
- av_frame_unref(fgp->frame);
- if (ret < 0)
- goto finish;
-
- ofp->got_frame = 1;
- got_frames = 1;
- }
-
-finish:
- if (ret < 0) {
- fgp->eof_in = 1;
- for (int i = 0; i < graph->nb_outputs; i++)
- close_output_stream(graph->outputs[i]->ost);
- }
-
- return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
- return fg_transcode_step(fg, NULL);
-}
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
AVBufferRef *buf;
FilterCommand *fc;
- int output_idx, ret;
-
- if (!fgp->queue_in)
- return;
fc = av_mallocz(sizeof(*fc));
if (!fc)
@@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
fgp->frame->buf[0] = buf;
fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
- ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
- if (ret < 0) {
- av_frame_unref(fgp->frame);
- return;
- }
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+ sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
}
@@ -23,16 +23,13 @@
#include "ffmpeg.h"
#include "ffmpeg_mux.h"
#include "ffmpeg_utils.h"
-#include "objpool.h"
#include "sync_queue.h"
-#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -41,10 +38,9 @@
typedef struct MuxThreadContext {
AVPacket *pkt;
+ AVPacket *fix_sub_duration_pkt;
} MuxThreadContext;
-int want_sdp = 1;
-
static Muxer *mux_from_of(OutputFile *of)
{
return (Muxer*)of;
@@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
return 0;
}
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
+
/* apply the output bitstream filters */
-static int mux_packet_filter(Muxer *mux, OutputStream *ost,
- AVPacket *pkt, int *stream_eof)
+static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
+ OutputStream *ost, AVPacket *pkt, int *stream_eof)
{
MuxStream *ms = ms_from_ost(ost);
const char *err_msg;
int ret = 0;
+ if (pkt && !ost->enc) {
+ ret = of_streamcopy(ost, pkt);
+ if (ret == AVERROR(EAGAIN))
+ return 0;
+ else if (ret == AVERROR_EOF) {
+ av_packet_unref(pkt);
+ pkt = NULL;
+ ret = 0;
+ } else if (ret < 0)
+ goto fail;
+ }
+
+ // emit heartbeat for -fix_sub_duration;
+ // we are only interested in heartbeats on on random access points.
+ if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
+ mt->fix_sub_duration_pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
+ mt->fix_sub_duration_pkt->pts = pkt->pts;
+ mt->fix_sub_duration_pkt->time_base = pkt->time_base;
+
+ ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
+ mt->fix_sub_duration_pkt);
+ if (ret < 0)
+ goto fail;
+ }
+
if (ms->bsf_ctx) {
int bsf_eof = 0;
@@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of)
static void mux_thread_uninit(MuxThreadContext *mt)
{
av_packet_free(&mt->pkt);
+ av_packet_free(&mt->fix_sub_duration_pkt);
memset(mt, 0, sizeof(*mt));
}
@@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt)
if (!mt->pkt)
goto fail;
+ mt->fix_sub_duration_pkt = av_packet_alloc();
+ if (!mt->fix_sub_duration_pkt)
+ goto fail;
+
return 0;
fail:
@@ -316,19 +344,22 @@ void *muxer_thread(void *arg)
OutputStream *ost;
int stream_idx, stream_eof = 0;
- ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+ ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+ stream_idx = mt.pkt->stream_index;
if (stream_idx < 0) {
av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
ret = 0;
break;
}
- ost = of->streams[stream_idx];
- ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
+ ost = of->streams[mux->sch_stream_idx[stream_idx]];
+ mt.pkt->stream_index = ost->index;
+
+ ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
av_packet_unref(mt.pkt);
if (ret == AVERROR_EOF) {
if (stream_eof) {
- tq_receive_finish(mux->tq, stream_idx);
+ sch_mux_receive_finish(mux->sch, of->index, stream_idx);
} else {
av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
ret = 0;
@@ -343,243 +374,55 @@ void *muxer_thread(void *arg)
finish:
mux_thread_uninit(&mt);
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_receive_finish(mux->tq, i);
-
- av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
-
return (void*)(intptr_t)ret;
}
-static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
-{
- int ret = 0;
-
- if (!pkt || ost->finished & MUXER_FINISHED)
- goto finish;
-
- ret = tq_send(mux->tq, ost->index, pkt);
- if (ret < 0)
- goto finish;
-
- return 0;
-
-finish:
- if (pkt)
- av_packet_unref(pkt);
-
- ost->finished |= MUXER_FINISHED;
- tq_send_finish(mux->tq, ost->index);
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
-static int queue_packet(OutputStream *ost, AVPacket *pkt)
-{
- MuxStream *ms = ms_from_ost(ost);
- AVPacket *tmp_pkt = NULL;
- int ret;
-
- if (!av_fifo_can_write(ms->muxing_queue)) {
- size_t cur_size = av_fifo_can_read(ms->muxing_queue);
- size_t pkt_size = pkt ? pkt->size : 0;
- unsigned int are_we_over_size =
- (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold;
- size_t limit = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX;
- size_t new_size = FFMIN(2 * cur_size, limit);
-
- if (new_size <= cur_size) {
- av_log(ost, AV_LOG_ERROR,
- "Too many packets buffered for output stream %d:%d.\n",
- ost->file_index, ost->st->index);
- return AVERROR(ENOSPC);
- }
- ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
- if (ret < 0)
- return ret;
- }
-
- if (pkt) {
- ret = av_packet_make_refcounted(pkt);
- if (ret < 0)
- return ret;
-
- tmp_pkt = av_packet_alloc();
- if (!tmp_pkt)
- return AVERROR(ENOMEM);
-
- av_packet_move_ref(tmp_pkt, pkt);
- ms->muxing_queue_data_size += tmp_pkt->size;
- }
- av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
-
- return 0;
-}
-
-static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
-{
- int ret;
-
- if (mux->tq) {
- return thread_submit_packet(mux, ost, pkt);
- } else {
- /* the muxer is not initialized yet, buffer the packet */
- ret = queue_packet(ost, pkt);
- if (ret < 0) {
- if (pkt)
- av_packet_unref(pkt);
- return ret;
- }
- }
-
- return 0;
-}
-
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
-{
- Muxer *mux = mux_from_of(of);
- int ret = 0;
-
- if (pkt && pkt->dts != AV_NOPTS_VALUE)
- ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
-
- ret = submit_packet(mux, pkt, ost);
- if (ret < 0) {
- av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
- av_err2str(ret));
- return ret;
- }
-
- return 0;
-}
-
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
{
OutputFile *of = output_files[ost->file_index];
MuxStream *ms = ms_from_ost(ost);
+ DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL;
+ int64_t dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
int64_t ts_offset;
- AVPacket *opkt = ms->pkt;
- int ret;
-
- av_packet_unref(opkt);
if (of->recording_time != INT64_MAX &&
dts >= of->recording_time + start_time)
- pkt = NULL;
-
- // EOF: flush output bitstream filters.
- if (!pkt)
- return of_output_packet(of, ost, NULL);
+ return AVERROR_EOF;
if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
!ms->copy_initial_nonkeyframes)
- return 0;
+ return AVERROR(EAGAIN);
if (!ms->streamcopy_started) {
if (!ms->copy_prior_start &&
(pkt->pts == AV_NOPTS_VALUE ?
dts < ms->ts_copy_start :
pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
- return 0;
+ return AVERROR(EAGAIN);
if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
- return 0;
+ return AVERROR(EAGAIN);
}
- ret = av_packet_ref(opkt, pkt);
- if (ret < 0)
- return ret;
-
- ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+ ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
if (pkt->pts != AV_NOPTS_VALUE)
- opkt->pts -= ts_offset;
+ pkt->pts -= ts_offset;
if (pkt->dts == AV_NOPTS_VALUE) {
- opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+ pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
} else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
- opkt->pts = opkt->dts - ts_offset;
- }
- opkt->dts -= ts_offset;
-
- {
- int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Subtitle heartbeat logic failed in %s! (%s)\n",
- __func__, av_err2str(ret));
- return ret;
- }
+ pkt->pts = pkt->dts - ts_offset;
}
- ret = of_output_packet(of, ost, opkt);
- if (ret < 0)
- return ret;
+ pkt->dts -= ts_offset;
ms->streamcopy_started = 1;
return 0;
}
-static int thread_stop(Muxer *mux)
-{
- void *ret;
-
- if (!mux || !mux->tq)
- return 0;
-
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_send_finish(mux->tq, i);
-
- pthread_join(mux->thread, &ret);
-
- tq_free(&mux->tq);
-
- return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
- AVFormatContext *fc = mux->fc;
- ObjPool *op;
- int ret;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
- if (!mux->tq) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
- if (ret) {
- tq_free(&mux->tq);
- return AVERROR(ret);
- }
-
- /* flush the muxing queues */
- for (int i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = mux->of.streams[i];
- MuxStream *ms = ms_from_ost(ost);
- AVPacket *pkt;
-
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ret = thread_submit_packet(mux, ost, pkt);
- if (pkt) {
- ms->muxing_queue_data_size -= pkt->size;
- av_packet_free(&pkt);
- }
- if (ret < 0)
- return ret;
- }
- }
-
- return 0;
-}
-
int print_sdp(const char *filename);
int print_sdp(const char *filename)
@@ -590,11 +433,6 @@ int print_sdp(const char *filename)
AVIOContext *sdp_pb;
AVFormatContext **avc;
- for (i = 0; i < nb_output_files; i++) {
- if (!mux_from_of(output_files[i])->header_written)
- return 0;
- }
-
avc = av_malloc_array(nb_output_files, sizeof(*avc));
if (!avc)
return AVERROR(ENOMEM);
@@ -629,25 +467,17 @@ int print_sdp(const char *filename)
avio_closep(&sdp_pb);
}
- // SDP successfully written, allow muxer threads to start
- ret = 1;
-
fail:
av_freep(&avc);
return ret;
}
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
{
+ Muxer *mux = arg;
OutputFile *of = &mux->of;
AVFormatContext *fc = mux->fc;
- int ret, i;
-
- for (i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = of->streams[i];
- if (!ost->initialized)
- return 0;
- }
+ int ret;
ret = avformat_write_header(fc, &mux->opts);
if (ret < 0) {
@@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux)
mux->header_written = 1;
av_dump_format(fc, of->index, fc->url, 1);
- nb_output_dumped++;
-
- if (sdp_filename || want_sdp) {
- ret = print_sdp(sdp_filename);
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
- return ret;
- } else if (ret == 1) {
- /* SDP is written only after all the muxers are ready, so now we
- * start ALL the threads */
- for (i = 0; i < nb_output_files; i++) {
- ret = thread_start(mux_from_of(output_files[i]));
- if (ret < 0)
- return ret;
- }
- }
- } else {
- ret = thread_start(mux_from_of(of));
- if (ret < 0)
- return ret;
- }
+ atomic_fetch_add(&nb_output_dumped, 1);
return 0;
}
@@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost)
ost->st->time_base);
}
- ost->initialized = 1;
+ if (ms->sch_idx >= 0)
+ return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
- return mux_check_init(mux);
+ return 0;
}
static int check_written(OutputFile *of)
@@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of)
AVFormatContext *fc = mux->fc;
int ret, mux_result = 0;
- if (!mux->tq) {
+ if (!mux->header_written) {
av_log(mux, AV_LOG_ERROR,
"Nothing was written into output file, because "
"at least one of its streams received no packets.\n");
return AVERROR(EINVAL);
}
- mux_result = thread_stop(mux);
-
ret = av_write_trailer(fc);
if (ret < 0) {
av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -905,13 +714,6 @@ static void ost_free(OutputStream **post)
ost->logfile = NULL;
}
- if (ms->muxing_queue) {
- AVPacket *pkt;
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
- av_packet_free(&pkt);
- av_fifo_freep2(&ms->muxing_queue);
- }
-
avcodec_parameters_free(&ost->par_in);
av_bsf_free(&ms->bsf_ctx);
@@ -976,8 +778,6 @@ void of_free(OutputFile **pof)
return;
mux = mux_from_of(of);
- thread_stop(mux);
-
sq_free(&of->sq_encode);
sq_free(&mux->sq_mux);
@@ -25,7 +25,6 @@
#include <stdint.h>
#include "ffmpeg_sched.h"
-#include "thread_queue.h"
#include "libavformat/avformat.h"
@@ -33,7 +32,6 @@
#include "libavutil/dict.h"
#include "libavutil/fifo.h"
-#include "libavutil/thread.h"
typedef struct MuxStream {
OutputStream ost;
@@ -41,9 +39,6 @@ typedef struct MuxStream {
// name used for logging
char log_name[32];
- /* the packets are buffered here until the muxer is ready to be initialized */
- AVFifo *muxing_queue;
-
AVBSFContext *bsf_ctx;
AVPacket *bsf_pkt;
@@ -57,17 +52,6 @@ typedef struct MuxStream {
int64_t max_frames;
- /*
- * The size of the AVPackets' buffers in queue.
- * Updated when a packet is either pushed or pulled from the queue.
- */
- size_t muxing_queue_data_size;
-
- int max_muxing_queue_size;
-
- /* Threshold after which max_muxing_queue_size will be in effect */
- size_t muxing_queue_data_threshold;
-
// timestamp from which the streamcopied streams should start,
// in AV_TIME_BASE_Q;
// everything before it should be discarded
@@ -106,9 +90,6 @@ typedef struct Muxer {
int *sch_stream_idx;
int nb_sch_stream_idx;
- pthread_t thread;
- ThreadQueue *tq;
-
AVDictionary *opts;
int thread_queue_size;
@@ -122,10 +103,7 @@ typedef struct Muxer {
AVPacket *sq_pkt;
} Muxer;
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
static MuxStream *ms_from_ost(OutputStream *ost)
{
@@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const OptionsContext *o,
return 0;
}
-static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
- OutputStream *ost)
-{
- ost->finished = 1;
- return 0;
-}
-
static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
OutputStream *ost)
{
@@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->par_in)
return AVERROR(ENOMEM);
- ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
- if (!ms->muxing_queue)
- return AVERROR(ENOMEM);
ms->last_mux_dts = AV_NOPTS_VALUE;
ost->st = st;
@@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->enc_ctx)
return AVERROR(ENOMEM);
- ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+ ret = sch_add_enc(mux->sch, encoder_thread, ost,
+ ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open);
if (ret < 0)
return ret;
ms->sch_idx_enc = ret;
@@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
max_muxing_queue_size, muxing_queue_data_threshold);
-
- ms->max_muxing_queue_size = max_muxing_queue_size;
- ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
}
MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
@@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
- ost->last_mux_dts = AV_NOPTS_VALUE;
-
MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
ms->copy_initial_nonkeyframes, oc, st);
@@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, ost); break;
case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, ost); break;
case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, ost); break;
- case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break;
}
if (ret < 0)
return ret;
@@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
MuxStream *ms = ms_from_ost(ost);
enum AVMediaType type = ost->type;
- ost->sq_idx_encode = -1;
ost->sq_idx_mux = -1;
nb_interleaved += IS_INTERLEAVED(type);
@@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
* - at least one encoded audio/video stream is frame-limited, since
* that has similar semantics to 'shortest'
* - at least one audio encoder requires constant frame sizes
+ *
+ * Note that encoding sync queues are handled in the scheduler, because
+ * different encoders run in different threads and need external
+ * synchronization, while muxer sync queues can be handled inside the muxer
*/
if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
- of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
- if (!of->sq_encode)
- return AVERROR(ENOMEM);
+ int sq_idx, ret;
+
+ sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+ if (sq_idx < 0)
+ return sq_idx;
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = of->streams[i];
@@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
if (!IS_AV_ENC(ost, type))
continue;
- ost->sq_idx_encode = sq_add_stream(of->sq_encode,
- of->shortest || ms->max_frames < INT64_MAX);
- if (ost->sq_idx_encode < 0)
- return ost->sq_idx_encode;
-
- if (ms->max_frames != INT64_MAX)
- sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+ ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
+ of->shortest || ms->max_frames < INT64_MAX,
+ ms->max_frames);
+ if (ret < 0)
+ return ret;
}
}
@@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt)
return 0;
}
-static int init_output_stream_nofilter(OutputStream *ost)
-{
- int ret = 0;
-
- if (ost->enc_ctx) {
- ret = enc_open(ost, NULL);
- if (ret < 0)
- return ret;
- } else {
- ret = of_stream_init(output_files[ost->file_index], ost);
- if (ret < 0)
- return ret;
- }
-
- return ret;
-}
-
static const char *output_file_item_name(void *obj)
{
const Muxer *mux = obj;
@@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
av_strlcat(mux->log_name, "/", sizeof(mux->log_name));
av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
- if (strcmp(oc->oformat->name, "rtp"))
- want_sdp = 0;
of->format = oc->oformat;
if (recording_time != INT64_MAX)
@@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
AVFMT_FLAG_BITEXACT);
}
- err = sch_add_mux(sch, muxer_thread, NULL, mux,
+ err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
!strcmp(oc->oformat->name, "rtp"));
if (err < 0)
return err;
@@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
of->url = filename;
- /* initialize stream copy and subtitle/data streams.
- * Encoded AVFrame based streams will get initialized when the first AVFrame
- * is received in do_video_out
- */
+ /* initialize streamcopy streams. */
for (int i = 0; i < of->nb_streams; i++) {
OutputStream *ost = of->streams[i];
- if (ost->filter)
- continue;
-
- err = init_output_stream_nofilter(ost);
- if (err < 0)
- return err;
- }
-
- /* write the header for files with no streams */
- if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
- int ret = mux_check_init(mux);
- if (ret < 0)
- return ret;
+ if (!ost->enc) {
+ err = of_stream_init(of, ost);
+ if (err < 0)
+ return err;
+ }
}
return 0;
@@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL};
HWDevice *filter_hw_device;
char *vstats_filename;
-char *sdp_filename;
float audio_drift_threshold = 0.1;
float dts_delta_threshold = 10;
@@ -580,9 +579,8 @@ fail:
static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
{
- av_free(sdp_filename);
- sdp_filename = av_strdup(arg);
- return 0;
+ Scheduler *sch = optctx;
+ return sch_sdp_filename(sch, arg);
}
#if CONFIG_VAAPI
@@ -1,48 +1,40 @@
1
-00:00:00,968 --> 00:00:01,001
+00:00:00,968 --> 00:00:01,168
<font face="Monospace">{\an7}(</font>
2
-00:00:01,001 --> 00:00:01,168
-<font face="Monospace">{\an7}(</font>
-
-3
00:00:01,168 --> 00:00:01,368
<font face="Monospace">{\an7}(<i> inaudibl</i></font>
-4
+3
00:00:01,368 --> 00:00:01,568
<font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
-5
+4
00:00:01,568 --> 00:00:02,002
<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
+5
+00:00:02,002 --> 00:00:03,103
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
+
6
-00:00:02,002 --> 00:00:03,003
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-7
-00:00:03,003 --> 00:00:03,103
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-8
00:00:03,103 --> 00:00:03,303
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>></font>
-9
+7
00:00:03,303 --> 00:00:03,503
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety rema</font>
-10
+8
00:00:03,504 --> 00:00:03,704
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our numb</font>
-11
+9
00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our number one</font>