diff mbox series

[FFmpeg-devel,26/27] WIP fftools/ffmpeg_mux: convert to the scheduler

Message ID 20230919191044.18873-27-anton@khirnov.net
State New
Headers show
Series [FFmpeg-devel,01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate | expand

Commit Message

Anton Khirnov Sept. 19, 2023, 7:10 p.m. UTC
---
 fftools/ffmpeg.c          |  26 ------
 fftools/ffmpeg.h          |   8 --
 fftools/ffmpeg_mux.c      | 184 ++++++++++++--------------------------
 fftools/ffmpeg_mux.h      |  14 ++-
 fftools/ffmpeg_mux_init.c |  33 +++----
 fftools/ffmpeg_opt.c      |   6 +-
 6 files changed, 80 insertions(+), 191 deletions(-)
diff mbox series

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 69e73b84ed..e82d88b3e0 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -806,38 +806,12 @@  int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
 static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
 {
     InputFile *f = input_files[ist->file_index];
-    int64_t dts_est = AV_NOPTS_VALUE;
     int ret = 0;
     int eof_reached = 0;
 
     if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
         eof_reached = 1;
 
-    if (pkt && pkt->opaque_ref) {
-        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
-        dts_est = pd->dts_est;
-    }
-
-    if (f->recording_time != INT64_MAX) {
-        int64_t start_time = 0;
-        if (copy_ts) {
-            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
-            start_time += start_at_zero ? 0 : f->start_time_effective;
-        }
-        if (dts_est >= f->recording_time + start_time)
-            pkt = NULL;
-    }
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (ost->enc || (!pkt && no_eof))
-            continue;
-
-        ret = of_streamcopy(ost, pkt, dts_est);
-        if (ret < 0)
-            return ret;
-    }
-
     return !eof_reached;
 }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 66b5e398bc..e3aea75415 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -665,7 +665,6 @@  extern FilterGraph **filtergraphs;
 extern int        nb_filtergraphs;
 
 extern char *vstats_filename;
-extern char *sdp_filename;
 
 extern float dts_delta_threshold;
 extern float dts_error_threshold;
@@ -813,13 +812,6 @@  void of_free(OutputFile **pof);
 
 void of_enc_stats_close(void);
 
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
 int64_t of_filesize(OutputFile *of);
 
 int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 9628728d95..ba90079266 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -22,16 +22,13 @@ 
 
 #include "ffmpeg.h"
 #include "ffmpeg_mux.h"
-#include "objpool.h"
 #include "sync_queue.h"
-#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -42,8 +39,6 @@  typedef struct MuxThreadContext {
     AVPacket *pkt;
 } MuxThreadContext;
 
-int want_sdp = 1;
-
 static Muxer *mux_from_of(OutputFile *of)
 {
     return (Muxer*)of;
@@ -254,19 +249,22 @@  void *muxer_thread(void *arg)
         OutputStream *ost;
         int stream_idx, stream_eof = 0;
 
-        ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+        ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+        stream_idx = mt.pkt->stream_index;
         if (stream_idx < 0) {
             av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
             ret = 0;
             break;
         }
 
-        ost = of->streams[stream_idx];
+        ost = of->streams[mux->sch_stream_idx[stream_idx]];
+        mt.pkt->stream_index = ost->index;
+
         ret = sync_queue_process(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
         av_packet_unref(mt.pkt);
         if (ret == AVERROR_EOF) {
             if (stream_eof) {
-                tq_receive_finish(mux->tq, stream_idx);
+                sch_mux_receive_finish(mux->sch, of->index, stream_idx);
             } else {
                 av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
                 ret = 0;
@@ -278,17 +276,19 @@  void *muxer_thread(void *arg)
         }
     }
 
+    // XXX move av_write_trailer() here?
+
 finish:
     mux_thread_uninit(&mt);
 
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_receive_finish(mux->tq, i);
+    sch_mux_receive_finish(mux->sch, of->index, -1);
 
     av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
 
     return (void*)(intptr_t)ret;
 }
 
+#if 0
 static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
 {
     int ret = 0;
@@ -296,7 +296,8 @@  static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
     if (!pkt || ost->finished & MUXER_FINISHED)
         goto finish;
 
-    ret = tq_send(mux->tq, ost->index, pkt);
+    // XXX
+    //ret = tq_send(mux->tq, ost->index, pkt);
     if (ret < 0)
         goto finish;
 
@@ -306,8 +307,9 @@  finish:
     if (pkt)
         av_packet_unref(pkt);
 
+    // XXX
     ost->finished |= MUXER_FINISHED;
-    tq_send_finish(mux->tq, ost->index);
+    //tq_send_finish(mux->tq, ost->index);
     return ret == AVERROR_EOF ? 0 : ret;
 }
 
@@ -428,57 +430,48 @@  fail:
     av_log(ost, AV_LOG_ERROR, "Error %s\n", err_msg);
     return exit_on_error ? ret : 0;
 }
