@@ -312,9 +312,6 @@ typedef struct OutputFilter {
enum AVMediaType type;
- /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
- int64_t last_pts;
-
uint64_t nb_frames_dup;
uint64_t nb_frames_drop;
} OutputFilter;
@@ -745,8 +742,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
*/
FrameData *frame_data(AVFrame *frame);
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
/**
@@ -769,26 +764,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in] graph filter graph to consider
- * @param[out] best_ist input stream where a frame would allow to continue
- * @return 0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters);
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return 0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
@@ -21,7 +21,6 @@
#include <stdint.h>
#include "ffmpeg.h"
-#include "thread_queue.h"
#include "libavfilter/avfilter.h"
#include "libavfilter/buffersink.h"
@@ -46,7 +45,6 @@ enum FrameOpaque {
FRAME_OPAQUE_REAP_FILTERS = 1,
FRAME_OPAQUE_CHOOSE_INPUT,
FRAME_OPAQUE_SUB_HEARTBEAT,
- FRAME_OPAQUE_EOF,
FRAME_OPAQUE_SEND_COMMAND,
};
@@ -62,8 +60,7 @@ typedef struct FilterGraphPriv {
int is_meta;
int disable_conversions;
- int nb_inputs_bound;
- int nb_outputs_bound;
+ unsigned nb_outputs_done;
const char *graph_desc;
@@ -75,14 +72,10 @@ typedef struct FilterGraphPriv {
Scheduler *sch;
unsigned sch_idx;
- pthread_t thread;
/**
* Queue for sending frames from the main thread to the filtergraph. Has
* nb_inputs+1 streams - the first nb_inputs stream correspond to
* filtergraph inputs. Frames on those streams may have their opaque set to
- * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
- * EOF event for the correspondint stream. Will be immediately followed by
- * this stream being send-closed.
* - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
* a subtitle heartbeat event. Will only be sent for sub2video streams.
*
@@ -96,7 +89,7 @@ typedef struct FilterGraphPriv {
* available the terminating empty frame's opaque will contain the index+1
* of the filtergraph input to which more input frames should be supplied.
*/
- ThreadQueue *queue_in;
+
/**
* Queue for sending frames from the filtergraph back to the main thread.
* Has nb_outputs+1 streams - the first nb_outputs stream correspond to
@@ -105,7 +98,7 @@ typedef struct FilterGraphPriv {
* The last stream is "control" - see documentation for queue_in for more
* details.
*/
- ThreadQueue *queue_out;
+ //ThreadQueue *queue_out;
// submitting frames to filter thread returned EOF
// this only happens on thread exit, so is not per-input
int eof_in;
@@ -130,6 +123,7 @@ typedef struct FilterGraphThread {
// The output index is stored in frame opaque.
AVFifo *frame_queue_out;
+ // set to 1 after at least one frame passed through this output
int got_frame;
// EOF status of each input/output, as received by the thread
@@ -260,9 +254,6 @@ typedef struct OutputFilterPriv {
int64_t ts_offset;
int64_t next_pts;
FPSConvContext fps;
-
- // set to 1 after at least one frame passed through this output
- int got_frame;
} OutputFilterPriv;
static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -660,57 +651,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
static void *filter_thread(void *arg);
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
- FilterGraph *fg = &fgp->fg;
- ObjPool *op;
- int ret = 0;
-
- if (fgp->nb_inputs_bound < fg->nb_inputs ||
- fgp->nb_outputs_bound < fg->nb_outputs)
- return 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
- if (!fgp->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- // at least one output is mandatory
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
- if (!fgp->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
- if (ret) {
- ret = AVERROR(ret);
- av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
- fg->index, av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return ret;
-}
-
static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
{
AVFilterContext *ctx = inout->filter_ctx;
@@ -736,7 +676,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
ofilter->graph = fg;
ofp->format = -1;
ofp->index = fg->nb_outputs - 1;
- ofilter->last_pts = AV_NOPTS_VALUE;
return ofilter;
}
@@ -768,10 +707,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
return AVERROR(ENOMEM);
}
- fgp->nb_inputs_bound++;
- av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -910,10 +846,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
if (ret < 0)
return ret;
- fgp->nb_outputs_bound++;
- av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -943,34 +876,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
return ifilter;
}
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
- void *ret;
-
- if (!fgp->queue_in)
- return 0;
-
- for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
- InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
- ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
- if (ifp)
- ifp->eof = 1;
-
- tq_send_finish(fgp->queue_in, i);
- }
-
- for (int i = 0; i <= fgp->fg.nb_outputs; i++)
- tq_receive_finish(fgp->queue_out, i);
-
- pthread_join(fgp->thread, &ret);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void fg_free(FilterGraph **pfg)
{
FilterGraph *fg = *pfg;
@@ -980,8 +885,6 @@ void fg_free(FilterGraph **pfg)
return;
fgp = fgp_from_fg(fg);
- fg_thread_stop(fgp);
-
avfilter_graph_free(&fg->graph);
for (int j = 0; j < fg->nb_inputs; j++) {
InputFilter *ifilter = fg->inputs[j];
@@ -2253,8 +2156,56 @@ finish:
fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
}
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+ int ret;
+
+ // we are finished and no frames were ever seen at this output,
+ // at least initialize the encoder with a dummy frame
+ if (!fgt->got_frame) {
+ AVFrame *frame = fgt->frame;
+ FrameData *fd;
+
+ frame->time_base = ofp->tb_out;
+ frame->format = ofp->format;
+
+ frame->width = ofp->width;
+ frame->height = ofp->height;
+ frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+ frame->sample_rate = ofp->sample_rate;
+ if (ofp->ch_layout.nb_channels) {
+ ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+ if (ret < 0)
+ return ret;
+ }
+
+ fd = frame_data(frame);
+ if (!fd)
+ return AVERROR(ENOMEM);
+
+ fd->frame_rate_filter = ofp->fps.framerate;
+
+ av_assert0(!frame->buf[0]);
+
+ av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+ "No filtered frames for output stream, trying to "
+ "initialize anyway.\n");
+
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+ av_frame_unref(frame);
+ if (ret < 0)
+ return ret;
+ }
+
+ fgt->eof_out[ofp->index] = 1;
+
+ return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
AVFrame *frame_prev = ofp->fps.last_frame;
@@ -2301,28 +2252,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
frame_out = frame;
}
- if (buffer) {
- AVFrame *f = av_frame_alloc();
-
- if (!f) {
- av_frame_unref(frame_out);
- return AVERROR(ENOMEM);
- }
-
- av_frame_move_ref(f, frame_out);
- f->opaque = (void*)(intptr_t)ofp->index;
-
- ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
- if (ret < 0) {
- av_frame_free(&f);
- return AVERROR(ENOMEM);
- }
- } else {
- // return the frame to the main thread
- ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+ {
+ // send the frame to consumers
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
if (ret < 0) {
av_frame_unref(frame_out);
- fgt->eof_out[ofp->index] = 1;
+
+ if (!fgt->eof_out[ofp->index]) {
+ fgt->eof_out[ofp->index] = 1;
+ fgp->nb_outputs_done++;
+ }
+
return ret == AVERROR_EOF ? 0 : ret;
}
}
@@ -2343,16 +2283,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
av_frame_move_ref(frame_prev, frame);
}
- if (!frame) {
- tq_send_finish(fgp->queue_out, ofp->index);
- fgt->eof_out[ofp->index] = 1;
- }
+ if (!frame)
+ return close_output(ofp, fgt);
return 0;
}
static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
OutputStream *ost = ofp->ofilter.ost;
@@ -2362,8 +2300,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
ret = av_buffersink_get_frame_flags(filter, frame,
AV_BUFFERSINK_FLAG_NO_REQUEST);
- if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
- ret = fg_output_frame(ofp, fgt, NULL, buffer);
+ if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+ ret = fg_output_frame(ofp, fgt, NULL);
return (ret < 0) ? ret : 1;
} else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 1;
@@ -2417,7 +2355,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
fd->frame_rate_filter = ofp->fps.framerate;
}
- ret = fg_output_frame(ofp, fgt, frame, buffer);
+ ret = fg_output_frame(ofp, fgt, frame);
av_frame_unref(frame);
if (ret < 0)
return ret;
@@ -2425,44 +2363,29 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
return 0;
}
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
- int ret = 0;
if (!fg->graph)
return 0;
- // process buffered frames
- if (!buffer) {
- AVFrame *f;
-
- while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
- int out_idx = (intptr_t)f->opaque;
- f->opaque = NULL;
- ret = tq_send(fgp->queue_out, out_idx, f);
- av_frame_free(&f);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
- }
- }
-
/* Reap all buffers present in the buffer sinks */
for (int i = 0; i < fg->nb_outputs; i++) {
OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
int ret = 0;
while (!ret) {
- ret = fg_output_step(ofp, fgt, frame, buffer);
+ ret = fg_output_step(ofp, fgt, frame);
if (ret < 0)
return ret;
}
}
- return 0;
+ return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
}
static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2536,6 +2459,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
+ if (fgt->eof_in[ifp->index])
+ return 0;
+
fgt->eof_in[ifp->index] = 1;
if (ifp->filter) {
@@ -2637,7 +2563,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return ret;
}
- ret = read_frames(fg, fgt, tmp, 1);
+ ret = read_frames(fg, fgt, tmp);
av_frame_free(&tmp);
if (ret < 0)
return ret;
@@ -2674,6 +2600,29 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
{
int nb_requests, nb_requests_max = 0;
int best_input = -1;
+ int ret;
+
+ if (!fg->graph) {
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+ if (ifp->format < 0 && !fgt->eof_in[i])
+ return i;
+ }
+
+ // This state - graph is not configured, but all inputs are either
+ // initialized or EOF - should be unreachable because sending EOF to a
+ // filter without even a fallback format should fail
+ av_assert0(0);
+ return AVERROR_BUG;
+ }
+
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret < 0 && ret != AVERROR(EAGAIN))
+ return ret;
+
+ // we drain all frames from the filtergraph after each input frame, so
+ // we should never get success here
+ av_assert0(ret < 0);
for (int i = 0; i < fg->nb_inputs; i++) {
InputFilter *ifilter = fg->inputs[i];
@@ -2690,9 +2639,12 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
}
}
+ av_assert0(best_input >= 0);
+
return best_input;
}
+#if 0
static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
AVFrame *frame)
{
@@ -2766,6 +2718,7 @@ done:
return 0;
}
+#endif
static void fg_thread_set_name(const FilterGraph *fg)
{
@@ -2852,28 +2805,53 @@ static void *filter_thread(void *arg)
while (1) {
InputFilter *ifilter;
InputFilterPriv *ifp;
- int input_idx, eof_frame;
+ int input_idx;
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- if (input_idx < 0 ||
- (input_idx == fg->nb_inputs && input_status < 0)) {
+ // XXX
+ if (!fg->nb_inputs) {
+ abort();
+ goto no_inputs;
+ }
+
+ input_idx = choose_input(fg, &fgt);
+ // XXX
+ if (input_idx < 0)
+ goto finish;
+
+ input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+ &input_idx, fgt.frame);
+ if (input_idx < 0) {
av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
break;
}
+ // XXX
+#if 0
// message on the control stream
if (input_idx == fg->nb_inputs) {
ret = msg_process(fgp, &fgt, fgt.frame);
if (ret < 0)
goto finish;
+#elif 0
+ enum FrameOpaque msg = (intptr_t)fgt.frame->opaque;
+
+ fgt.frame->opaque = NULL;
+ av_assert0(msg > 0);
+ av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND);
+
+ {
+ FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
+ send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
+ av_frame_unref(frame);
+ }
+
continue;
}
+#endif
- // we received an input frame or EOF
ifilter = fg->inputs[input_idx];
ifp = ifp_from_ifilter(ifilter);
- eof_frame = input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_EOF;
if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
if (input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_SUB_HEARTBEAT)
sub2video_heartbeat(ifilter, fgt.frame->pts, fgt.frame->time_base);
@@ -2890,20 +2868,17 @@ static void *filter_thread(void *arg)
if (ret < 0)
break;
- if (eof_frame) {
- // an EOF frame is immediately followed by sender closing
- // the corresponding stream, so retrieve that event
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
- }
-
- // signal to the main thread that we are done
- ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
+no_inputs:
+ // retrieve all newly avalable frames
+ // XXX: handle filtergraphs connected to an unlimited source
+ ret = read_frames(fg, &fgt, fgt.frame);
if (ret < 0) {
if (ret == AVERROR_EOF)
- break;
+ av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
+ else
+ av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+ av_err2str(ret));
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
goto finish;
}
}
@@ -2913,10 +2888,9 @@ finish:
if (ret == AVERROR_EOF)
ret = 0;
- for (int i = 0; i <= fg->nb_inputs; i++)
- tq_receive_finish(fgp->queue_in, i);
- for (int i = 0; i <= fg->nb_outputs; i++)
- tq_send_finish(fgp->queue_out, i);
+ // XXX close_output() here?
+
+ sch_filter_send(fgp->sch, fgp->sch_idx, -1, NULL);
fg_thread_uninit(&fgt);
@@ -2925,64 +2899,6 @@ finish:
return (void*)(intptr_t)ret;
}
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
- AVFrame *frame, enum FrameOpaque type)
-{
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- int output_idx, ret;
-
- if (ifp->eof) {
- av_frame_unref(frame);
- return AVERROR_EOF;
- }
-
- frame->opaque = (void*)(intptr_t)type;
-
- ret = tq_send(fgp->queue_in, ifp->index, frame);
- if (ret < 0) {
- ifp->eof = 1;
- av_frame_unref(frame);
- return ret;
- }
-
- if (type == FRAME_OPAQUE_EOF)
- tq_send_finish(fgp->queue_in, ifp->index);
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
- return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- if (keep_reference) {
- ret = av_frame_ref(fgp->frame, frame);
- if (ret < 0)
- return ret;
- } else
- av_frame_move_ref(fgp->frame, frame);
-
- return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
{
FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
@@ -2990,144 +2906,10 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
fgp->frame->pts = pts;
fgp->frame->time_base = tb;
+ // XXX
+#if 0
thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
- FilterGraphPriv *fgp = fgp_from_fg(graph);
- int ret, got_frames = 0;
-
- if (fgp->eof_in)
- return AVERROR_EOF;
-
- // signal to the filtering thread to return all frames it can
- av_assert0(!fgp->frame->buf[0]);
- fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
- FRAME_OPAQUE_CHOOSE_INPUT :
- FRAME_OPAQUE_REAP_FILTERS);
-
- ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
- if (ret < 0) {
- fgp->eof_in = 1;
- goto finish;
- }
-
- while (1) {
- OutputFilter *ofilter;
- OutputFilterPriv *ofp;
- OutputStream *ost;
- int output_idx;
-
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
- // EOF on the whole queue or the control stream
- if (output_idx < 0 ||
- (ret < 0 && output_idx == graph->nb_outputs))
- goto finish;
-
- // EOF for a specific stream
- if (ret < 0) {
- ofilter = graph->outputs[output_idx];
- ofp = ofp_from_ofilter(ofilter);
-
- // we are finished and no frames were ever seen at this output,
- // at least initialize the encoder with a dummy frame
- if (!ofp->got_frame) {
- AVFrame *frame = fgp->frame;
- FrameData *fd;
-
- frame->time_base = ofp->tb_out;
- frame->format = ofp->format;
-
- frame->width = ofp->width;
- frame->height = ofp->height;
- frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
- frame->sample_rate = ofp->sample_rate;
- if (ofp->ch_layout.nb_channels) {
- ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
- if (ret < 0)
- return ret;
- }
-
- fd = frame_data(frame);
- if (!fd)
- return AVERROR(ENOMEM);
-
- fd->frame_rate_filter = ofp->fps.framerate;
-
- av_assert0(!frame->buf[0]);
-
- av_log(ofilter->ost, AV_LOG_WARNING,
- "No filtered frames for output stream, trying to "
- "initialize anyway.\n");
-
- enc_open(ofilter->ost, frame);
- av_frame_unref(frame);
- }
-
- close_output_stream(graph->outputs[output_idx]->ost);
- continue;
- }
-
- // request was fully processed by the filtering thread,
- // return the input stream to read from, if needed
- if (output_idx == graph->nb_outputs) {
- int input_idx = (intptr_t)fgp->frame->opaque - 1;
- av_assert0(input_idx <= graph->nb_inputs);
-
- if (best_ist) {
- *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
- ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
- if (input_idx < 0 && !got_frames) {
- for (int i = 0; i < graph->nb_outputs; i++)
- graph->outputs[i]->ost->unavailable = 1;
- }
- }
- break;
- }
-
- // got a frame from the filtering thread, send it for encoding
- ofilter = graph->outputs[output_idx];
- ost = ofilter->ost;
- ofp = ofp_from_ofilter(ofilter);
-
- if (ost->finished) {
- av_frame_unref(fgp->frame);
- tq_receive_finish(fgp->queue_out, output_idx);
- continue;
- }
-
- if (fgp->frame->pts != AV_NOPTS_VALUE) {
- ofilter->last_pts = av_rescale_q(fgp->frame->pts,
- fgp->frame->time_base,
- AV_TIME_BASE_Q);
- }
-
- ret = enc_frame(ost, fgp->frame);
- av_frame_unref(fgp->frame);
- if (ret < 0)
- goto finish;
-
- ofp->got_frame = 1;
- got_frames = 1;
- }
-
-finish:
- if (ret < 0) {
- fgp->eof_in = 1;
- for (int i = 0; i < graph->nb_outputs; i++)
- close_output_stream(graph->outputs[i]->ost);
- }
-
- return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
- return fg_transcode_step(fg, NULL);
+#endif
}
void fg_send_command(FilterGraph *fg, double time, const char *target,
@@ -3138,9 +2920,6 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
FilterCommand *fc;
int output_idx, ret;
- if (!fgp->queue_in)
- return;
-
fc = av_mallocz(sizeof(*fc));
if (!fc)
return;
@@ -3165,13 +2944,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
fgp->frame->buf[0] = buf;
fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
- ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame);
- if (ret < 0) {
- av_frame_unref(fgp->frame);
- return;
- }
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+ // XXX actually implement the command
}