@@ -1286,10 +1286,7 @@ static void finish_output_stream(OutputStream *ost)
OutputFile *of = output_files[ost->file_index];
ost->finished = ENCODER_FINISHED;
- if (ost->sq_idx_mux >= 0)
- sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
- else
- ost->finished |= MUXER_FINISHED;
+ output_packet(of, ost->pkt, ost, 1);
}
/**
@@ -3421,9 +3418,8 @@ static int need_output(void)
for (i = 0; i < nb_output_streams; i++) {
OutputStream *ost = output_streams[i];
- OutputFile *of = output_files[ost->file_index];
- if (ost->finished || of_finished(of))
+ if (ost->finished)
continue;
return 1;
@@ -4269,26 +4265,6 @@ static int transcode_step(void)
return reap_filters(0);
}
-static void flush_sync_queues_mux(void)
-{
- /* mark all queue inputs as done */
- for (int i = 0; i < nb_output_streams; i++) {
- OutputStream *ost = output_streams[i];
- OutputFile *of = output_files[ost->file_index];
- if (ost->sq_idx_mux >= 0)
- sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
- }
-
- /* encode all packets remaining in the sync queues */
- for (int i = 0; i < nb_output_streams; i++) {
- OutputStream *ost = output_streams[i];
- OutputFile *of = output_files[ost->file_index];
-
- if (!(ost->finished & MUXER_FINISHED))
- output_packet(of, ost->pkt, ost, 1);
- }
-}
-
/*
* The following code is the main loop of the file converter
*/
@@ -4310,6 +4286,12 @@ static int transcode(void)
timer_start = av_gettime_relative();
+ for (i = 0; i < nb_output_files; i++) {
+ ret = of_thread_start(output_files[i]);
+ if (ret < 0)
+ goto fail;
+ }
+
if ((ret = init_input_threads()) < 0)
goto fail;
@@ -4346,7 +4328,9 @@ static int transcode(void)
}
}
flush_encoders();
- flush_sync_queues_mux();
+
+ for (i = 0; i < nb_output_files; i++)
+ of_thread_stop(output_files[i]);
term_exit();
@@ -583,6 +583,8 @@ typedef struct OutputFile {
const AVOutputFormat *format;
const char *url;
+ AVThreadMessageQueue *mux_queue;
+
SyncQueue *sq_encode;
SyncQueue *sq_mux;
@@ -697,11 +699,14 @@ int hwaccel_decode_init(AVCodecContext *avctx);
int of_muxer_init(OutputFile *of, AVFormatContext *fc,
AVDictionary *opts, int64_t limit_filesize);
+
+int of_thread_start(OutputFile *of);
+void of_thread_stop(OutputFile *of);
+
int of_write_trailer(OutputFile *of);
void of_close(OutputFile **pof);
int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof);
-int of_finished(OutputFile *of);
int64_t of_filesize(OutputFile *of);
AVChapter * const *
of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
@@ -16,17 +16,20 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#include "ffmpeg.h"
#include "sync_queue.h"
+#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -46,18 +49,24 @@ typedef struct MuxStream {
/* dts of the last packet sent to the muxer, in the stream timebase
* used for making up missing dts values */
int64_t last_mux_dts;
+
+ /* data (a real or a flush packet) was received for this stream */
+ int got_data;
} MuxStream;
struct Muxer {
AVFormatContext *fc;
+ pthread_t thread;
+ ThreadQueue *tq;
+
MuxStream *streams;
AVDictionary *opts;
/* filesize limit expressed in bytes */
int64_t limit_filesize;
- int64_t final_filesize;
+ atomic_int_least64_t last_filesize;
int header_written;
};
@@ -221,13 +230,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
return 0;
}
+static int64_t filesize(AVIOContext *pb)
+{
+ int64_t ret = -1;
+
+ if (pb) {
+ ret = avio_size(pb);
+ if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
+ ret = avio_tell(pb);
+ }
+
+ return ret;
+}
+
static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
MuxStream *ms = &of->mux->streams[ost->index];
AVFormatContext *s = of->mux->fc;
AVStream *st = ost->st;
+ int64_t fs;
int ret;
+ fs = filesize(s->pb);
+ atomic_store(&of->mux->last_filesize, fs);
+ if (fs >= of->mux->limit_filesize)
+ return AVERROR_EOF;
+
if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && video_sync_method == VSYNC_DROP) ||
(st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -333,8 +361,8 @@ static int check_write_header(OutputFile *of)
int ret, i;
for (i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = output_streams[of->ost_index + i];
- if (!ost->initialized)
+ MuxStream *ms = &of->mux->streams[i];
+ if (!ms->got_data)
return 0;
}
@@ -378,12 +406,15 @@ static int check_write_header(OutputFile *of)
return 0;
}
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
{
+ Muxer *mux = of->mux;
+ MuxStream *ms = &mux->streams[ost->index];
int ret;
- if (!of->mux->header_written) {
- ret = check_write_header(of);
+ ms->got_data = 1;
+ if (!mux->header_written) {
+ ret = check_write_header(of);
if (ret < 0) {
av_packet_unref(pkt);
return ret;
@@ -391,34 +422,102 @@ int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
}
if (ost->sq_idx_mux >= 0) {
- ret = sq_send(of->sq_mux, ost->sq_idx_mux,
- SQPKT(eof ? NULL: pkt));
+ int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
if (ret < 0) {
- av_packet_unref(pkt);
- if (ret == AVERROR_EOF) {
- ost->finished |= MUXER_FINISHED;
- return 0;
- } else
- return ret;
+ if (pkt)
+ av_packet_unref(pkt);
+ return ret;
}
while (1) {
+ pkt = av_packet_alloc();
+ if (!pkt)
+ // XXX
+ abort();
+
ret = sq_receive(of->sq_mux, -1, SQPKT(pkt));
- if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
- return 0;
- else if (ret < 0)
- return ret;
+ if (ret < 0) {
+ av_packet_free(&pkt);
+ return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
ret = submit_packet(of, pkt, output_streams[of->ost_index + ret]);
+ av_packet_free(&pkt);
if (ret < 0)
return ret;
}
- } else if (!eof)
+ } else if (pkt)
return submit_packet(of, pkt, ost);
return 0;
}
+static void *muxer_thread(void *arg)
+{
+ OutputFile *of = arg;
+ Muxer *mux = of->mux;
+
+ while (1) {
+ OutputStream *ost;
+ AVPacket *pkt = NULL;
+ int stream_idx, ret;
+
+ ret = tq_receive(mux->tq, &stream_idx, &pkt);
+ if (stream_idx < 0) {
+ av_log(NULL, AV_LOG_DEBUG,
+ "All streams finished for output file #%d\n", of->index);
+ break;
+ }
+
+ ost = output_streams[of->ost_index + stream_idx];
+ ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+ av_packet_free(&pkt);
+ if (ret == AVERROR_EOF)
+ tq_receive_finish(mux->tq, stream_idx);
+ else if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Error muxing a packet for output file #%d\n", of->index);
+ break;
+ }
+ }
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++) {
+ sync_queue_process(of, output_streams[of->ost_index], NULL);
+ tq_receive_finish(mux->tq, i);
+ }
+
+ av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index);
+
+ return NULL;
+}
+
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+{
+ AVPacket *pkt1;
+ int ret = 0;
+
+ if (eof) {
+ tq_send_finish(of->mux->tq, ost->index);
+ return 0;
+ }
+
+ pkt1 = av_packet_alloc();
+ if (!pkt1) {
+ av_packet_unref(pkt);
+ return AVERROR(ENOMEM);
+ }
+
+ av_packet_move_ref(pkt1, pkt);
+
+ ret = tq_send(of->mux->tq, ost->index, &pkt1);
+ if (ret < 0) {
+ av_packet_free(&pkt1);
+ ost->finished |= MUXER_FINISHED;
+ }
+
+ return ret == AVERROR_EOF ? 0 : ret;
+}
+
int of_write_trailer(OutputFile *of)
{
AVFormatContext *fc = of->mux->fc;
@@ -438,7 +537,7 @@ int of_write_trailer(OutputFile *of)
return ret;
}
- of->mux->final_filesize = of_filesize(of);
+ of->mux->last_filesize = filesize(fc->pb);
if (!(of->format->flags & AVFMT_NOFILE)) {
ret = avio_closep(&fc->pb);
@@ -487,6 +586,9 @@ static void mux_free(Muxer **pmux)
av_freep(&mux->streams);
av_dict_free(&mux->opts);
+ if (mux->tq) {
+ }
+
fc_close(&mux->fc);
av_freep(pmux);
@@ -558,30 +660,53 @@ fail:
return ret;
}
-int of_finished(OutputFile *of)
+int64_t of_filesize(OutputFile *of)
{
- return of_filesize(of) >= of->mux->limit_filesize;
+ return atomic_load(&of->mux->last_filesize);
}
-int64_t of_filesize(OutputFile *of)
+AVChapter * const *
+of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
{
- AVIOContext *pb = of->mux->fc->pb;
- int64_t ret = -1;
+ *nb_chapters = of->mux->fc->nb_chapters;
+ return of->mux->fc->chapters;
+}
- if (of->mux->final_filesize)
- ret = of->mux->final_filesize;
- else if (pb) {
- ret = avio_size(pb);
- if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
- ret = avio_tell(pb);
+static void pkt_free(void *pkt)
+{
+ av_packet_free((AVPacket**)&pkt);
+}
+
+int of_thread_start(OutputFile *of)
+{
+ Muxer *mux = of->mux;
+ int ret;
+
+ mux->tq = tq_alloc(mux->fc->nb_streams, 8, sizeof(AVPacket*),
+ pkt_free);
+ if (!mux->tq)
+ return AVERROR(ENOMEM);
+
+ ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+ if (ret) {
+ tq_free(&mux->tq);
+ return AVERROR(ret);
}
- return ret;
+ return 0;
}
-AVChapter * const *
-of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
+void of_thread_stop(OutputFile *of)
{
- *nb_chapters = of->mux->fc->nb_chapters;
- return of->mux->fc->chapters;
+ Muxer *mux = of->mux;
+
+ if (!mux || !mux->tq)
+ return;
+
+ for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+ tq_send_finish(mux->tq, i);
+
+ pthread_join(mux->thread, NULL);
+
+ tq_free(&mux->tq);
}