+#endif
 
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt, int64_t dts)
 {
     OutputFile *of = output_files[ost->file_index];
     MuxStream  *ms = ms_from_ost(ost);
     int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
     int64_t ts_offset;
-    AVPacket *opkt = ms->pkt;
-    int ret;
-
-    av_packet_unref(opkt);
 
     if (of->recording_time != INT64_MAX &&
         dts >= of->recording_time + start_time)
-        pkt = NULL;
-
-    // EOF: flush output bitstream filters.
-    if (!pkt)
-        return of_output_packet(of, ost, NULL);
+        return AVERROR_EOF;
 
     if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
         !ms->copy_initial_nonkeyframes)
-        return 0;
+        return AVERROR(EAGAIN);
 
     if (!ms->streamcopy_started) {
         if (!ms->copy_prior_start &&
             (pkt->pts == AV_NOPTS_VALUE ?
              dts < ms->ts_copy_start :
              pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
-            return 0;
+            return AVERROR(EAGAIN);
 
         if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
-            return 0;
+            return AVERROR(EAGAIN);
     }
 
-    ret = av_packet_ref(opkt, pkt);
-    if (ret < 0)
-        return ret;
-
-    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
 
     if (pkt->pts != AV_NOPTS_VALUE)
-        opkt->pts -= ts_offset;
+        pkt->pts -= ts_offset;
 
     if (pkt->dts == AV_NOPTS_VALUE) {
-        opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+        pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
     } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
-        opkt->pts = opkt->dts - ts_offset;
+        pkt->pts = pkt->dts - ts_offset;
     }
-    opkt->dts -= ts_offset;
+    pkt->dts -= ts_offset;
 
+    // XXX
+#if 0
     {
         int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
         if (ret < 0) {
@@ -488,73 +481,42 @@  int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
             return ret;
         }
     }
-
-    ret = of_output_packet(of, ost, opkt);
-    if (ret < 0)
-        return ret;
+#endif
 
     ms->streamcopy_started = 1;
 
     return 0;
 }
 
-static int thread_stop(Muxer *mux)
+int of_streamcopy_hook(void *opaque, void *item)
 {
-    void *ret;
+    OutputStream      *ost = opaque;
+    const InputStream *ist = ost->ist;
+    const InputFile     *f = input_files[ist->file_index];
 
-    if (!mux || !mux->tq)
-        return 0;
+    AVPacket          *pkt = item;
+    int64_t        dts_est = AV_NOPTS_VALUE;
 
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_send_finish(mux->tq, i);
+    // nothing for us to do about seek signals
+    if ((intptr_t)pkt->opaque == PKT_OPAQUE_SEEK)
+        return AVERROR(EAGAIN);
 
-    pthread_join(mux->thread, &ret);
-
-    tq_free(&mux->tq);
-
-    return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
-    AVFormatContext *fc = mux->fc;
-    ObjPool *op;
-    int ret;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
-    if (!mux->tq) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
+    if (pkt->opaque_ref) {
+        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
+        dts_est = pd->dts_est;
     }
 
-    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
-    if (ret) {
-        tq_free(&mux->tq);
-        return AVERROR(ret);
-    }
-
-    /* flush the muxing queues */
-    for (int i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = mux->of.streams[i];
-        MuxStream     *ms = ms_from_ost(ost);
-        AVPacket *pkt;
-
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ret = thread_submit_packet(mux, ost, pkt);
-            if (pkt) {
-                ms->muxing_queue_data_size -= pkt->size;
-                av_packet_free(&pkt);
-            }
-            if (ret < 0)
-                return ret;
+    if (f->recording_time != INT64_MAX) {
+        int64_t start_time = 0;
+        if (copy_ts) {
+            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+            start_time += start_at_zero ? 0 : f->start_time_effective;
         }
+        if (dts_est >= f->recording_time + start_time)
+            return AVERROR_EOF;
     }
 
-    return 0;
+    return of_streamcopy(ost, pkt, dts_est);
 }
 
 int print_sdp(const char *filename)
@@ -565,11 +527,6 @@  int print_sdp(const char *filename)
     AVIOContext *sdp_pb;
     AVFormatContext **avc;
 
-    for (i = 0; i < nb_output_files; i++) {
-        if (!mux_from_of(output_files[i])->header_written)
-            return 0;
-    }
-
     avc = av_malloc_array(nb_output_files, sizeof(*avc));
     if (!avc)
         return AVERROR(ENOMEM);
@@ -604,25 +561,17 @@  int print_sdp(const char *filename)
         avio_closep(&sdp_pb);
     }
 
