@@ -97,6 +97,7 @@ typedef struct MpegTSWrite {
int pmt_start_pid;
int start_pid;
int m2ts_mode;
+ int parallel_mux;
int reemit_pat_pmt; // backward compatibility
@@ -120,6 +121,7 @@ typedef struct MpegTSWrite {
/* a PES packet header is generated every DEFAULT_PES_HEADER_FREQ packets */
#define DEFAULT_PES_HEADER_FREQ 16
#define DEFAULT_PES_PAYLOAD_SIZE ((DEFAULT_PES_HEADER_FREQ - 1) * 184 + 170)
+#define MAX_PES_PAYLOAD 2 * 200 * 1024 // From mpegts.c
/* The section length is 12 bits. The first 2 are set to 0, the remaining
* 10 bits should not exceed 1021. */
@@ -227,17 +229,24 @@ static int mpegts_write_section1(MpegTSSection *s, int tid, int id,
#define PAT_RETRANS_TIME 100
#define PCR_RETRANS_TIME 20
+#define PES_START_FLAG 1
+#define PES_FULL_FLAG 2
+/* Unused free flags: 4,8,16,32,64 */
+#define PES_NEEDS_END_FLAG 128
+
typedef struct MpegTSWriteStream {
struct MpegTSService *service;
int pid; /* stream associated pid */
int cc;
int discontinuity;
int payload_size;
+ int payload_top;
int first_pts_check; ///< first pts check needed
int prev_payload_key;
int64_t payload_pts;
int64_t payload_dts;
int payload_flags;
+ int pes_flags;
uint8_t *payload;
AVFormatContext *amux;
AVRational user_tb;
@@ -866,7 +875,7 @@ static int mpegts_init(AVFormatContext *s)
ts_st->user_tb = st->time_base;
avpriv_set_pts_info(st, 33, 1, 90000);
- ts_st->payload = av_mallocz(ts->pes_payload_size);
+ ts_st->payload = av_mallocz(ts->parallel_mux ? MAX_PES_PAYLOAD : ts->pes_payload_size);
if (!ts_st->payload) {
ret = AVERROR(ENOMEM);
goto fail;
@@ -910,6 +919,8 @@ static int mpegts_init(AVFormatContext *s)
pids[i] = ts_st->pid;
ts_st->payload_pts = AV_NOPTS_VALUE;
ts_st->payload_dts = AV_NOPTS_VALUE;
+ ts_st->payload_top = 0;
+ ts_st->pes_flags = 0;
ts_st->first_pts_check = 1;
ts_st->cc = 15;
ts_st->discontinuity = ts->flags & MPEGTS_FLAG_DISCONT;
@@ -953,46 +964,52 @@ static int mpegts_init(AVFormatContext *s)
av_freep(&pids);
- /* if no video stream, use the first stream as PCR */
- if (service->pcr_pid == 0x1fff && s->nb_streams > 0) {
- pcr_st = s->streams[0];
- ts_st = pcr_st->priv_data;
- service->pcr_pid = ts_st->pid;
- } else
- ts_st = pcr_st->priv_data;
-
- if (ts->mux_rate > 1) {
- service->pcr_packet_period = (int64_t)ts->mux_rate * ts->pcr_period /
- (TS_PACKET_SIZE * 8 * 1000);
- ts->sdt_packet_period = (int64_t)ts->mux_rate * SDT_RETRANS_TIME /
- (TS_PACKET_SIZE * 8 * 1000);
- ts->pat_packet_period = (int64_t)ts->mux_rate * PAT_RETRANS_TIME /
- (TS_PACKET_SIZE * 8 * 1000);
-
- if (ts->copyts < 1)
- ts->first_pcr = av_rescale(s->max_delay, PCR_TIME_BASE, AV_TIME_BASE);
- } else {
- /* Arbitrary values, PAT/PMT will also be written on video key frames */
- ts->sdt_packet_period = 200;
- ts->pat_packet_period = 40;
- if (pcr_st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
- int frame_size = av_get_audio_frame_duration2(pcr_st->codecpar, 0);
- if (!frame_size) {
- av_log(s, AV_LOG_WARNING, "frame size not set\n");
- service->pcr_packet_period =
- pcr_st->codecpar->sample_rate / (10 * 512);
+ for (i = 0; i < ts->nb_services; i++) {
+ service = ts->services[i];
+
+ /* if no video stream, use the first stream as PCR */
+ if (service->pcr_pid == 0x1fff && s->nb_streams > 0) {
+ pcr_st = s->streams[0];
+ ts_st = pcr_st->priv_data;
+ service->pcr_pid = ts_st->pid;
+ } else
+ ts_st = pcr_st->priv_data;
+
+ if (ts->mux_rate > 1) {
+ service->pcr_packet_period = (int64_t)ts->mux_rate * ts->pcr_period /
+ (TS_PACKET_SIZE * 8 * 1000);
+ ts->sdt_packet_period = (int64_t)ts->mux_rate * SDT_RETRANS_TIME /
+ (TS_PACKET_SIZE * 8 * 1000);
+ ts->pat_packet_period = (int64_t)ts->mux_rate * PAT_RETRANS_TIME /
+ (TS_PACKET_SIZE * 8 * 1000);
+ if (ts->copyts < 1)
+ ts->first_pcr = av_rescale(s->max_delay, PCR_TIME_BASE, AV_TIME_BASE);
+ } else {
+ /* Arbitrary values, PAT/PMT will also be written on video key frames */
+ ts->sdt_packet_period = 200;
+ ts->pat_packet_period = 40;
+ if (pcr_st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
+ int frame_size = av_get_audio_frame_duration2(pcr_st->codecpar, 0);
+ if (!frame_size) {
+ av_log(s, AV_LOG_WARNING, "frame size not set\n");
+ service->pcr_packet_period =
+ pcr_st->codecpar->sample_rate / (10 * 512);
+ } else {
+ service->pcr_packet_period =
+ pcr_st->codecpar->sample_rate / (10 * frame_size);
+ }
} else {
+ // max delta PCR 0.1s
+ // TODO: should be avg_frame_rate
service->pcr_packet_period =
- pcr_st->codecpar->sample_rate / (10 * frame_size);
+ ts_st->user_tb.den / (10 * ts_st->user_tb.num);
}
- } else {
- // max delta PCR 0.1s
- // TODO: should be avg_frame_rate
- service->pcr_packet_period =
- ts_st->user_tb.den / (10 * ts_st->user_tb.num);
+ if (!service->pcr_packet_period)
+ service->pcr_packet_period = 1;
}
- if (!service->pcr_packet_period)
- service->pcr_packet_period = 1;
+
+ // output a PCR as soon as possible
+ service->pcr_packet_count = service->pcr_packet_period;
}
ts->last_pat_ts = AV_NOPTS_VALUE;
@@ -1005,8 +1022,6 @@ static int mpegts_init(AVFormatContext *s)
ts->sdt_packet_period = INT_MAX;
}
- // output a PCR as soon as possible
- service->pcr_packet_count = service->pcr_packet_period;
ts->pat_packet_count = ts->pat_packet_period - 1;
ts->sdt_packet_count = ts->sdt_packet_period - 1;
@@ -1172,9 +1187,14 @@ static uint8_t *get_ts_payload_start(uint8_t *pkt)
/* Add a PES header to the front of the payload, and segment into an integer
* number of TS packets. The final TS packet is padded using an oversized
* adaptation header to exactly fill the last TS packet.
- * NOTE: 'payload' contains a complete PES payload. */
-static void mpegts_write_pes(AVFormatContext *s, AVStream *st,
- const uint8_t *payload, int payload_size,
+ * NOTE: 'payload' contains a complete PES payload.
+ * NOTE2: When 'mode' < -1 it writes the rest of the payload (without header);
+ * When 'mode' = -1 it writes the entire payload with the header;
+ * when 'mode' = 0 it writes only one TS packet with the header;
+ * when 'mode' > 0 it writes only one TS packet.
+ * Note3: It returns the number of writed bytes. */
+static int mpegts_write_pes(AVFormatContext *s, AVStream *st,
+ const uint8_t *payload, int payload_size, int mode,
int64_t pts, int64_t dts, int key, int stream_id)
{
MpegTSWriteStream *ts_st = st->priv_data;
@@ -1186,13 +1206,14 @@ static void mpegts_write_pes(AVFormatContext *s, AVStream *st,
int64_t pcr = -1; /* avoid warning */
int64_t delay = av_rescale(s->max_delay, 90000, AV_TIME_BASE);
int force_pat = st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && key && !ts_st->prev_payload_key;
+ int ret_size = 0;
av_assert0(ts_st->payload != buf || st->codecpar->codec_type != AVMEDIA_TYPE_VIDEO);
if (ts->flags & MPEGTS_FLAG_PAT_PMT_AT_FRAMES && st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
force_pat = 1;
}
- is_start = 1;
+ is_start = (mode > 0 || mode < -1) ? 0 : 1;
while (payload_size > 0) {
retransmit_si_info(s, force_pat, dts);
force_pat = 0;
@@ -1389,6 +1410,8 @@ static void mpegts_write_pes(AVFormatContext *s, AVStream *st,
memset(q, 0xff, pes_header_stuffing_bytes);
q += pes_header_stuffing_bytes;
}
+ if (is_dvb_subtitle)
+ ts_st->pes_flags |= PES_NEEDS_END_FLAG;
is_start = 0;
}
/* header size */
@@ -1420,7 +1443,7 @@ static void mpegts_write_pes(AVFormatContext *s, AVStream *st,
}
}
- if (is_dvb_subtitle && payload_size == len) {
+ if ((ts_st->pes_flags & PES_NEEDS_END_FLAG) && payload_size == len) {
memcpy(buf + TS_PACKET_SIZE - len, payload, len - 1);
buf[TS_PACKET_SIZE - 1] = 0xff; /* end_of_PES_data_field_marker: an 8-bit field with fixed contents 0xff for DVB subtitle */
} else {
@@ -1431,8 +1454,12 @@ static void mpegts_write_pes(AVFormatContext *s, AVStream *st,
payload_size -= len;
mpegts_prefix_m2ts_header(s);
avio_write(s->pb, buf, TS_PACKET_SIZE);
+ ret_size += len;
+ if (mode >= 0)
+ break; // Write one packet only then exit
}
ts_st->prev_payload_key = key;
+ return ret_size;
}
int ff_check_h264_startcode(AVFormatContext *s, const AVStream *st, const AVPacket *pkt)
@@ -1519,6 +1546,101 @@ static int opus_get_packet_samples(AVFormatContext *s, AVPacket *pkt)
return duration;
}
+static inline int write_full_pes_stream(AVFormatContext *s, AVStream *st,
+ const uint8_t *payload, int payload_size,
+ int64_t pts, int64_t dts, int key, int stream_id)
+{
+ return mpegts_write_pes(s, st, payload, payload_size, -1, pts, dts, key, stream_id);
+}
+
+static inline int write_flush_pes_stream(AVFormatContext *s, AVStream *st,
+ const uint8_t *payload, int payload_size,
+ int64_t pts, int64_t dts, int key, int stream_id)
+{
+ return mpegts_write_pes(s, st, payload, payload_size, -2, pts, dts, key, stream_id);
+}
+
+static inline int write_pkt_pes_stream(AVFormatContext *s, AVStream *st,
+ const uint8_t *payload, int payload_size, int mode,
+ int64_t pts, int64_t dts, int key, int stream_id)
+{
+ return mpegts_write_pes(s, st, payload, payload_size, mode, pts, dts, key, stream_id);
+}
+
+static int write_side_streams(AVFormatContext *s, int64_t dts, int64_t delay, int stream_id, int parallel)
+{
+ int i;
+ int sum = 0;
+ int check_mode;
+ int write_mode; // -1: Don't write anything
+ // 0: Start the PES packet and write 1 TS packet
+ // 1: Continue writing just 1 TS packet
+ // 2: Write full PES packet
+ int ret;
+ for(i=0; i<s->nb_streams; i++) {
+ AVStream *st2 = s->streams[i];
+ MpegTSWriteStream *ts_st2 = st2->priv_data;
+
+ check_mode = parallel ? (ts_st2->payload_top > 0) : 0;
+
+ if (ts_st2->payload_size && !check_mode
+ && (ts_st2->payload_dts == AV_NOPTS_VALUE || dts - ts_st2->payload_dts > delay/2 || (ts_st2->pes_flags & PES_START_FLAG))) {
+ write_mode = parallel ? 0 : 2;
+ } else if (check_mode) {
+ write_mode = 1;
+ } else {
+ write_mode = -1;
+ }
+
+ switch (write_mode)
+ {
+ case 2: // SEQUENTIAL MODE
+ ret = write_full_pes_stream(s, st2, ts_st2->payload, ts_st2->payload_size,
+ ts_st2->payload_pts, ts_st2->payload_dts,
+ ts_st2->payload_flags & AV_PKT_FLAG_KEY, stream_id);
+ ts_st2->payload_size = 0;
+ sum += ret;
+ break;
+ case 1: // PARALLEL MODE
+ case 0:
+ ret = write_pkt_pes_stream(s, st2, ts_st2->payload + ts_st2->payload_top,
+ ts_st2->payload_size - ts_st2->payload_top, write_mode,
+ ts_st2->payload_pts, ts_st2->payload_dts,
+ ts_st2->payload_flags & AV_PKT_FLAG_KEY, stream_id);
+ ts_st2->payload_top += ret;
+ sum += ret;
+ break;
+ default:
+ continue;
+ }
+
+ if (ts_st2->payload_size && ts_st2->payload_top == ts_st2->payload_size) {
+ ts_st2->payload_size = 0;
+ ts_st2->payload_top = 0;
+ ts_st2->pes_flags = 0;
+ }
+ }
+ return sum;
+}
+
+static int write_with_side_streams(AVFormatContext *s, AVStream *st, int payload_top,
+ const uint8_t *payload, int payload_size,
+ int64_t pts, int64_t dts, int64_t delay, int key, int stream_id)
+{
+ int force = payload_size - payload_top > MAX_PES_PAYLOAD ? 1 : 0;
+ if (force)
+ av_log(s, AV_LOG_WARNING, "PES packet oversized, full sequential writing required\n");
+ if (payload_size && payload_top != payload_size) {
+ do {
+ payload_top += mpegts_write_pes(s, st, payload + payload_top,
+ payload_size - payload_top, payload_top ? 1 : 0,
+ pts, dts, key, stream_id);
+ } while (force && payload_size != payload_top);
+ }
+ write_side_streams(s, dts, delay, stream_id, 1);
+ return payload_top;
+}
+
static int mpegts_write_packet_internal(AVFormatContext *s, AVPacket *pkt)
{
AVStream *st = s->streams[pkt->stream_index];
@@ -1739,53 +1861,77 @@ static int mpegts_write_packet_internal(AVFormatContext *s, AVPacket *pkt)
}
}
- if (pkt->dts != AV_NOPTS_VALUE) {
- int i;
- for(i=0; i<s->nb_streams; i++) {
- AVStream *st2 = s->streams[i];
- MpegTSWriteStream *ts_st2 = st2->priv_data;
- if ( ts_st2->payload_size
- && (ts_st2->payload_dts == AV_NOPTS_VALUE || dts - ts_st2->payload_dts > delay/2)) {
- mpegts_write_pes(s, st2, ts_st2->payload, ts_st2->payload_size,
- ts_st2->payload_pts, ts_st2->payload_dts,
- ts_st2->payload_flags & AV_PKT_FLAG_KEY, stream_id);
- ts_st2->payload_size = 0;
- }
- }
+ // Write pending PES packets (SEQUENTIAL MODE)
+ if (pkt->dts != AV_NOPTS_VALUE && !ts->parallel_mux) {
+ write_side_streams(s, dts, delay, stream_id, 0);
}
+ // Complete a new PES packet (new incoming data will go into another PES)
if (ts_st->payload_size && (ts_st->payload_size + size > ts->pes_payload_size ||
- (dts != AV_NOPTS_VALUE && ts_st->payload_dts != AV_NOPTS_VALUE &&
- av_compare_ts(dts - ts_st->payload_dts, st->time_base,
- s->max_delay, AV_TIME_BASE_Q) >= 0) ||
- ts_st->opus_queued_samples + opus_samples >= 5760 /* 120ms */)) {
- mpegts_write_pes(s, st, ts_st->payload, ts_st->payload_size,
- ts_st->payload_pts, ts_st->payload_dts,
- ts_st->payload_flags & AV_PKT_FLAG_KEY, stream_id);
- ts_st->payload_size = 0;
- ts_st->opus_queued_samples = 0;
+ (dts != AV_NOPTS_VALUE && ts_st->payload_dts != AV_NOPTS_VALUE &&
+ av_compare_ts(dts - ts_st->payload_dts, st->time_base,
+ s->max_delay, AV_TIME_BASE_Q) >= 0) ||
+ ts_st->opus_queued_samples + opus_samples >= 5760 /* 120ms */)) {
+ if (!ts->parallel_mux || ts_st->opus_queued_samples) {
+ write_full_pes_stream(s, st, ts_st->payload, ts_st->payload_size,
+ ts_st->payload_pts, ts_st->payload_dts,
+ ts_st->payload_flags & AV_PKT_FLAG_KEY, stream_id);
+ ts_st->payload_size = 0;
+ ts_st->opus_queued_samples = 0;
+ } else {
+ ts_st->pes_flags |= PES_START_FLAG | PES_FULL_FLAG;
+ }
+ }
+
+ // Write pending PES packets (PARALLEL MODE)
+ if (pkt->dts != AV_NOPTS_VALUE && ts->parallel_mux && (ts_st->pes_flags & PES_START_FLAG)) {
+ // Empty previous pending PES packet before starting a new one or write one packet
+ do {
+ write_with_side_streams(s, st, 0,
+ ts_st->payload, 0,
+ ts_st->payload_pts, ts_st->payload_dts, delay,
+ ts_st->payload_flags & AV_PKT_FLAG_KEY, stream_id);
+ } while (ts_st->pes_flags & PES_START_FLAG);
}
+ // Directly write a new PES packet with the incoming data
if (st->codecpar->codec_type != AVMEDIA_TYPE_AUDIO || size > ts->pes_payload_size) {
av_assert0(!ts_st->payload_size);
// for video and subtitle, write a single pes packet
- mpegts_write_pes(s, st, buf, size, pts, dts,
- pkt->flags & AV_PKT_FLAG_KEY, stream_id);
- ts_st->opus_queued_samples = 0;
- av_free(data);
- return 0;
+ if (!ts->parallel_mux || ts_st->opus_queued_samples) {
+ write_full_pes_stream(s, st, buf, size, pts, dts,
+ pkt->flags & AV_PKT_FLAG_KEY, stream_id);
+ ts_st->opus_queued_samples = 0;
+ goto free;
+ } else {
+ ts_st->payload_top = write_with_side_streams(s, st, 0,
+ buf, size,
+ pts, dts, delay,
+ pkt->flags & AV_PKT_FLAG_KEY, stream_id);
+ if (ts_st->payload_top == size) {
+ ts_st->payload_size = 0;
+ ts_st->payload_top = 0;
+ ts_st->pes_flags = 0;
+ goto free;
+ }
+ ts_st->pes_flags |= PES_START_FLAG;
+ ts_st->payload_size = 0; // this value will be set later
+ }
}
+ // Start a new PES packet
if (!ts_st->payload_size) {
ts_st->payload_pts = pts;
ts_st->payload_dts = dts;
ts_st->payload_flags = pkt->flags;
}
+ // Enqueue data in the current PES packet
memcpy(ts_st->payload + ts_st->payload_size, buf, size);
ts_st->payload_size += size;
ts_st->opus_queued_samples += opus_samples;
+free:
av_free(data);
return 0;
@@ -1794,13 +1940,20 @@ static int mpegts_write_packet_internal(AVFormatContext *s, AVPacket *pkt)
static void mpegts_write_flush(AVFormatContext *s)
{
int i;
+ MpegTSWrite *ts = s->priv_data;
/* flush current packets */
for (i = 0; i < s->nb_streams; i++) {
AVStream *st = s->streams[i];
MpegTSWriteStream *ts_st = st->priv_data;
if (ts_st->payload_size > 0) {
- mpegts_write_pes(s, st, ts_st->payload, ts_st->payload_size,
+ if (!ts->parallel_mux || ts_st->payload_top == 0)
+ write_full_pes_stream(s, st, ts_st->payload, ts_st->payload_size,
+ ts_st->payload_pts, ts_st->payload_dts,
+ ts_st->payload_flags & AV_PKT_FLAG_KEY, -1);
+ else
+ write_flush_pes_stream(s, st, ts_st->payload + ts_st->payload_top,
+ ts_st->payload_size - ts_st->payload_top,
ts_st->payload_pts, ts_st->payload_dts,
ts_st->payload_flags & AV_PKT_FLAG_KEY, -1);
ts_st->payload_size = 0;
@@ -1920,6 +2073,9 @@ static const AVOption options[] = {
{ "mpegts_m2ts_mode", "Enable m2ts mode.",
offsetof(MpegTSWrite, m2ts_mode), AV_OPT_TYPE_BOOL,
{ .i64 = -1 }, -1, 1, AV_OPT_FLAG_ENCODING_PARAM },
+ { "mpegts_interleave_mux", "Enable interleaved (non-Sequential) muxing mode.",
+ offsetof(MpegTSWrite, parallel_mux), AV_OPT_TYPE_BOOL,
+ { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
{ "muxrate", NULL,
offsetof(MpegTSWrite, mux_rate), AV_OPT_TYPE_INT,
{ .i64 = 1 }, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM },