diff mbox series

[FFmpeg-devel,RFC] avformat/fifo: add timeshift option to delay output

Message ID 20200507202124.10746-1-cus@passwd.hu
State Superseded
Headers show
Series [FFmpeg-devel,RFC] avformat/fifo: add timeshift option to delay output
Related show

Checks

Context Check Description
andriy/default pending
andriy/make success Make finished
andriy/make_fate success Make fate finished

Commit Message

Marton Balint May 7, 2020, 8:21 p.m. UTC
Signed-off-by: Marton Balint <cus@passwd.hu>
---
 libavformat/fifo.c | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

Comments

Tao Zhang May 9, 2020, 2:35 a.m. UTC | #1
I have tested with below commands, It works fine. Thanks Marton.
ffmpeg -i input_rtmp_addr -map 0:v -map 0:a -c copy  -f fifo
-timeshift 20 -queue_size 6000000 -fifo_format flv output_rtmp_addr
ffmpeg -stream_loop -1 -re -i input_file -map 0:v -map 0:a -c copy  -f
fifo -timeshift 20 -queue_size 6000000 -fifo_format flv
output_rtmp_addr

Marton Balint <cus@passwd.hu> 于2020年5月8日周五 上午4:28写道:
>
> Signed-off-by: Marton Balint <cus@passwd.hu>
> ---
>  libavformat/fifo.c | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 58 insertions(+), 1 deletion(-)
>
> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
> index d11dc6626c..17748e94ce 100644
> --- a/libavformat/fifo.c
> +++ b/libavformat/fifo.c
> @@ -19,6 +19,8 @@
>   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>   */
>
> +#include <stdatomic.h>
> +
>  #include "libavutil/avassert.h"
>  #include "libavutil/opt.h"
>  #include "libavutil/time.h"
> @@ -77,6 +79,9 @@ typedef struct FifoContext {
>      /* Value > 0 signals queue overflow */
>      volatile uint8_t overflow_flag;
>
> +    atomic_int_least64_t queue_duration;
> +    int64_t last_sent_dts;
> +    int64_t timeshift;
>  } FifoContext;
>
>  typedef struct FifoThreadContext {
> @@ -98,9 +103,12 @@ typedef struct FifoThreadContext {
>       * so finalization by calling write_trailer and ff_io_close must be done
>       * before exiting / reinitialization of underlying muxer */
>      uint8_t header_written;
> +
> +    int64_t last_received_dts;
>  } FifoThreadContext;
>
>  typedef enum FifoMessageType {
> +    FIFO_NOOP,
>      FIFO_WRITE_HEADER,
>      FIFO_WRITE_PACKET,
>      FIFO_FLUSH_OUTPUT
> @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx)
>      return av_write_frame(avf2, NULL);
>  }
>
> +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
> +{
> +    AVStream *st = avf->streams[pkt->stream_index];
> +    int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
> +    int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
> +    *last_dts = dts;
> +    return duration;
> +}
> +
>  static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
>  {
>      AVFormatContext *avf = ctx->avf;
> @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
>      AVRational src_tb, dst_tb;
>      int ret, s_idx;
>
> +    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
> +        atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
> +
>      if (ctx->drop_until_keyframe) {
>          if (pkt->flags & AV_PKT_FLAG_KEY) {
>              ctx->drop_until_keyframe = 0;
> @@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg
>  {
>      int ret = AVERROR(EINVAL);
>
> +    if (msg->type == FIFO_NOOP)
> +        return 0;
> +
>      if (!ctx->header_written) {
>          ret = fifo_thread_write_header(ctx);
>          if (ret < 0)
> @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data)
>      AVFormatContext *avf = data;
>      FifoContext *fifo = avf->priv_data;
>      AVThreadMessageQueue *queue = fifo->queue;
> -    FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
> +    FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
>      int ret;
>
>      FifoThreadContext fifo_thread_ctx;
>      memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
>      fifo_thread_ctx.avf = avf;
> +    fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
>
>      while (1) {
>          uint8_t just_flushed = 0;
> @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data)
>          if (just_flushed)
>              av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>
> +        if (fifo->timeshift)
> +            while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
> +                av_usleep(10000);
> +
>          ret = av_thread_message_queue_recv(queue, &msg, 0);
>          if (ret < 0) {
>              av_thread_message_queue_set_err_send(queue, ret);
> @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf)
>                 " only when drop_pkts_on_overflow is also turned on\n");
>          return AVERROR(EINVAL);
>      }
> +    atomic_init(&fifo->queue_duration, 0);
> +    fifo->last_sent_dts = AV_NOPTS_VALUE;
>
>      oformat = av_guess_format(fifo->format, avf->url, NULL);
>      if (!oformat) {
> @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
>          goto fail;
>      }
>
> +    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
> +        atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
> +
>      return ret;
>  fail:
>      if (pkt)
> @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf)
>      int ret;
>
>      av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
> +    if (fifo->timeshift) {
> +        int64_t now = av_gettime_relative();
> +        int64_t elapsed = 0;
> +        FifoMessage msg = {FIFO_NOOP};
> +        do {
> +            int64_t delay = av_gettime_relative() - now;
> +            if (delay < 0) { // Discontinuity?
> +                delay = 10000;
> +                now = av_gettime_relative();
> +            } else {
> +                now += delay;
> +            }
> +            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
> +            elapsed += delay;
> +            if (elapsed > fifo->timeshift)
> +                break;
> +            av_usleep(10000);
> +            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
> +        } while (ret >= 0 || ret == AVERROR(EAGAIN));
> +        atomic_store(&fifo->queue_duration, INT64_MAX);
> +    }
>
>      ret = pthread_join(fifo->writer_thread, NULL);
>      if (ret < 0) {
> @@ -630,6 +684,9 @@ static const AVOption options[] = {
>          {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
>           AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
>
> +        {"timeshift", "Delay fifo output", OFFSET(timeshift),
> +         AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
> +
>          {NULL},
>  };
>
> --
> 2.16.4
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
diff mbox series

Patch

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index d11dc6626c..17748e94ce 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -19,6 +19,8 @@ 
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include <stdatomic.h>
+
 #include "libavutil/avassert.h"
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
@@ -77,6 +79,9 @@  typedef struct FifoContext {
     /* Value > 0 signals queue overflow */
     volatile uint8_t overflow_flag;
 
+    atomic_int_least64_t queue_duration;
+    int64_t last_sent_dts;
+    int64_t timeshift;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -98,9 +103,12 @@  typedef struct FifoThreadContext {
      * so finalization by calling write_trailer and ff_io_close must be done
      * before exiting / reinitialization of underlying muxer */
     uint8_t header_written;
+
+    int64_t last_received_dts;
 } FifoThreadContext;
 
 typedef enum FifoMessageType {
+    FIFO_NOOP,
     FIFO_WRITE_HEADER,
     FIFO_WRITE_PACKET,
     FIFO_FLUSH_OUTPUT
@@ -159,6 +167,15 @@  static int fifo_thread_flush_output(FifoThreadContext *ctx)
     return av_write_frame(avf2, NULL);
 }
 
+static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
+{
+    AVStream *st = avf->streams[pkt->stream_index];
+    int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
+    int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
+    *last_dts = dts;
+    return duration;
+}
+
 static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
 {
     AVFormatContext *avf = ctx->avf;
@@ -167,6 +184,9 @@  static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
     AVRational src_tb, dst_tb;
     int ret, s_idx;
 
+    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+        atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
+
     if (ctx->drop_until_keyframe) {
         if (pkt->flags & AV_PKT_FLAG_KEY) {
             ctx->drop_until_keyframe = 0;
@@ -209,6 +229,9 @@  static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg
 {
     int ret = AVERROR(EINVAL);
 
+    if (msg->type == FIFO_NOOP)
+        return 0;
+
     if (!ctx->header_written) {
         ret = fifo_thread_write_header(ctx);
         if (ret < 0)
@@ -390,12 +413,13 @@  static void *fifo_consumer_thread(void *data)
     AVFormatContext *avf = data;
     FifoContext *fifo = avf->priv_data;
     AVThreadMessageQueue *queue = fifo->queue;
-    FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
+    FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
     int ret;
 
     FifoThreadContext fifo_thread_ctx;
     memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
     fifo_thread_ctx.avf = avf;
+    fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
 
     while (1) {
         uint8_t just_flushed = 0;
@@ -429,6 +453,10 @@  static void *fifo_consumer_thread(void *data)
         if (just_flushed)
             av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
 
+        if (fifo->timeshift)
+            while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
+                av_usleep(10000);
+
         ret = av_thread_message_queue_recv(queue, &msg, 0);
         if (ret < 0) {
             av_thread_message_queue_set_err_send(queue, ret);
@@ -488,6 +516,8 @@  static int fifo_init(AVFormatContext *avf)
                " only when drop_pkts_on_overflow is also turned on\n");
         return AVERROR(EINVAL);
     }
+    atomic_init(&fifo->queue_duration, 0);
+    fifo->last_sent_dts = AV_NOPTS_VALUE;
 
     oformat = av_guess_format(fifo->format, avf->url, NULL);
     if (!oformat) {
@@ -563,6 +593,9 @@  static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
         goto fail;
     }
 
+    if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+        atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
+
     return ret;
 fail:
     if (pkt)
@@ -576,6 +609,27 @@  static int fifo_write_trailer(AVFormatContext *avf)
     int ret;
 
     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+    if (fifo->timeshift) {
+        int64_t now = av_gettime_relative();
+        int64_t elapsed = 0;
+        FifoMessage msg = {FIFO_NOOP};
+        do {
+            int64_t delay = av_gettime_relative() - now;
+            if (delay < 0) { // Discontinuity?
+                delay = 10000;
+                now = av_gettime_relative();
+            } else {
+                now += delay;
+            }
+            atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
+            elapsed += delay;
+            if (elapsed > fifo->timeshift)
+                break;
+            av_usleep(10000);
+            ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
+        } while (ret >= 0 || ret == AVERROR(EAGAIN));
+        atomic_store(&fifo->queue_duration, INT64_MAX);
+    }
 
     ret = pthread_join(fifo->writer_thread, NULL);
     if (ret < 0) {
@@ -630,6 +684,9 @@  static const AVOption options[] = {
         {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
 
+        {"timeshift", "Delay fifo output", OFFSET(timeshift),
+         AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
         {NULL},
 };