-    // SDP successfully written, allow muxer threads to start
-    ret = 1;
-
 fail:
     av_freep(&avc);
     return ret;
 }
 
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
 {
+    Muxer     *mux = arg;
     OutputFile *of = &mux->of;
     AVFormatContext *fc = mux->fc;
-    int ret, i;
-
-    for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = of->streams[i];
-        if (!ost->initialized)
-            return 0;
-    }
+    int ret;
 
     ret = avformat_write_header(fc, &mux->opts);
     if (ret < 0) {
@@ -636,26 +585,6 @@  int mux_check_init(Muxer *mux)
     av_dump_format(fc, of->index, fc->url, 1);
     nb_output_dumped++;
 
-    if (sdp_filename || want_sdp) {
-        ret = print_sdp(sdp_filename);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
-            return ret;
-        } else if (ret == 1) {
-            /* SDP is written only after all the muxers are ready, so now we
-             * start ALL the threads */
-            for (i = 0; i < nb_output_files; i++) {
-                ret = thread_start(mux_from_of(output_files[i]));
-                if (ret < 0)
-                    return ret;
-            }
-        }
-    } else {
-        ret = thread_start(mux_from_of(of));
-        if (ret < 0)
-            return ret;
-    }
-
     return 0;
 }
 
@@ -711,9 +640,10 @@  int of_stream_init(OutputFile *of, OutputStream *ost)
                                          ost->st->time_base);
     }
 
-    ost->initialized = 1;
+    if (ms->sched_idx >= 0)
+        return sch_mux_stream_ready(mux->sch, of->index, ms->sched_idx);
 
-    return mux_check_init(mux);
+    return 0;
 }
 
 static int check_written(OutputFile *of)
@@ -827,15 +757,13 @@  int of_write_trailer(OutputFile *of)
     AVFormatContext *fc = mux->fc;
     int ret, mux_result = 0;
 
-    if (!mux->tq) {
+    if (!mux->header_written) {
         av_log(mux, AV_LOG_ERROR,
                "Nothing was written into output file, because "
                "at least one of its streams received no packets.\n");
         return AVERROR(EINVAL);
     }
 
-    mux_result = thread_stop(mux);
-
     ret = av_write_trailer(fc);
     if (ret < 0) {
         av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -951,8 +879,6 @@  void of_free(OutputFile **pof)
         return;
     mux = mux_from_of(of);
 
-    thread_stop(mux);
-
     sq_free(&of->sq_encode);
     sq_free(&mux->sq_mux);
 
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d5aba6db36..311b612018 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg_sched.h"
-#include "thread_queue.h"
 
 #include "libavformat/avformat.h"
 
@@ -33,7 +32,6 @@ 
 
 #include "libavutil/dict.h"
 #include "libavutil/fifo.h"
-#include "libavutil/thread.h"
 
 typedef struct MuxStream {
     OutputStream ost;
@@ -104,9 +102,6 @@  typedef struct Muxer {
     int         *sch_stream_idx;
     int       nb_sch_stream_idx;
 
-    pthread_t    thread;
-    ThreadQueue *tq;
-
     AVDictionary *opts;
 
     int thread_queue_size;
@@ -120,14 +115,15 @@  typedef struct Muxer {
     AVPacket *sq_pkt;
 } Muxer;
 
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
 
 static MuxStream *ms_from_ost(OutputStream *ost)
 {
     return (MuxStream*)ost;
 }
 
+/* XXX explain why this is needed
+ * (so it's called in the sending thread rather than the muxer) */
+int of_streamcopy_hook(void *opaque, void *item);
+
 #endif /* FFTOOLS_FFMPEG_MUX_H */
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 9fb6a74393..122a7344e4 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -1375,7 +1375,7 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
 
     MATCH_PER_STREAM_OPT(bitstream_filters, str, bsfs, oc, st);
     if (bsfs && *bsfs) {
-        ret = av_bsf_list_parse_str(bsfs, &ms->bsf_ctx);
+        ret = sch_add_mux_stream_bsf(mux->sch, mux->of.index, ost->index, bsfs);
         if (ret < 0) {
             av_log(ost, AV_LOG_ERROR, "Error parsing bitstream filter sequence '%s': %s\n", bsfs, av_err2str(ret));
             return ret;
@@ -1486,7 +1486,8 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
                 return ret;
         } else {
             ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx),
-                              SCH_MSTREAM(ost->file_index, ms->sched_idx), NULL, NULL);
+                              SCH_MSTREAM(ost->file_index, ms->sched_idx),
+                              of_streamcopy_hook, ost);
             if (ret < 0)
                 return ret;
         }
@@ -1931,11 +1932,17 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
      * - at least one encoded audio/video stream is frame-limited, since
      *   that has similar semantics to 'shortest'
      * - at least one audio encoder requires constant frame sizes
+     *
+     * Note that encoding sync queues are handled in the scheduler, because
+     * different encoders run in different threads and need external
+     * synchronization, while muxer sync queues can be handled inside the muxer
      */
     if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
-        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
-        if (!of->sq_encode)
-            return AVERROR(ENOMEM);
+        int sq_idx, ret;
+
+        sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+        if (sq_idx < 0)
+            return sq_idx;
 
         for (int i = 0; i < oc->nb_streams; i++) {
             OutputStream *ost = of->streams[i];
@@ -1945,13 +1952,11 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
             if (!IS_AV_ENC(ost, type))
                 continue;
 
-            ost->sq_idx_encode = sq_add_stream(of->sq_encode,
-                                               of->shortest || ms->max_frames < INT64_MAX);
-            if (ost->sq_idx_encode < 0)
-                return ost->sq_idx_encode;
-
-            if (ms->max_frames != INT64_MAX)
-                sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+            ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sched_idx_enc,
+                                 of->shortest || ms->max_frames < INT64_MAX,
+                                 ms->max_frames);
+            if (ret < 0)
+                return ret;
         }
     }
 
@@ -2704,8 +2709,6 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     av_strlcat(mux->log_name, "/",               sizeof(mux->log_name));
     av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
 
-    if (strcmp(oc->oformat->name, "rtp"))
-        want_sdp = 0;
 
     of->format = oc->oformat;
     if (recording_time != INT64_MAX)
@@ -2721,7 +2724,7 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                                            AVFMT_FLAG_BITEXACT);
     }
 
-    err = sch_add_mux(sch, muxer_thread, NULL, mux,
+    err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
                       !strcmp(oc->oformat->name, "rtp"));
     if (err < 0)
         return err;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d463306546..6177a96a4e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@  const char *const opt_name_top_field_first[]                  = {"top", NULL};
 HWDevice *filter_hw_device;
 
 char *vstats_filename;
-char *sdp_filename;
 
 float audio_drift_threshold = 0.1;
 float dts_delta_threshold   = 10;
@@ -580,9 +579,8 @@  fail:
 
 static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
 {
-    av_free(sdp_filename);
-    sdp_filename = av_strdup(arg);
-    return 0;
+    Scheduler *sch = optctx;
+    return sch_sdp_filename(sch, arg);
 }
 
 #if CONFIG_VAAPI