[FFmpeg-devel,v5,01/11] avformat: Add fifo pseudo-muxer

Message ID 1470312719-16591-1-git-send-email-sebechlebskyjan@gmail.com
State Superseded

Commit Message

sebechlebskyjan@gmail.com Aug. 4, 2016, 12:11 p.m. UTC
From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
---
 Changes from the last version of patch:
  - boolean AVOptions are now ints, this was the cause of fate test
    segfault reported by Michael

 Changelog                |   1 +
 configure                |   1 +
 doc/muxers.texi          |  90 +++++++
 libavformat/Makefile     |   1 +
 libavformat/allformats.c |   1 +
 libavformat/fifo.c       | 674 +++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/version.h    |   2 +-
 7 files changed, 769 insertions(+), 1 deletion(-)
 create mode 100644 libavformat/fifo.c

Comments

Marton Balint Aug. 8, 2016, 9:55 p.m. UTC | #1
On Thu, 4 Aug 2016, sebechlebskyjan@gmail.com wrote:

> From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
>
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>

Applying the patches with "git am" reports some whitespace issues
(empty lines at the end of files). Could you have a look at those?

[...]

> --- a/doc/muxers.texi
> +++ b/doc/muxers.texi
> @@ -1436,6 +1436,96 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
> 
> @end table
> 
> +@section fifo
> +
> +The fifo pseudo-muxer allows to separate encoding from any other muxer by using

... allows the separation of encoding and muxing by using a ...

> +first-in-first-out queue and running the actual muxer in a separate thread. This
> +is especially useful in combination with the @ref{tee} muxer and output to

I get errors for this line when building the docs:
doc/muxers.texi:1443: @ref reference to nonexistent node `tee'

> +several destinations with different reliability/writing speed/latency.
> +
> +The behavior of fifo muxer in case of failure can be configured:

The behaviour of the fifo muxer if the queue fills up or if the output 
fails is selectable,

> +@itemize @bullet
> +
> +@item
> +output can be transparently restarted with configurable delay between retries
> +based on real time or time of the processed stream.
> +
> +@item
> +encoding can be blocked during temporary failure, or continue transparently
> +dropping packets in case fifo queue fills up.
> +
> +@end itemize
> +
> +@table @option
> +
> +@item fifo_format
> +Specify the format name. Useful if it cannot be guessed from the
> +output name suffix.
> +
> +@item queue_size
> +Specify size of the queue (number of packets). Default value is 60.
> +
> +@item format_opts
> +Specify format options for the underlying muxer. Muxer options can be specified
> +as a list of @var{key}=@var{value} pairs separated by ':'.
> +
> +@item drop_pkts_on_overflow @var{bool}
> +If set to 1 (true), in case the fifo queue fills up, packets will be dropped
> +rather than blocking the encoder. This allows to continue streaming without
> +delaying the output, at the cost of ommiting part of the stream. By default

delaying the input you mean?

> +this option is set to 0 (false), so in such cases the encoder will be blocked
> +until the muxer processes some of the packets and none of them is lost.
> +
> +@item attempt_recovery @var{bool}
> +If failure occurs, attempt to recover the output. This is especially useful
> +when used with network output, allows to restart streaming transparently.
> +By default this option set to 0 (false).
> +
> +@item max_recovery_attempts
> +Sets maximum number of successive unsucessful recovery attempts after which
> +the output fails permanently. Unlimited if set to zero. Default value is 16.
> +
> +@item recovery_wait_time @var{duration}
> +Waiting time before the next recovery attempt after previous unsuccessfull
> +recovery attempt. Default value is 5 seconds.
> +
> +s@item recovery_wait_streamtime @var{bool}
> +If set to 0 (false), the real time is used when waiting for the recovery
> +attempt (i.e. the recovery will be attempted after at least
> +recovery_wait_time seconds).
> +If set to 1 (true), the time of the processed stream is taken into account
> +instead (i.e. the recovery will be attempted after at least recovery_wait_time
> +seconds of the stream is omitted).
> +By default, this option is set to 0 (false).
> +
> +@item recover_any_error @var{bool}
> +If set to 1 (true), recovery will be attempted regardless of type of the error
> +causing the failure. By default this option is set to 0 (false) and in case of
> +certain errors the recovery is not attempted even when @ref{attempt_recovery}

... in case of certain (usually permanent) errors ...

I also get an error when building docs:
doc/muxers.texi:1504: @ref reference to nonexistent node `attempt_recovery'

> +is set to 1.
> +
> +@item restart_with_keyframe @var{bool}
> +Specify whether to wait for the keyframe after recovering from
> +queue overflow or failure. This option is set to 0 (false) by default.
> +
> +@end table
> +
> +@subsection Examples
> +
> +@itemize
> +
> +@item
> +Stream something to rtmp server, continue processing the stream at real-time
> +rate even in case of temporary failure (network outage) and attempt to recover
> +streaming every second indefinitely.
> +@example
> +ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
> +  -block_on_overflow 0 -attempt_recovery 1 -recovery_wait_time 1
> +  -max_recovery_attempts 0 rtmp://example.com/live/stream_name
> +@end example
> +
> +@end itemize
> +
> @section tee
>

[...]

> +    /* If >0 recovery will be attempted regardless of error code
> +     * (except AVERROR_EXIT, so exit request is never ignored) */
> +    int recover_any_error;
> +
> +    /* Whether to drop packets in case the queue is full. */
> +    int drop_pkts_on_overflow;
> +
> +    /* Whether to wait for keyframe when recovering
> +     * from failure or queue overflow */
> +    int restart_with_keyframe;
> +
> +    pthread_mutex_t overflow_flag_lock;
> +    /* Value > 0 signalizes queue overflow */

signals

[...]

> +static void *fifo_consumer_thread(void *data)
> +{
> +    AVFormatContext *avf = data;
> +    FifoContext *fifo = avf->priv_data;
> +    AVThreadMessageQueue *queue = fifo->queue;
> +    FifoMessage msg;
> +    int ret;
> +
> +    FifoThreadContext fifo_thread_ctx;
> +    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
> +    fifo_thread_ctx.avf = avf;
> +
> +    ret = fifo_thread_write_header(&fifo_thread_ctx);
> +    if (ret < 0) {
> +        int rec_ret = fifo_thread_recover(&fifo_thread_ctx, NULL, ret);
> +        if (rec_ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, rec_ret);
> +            return NULL;
> +        }
> +    }

I think you can move this code inside the start of the loop, and eliminate
similar code at the end of the loop. Also using NULL as msg seems
suspicious, since you dereference msg in fifo_thread_attempt_recovery
and in fifo_thread_recover as well.

Maybe it is better to enter the loop with a dummy message type 
WRITE_HEADER, and modify fifo_thread_attempt_recovery accordingly.
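
A minimal sketch of the loop shape suggested above, not the actual fifo.c
code. It assumes hypothetical stand-in types (Msg, Ctx) and helpers (dispatch,
recover, consumer) and replaces the thread message queue with a plain array.
The point is only that the loop is entered with a synthetic WRITE_HEADER
message, so header failures and packet failures share one recovery path and
the recovery code never receives a NULL message.

```c
#include <stddef.h>

/* Hypothetical stand-ins for the patch's message and thread context types. */
typedef enum { MSG_WRITE_HEADER, MSG_WRITE_PACKET, MSG_FLUSH_OUTPUT } MsgType;

typedef struct {
    MsgType type;
    int payload;
} Msg;

typedef struct {
    int header_written;
    int packets_written;
    int header_failures_left; /* simulates a header write that fails N times */
} Ctx;

/* Writes the header lazily if needed, then handles the message itself. */
static int dispatch(Ctx *ctx, const Msg *msg)
{
    if (!ctx->header_written) {
        if (ctx->header_failures_left > 0) {
            ctx->header_failures_left--;
            return -1;
        }
        ctx->header_written = 1;
    }

    switch (msg->type) {
    case MSG_WRITE_HEADER: /* nothing left to do, header is written */
        return 0;
    case MSG_WRITE_PACKET:
        ctx->packets_written++;
        return 0;
    case MSG_FLUSH_OUTPUT:
        return 0;
    }
    return -1;
}

/* Single recovery path; msg always points to a valid message. */
static int recover(Ctx *ctx, const Msg *msg, int max_attempts)
{
    for (int i = 0; i < max_attempts; i++)
        if (dispatch(ctx, msg) >= 0)
            return 0;
    return -1;
}

/* Consumer loop entered with a dummy WRITE_HEADER message. */
static int consumer(Ctx *ctx, const Msg *queue, size_t n, int max_attempts)
{
    Msg msg = { MSG_WRITE_HEADER, 0 };
    size_t next = 0;

    for (;;) {
        int ret = dispatch(ctx, &msg);
        if (ret < 0 && recover(ctx, &msg, max_attempts) < 0)
            return -1;
        if (next == n)
            return 0;
        msg = queue[next++];
    }
}
```

With this shape the header-writing block before the loop disappears, and any
timestamp handling in the recovery code only has to special-case the
WRITE_HEADER type instead of guarding against NULL.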

> +
> +    while (1) {
> +        uint8_t just_flushed = 0;
> +
> +        /* If the queue is full at the moment when fifo_write_packet
> +         * attempts to insert new message (packet) to the queue,
> +         * it sets the fifo->overflow_flag to 1 and drops packet.
> +         * Here in consumer thread, the flag is checked and if it is
> +         * set, the queue is flushed and flag cleared. */
> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (fifo->overflow_flag) {
> +            av_thread_message_flush(queue);
> +            if (fifo->restart_with_keyframe)
> +                fifo_thread_ctx.drop_until_keyframe = 1;
> +            fifo->overflow_flag = 0;
> +            just_flushed = 1;
> +        }
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> +
> +        if (just_flushed)
> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
> +
> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
> +        if (ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, ret);
> +            break;
> +        }
> +
> +        if (!fifo_thread_ctx.recovery_nr)
> +            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
> +
> +        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
> +            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
> +            if (rec_ret < 0) {
> +                av_thread_message_queue_set_err_send(queue, rec_ret);
> +                break;
> +            }
> +        }
> +    }
> +
> +    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
> +
> +    return NULL;
> +}
> +

[...]

> +static int fifo_init(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +    int ret = 0;
> +
> +    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
> +        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
> +               " only when block_on_overflow is turned off\n");

" when drop_pkts_on_overflow is also turned on"

> +        return AVERROR(EINVAL);
> +    }
> +
> +    if (fifo->format_options_str) {
> +        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
> +                                   "=", ":", 0);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
> +                   fifo->format_options_str);
> +            return ret;
> +        }
> +    }
> +
> +    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
> +    if (!fifo->oformat) {
> +        ret = AVERROR_MUXER_NOT_FOUND;
> +        return ret;
> +    }
> +
> +    ret = fifo_mux_init(avf);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
> +                                        sizeof(FifoMessage));
> +    if (!ret)

use (ret < 0) instead.
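
A small sketch of the error-check pattern meant here, assuming a hypothetical
queue_alloc() stand-in that returns 0 on success or a negative error code,
like the av_* allocators. It also shows that pthread_mutex_init() reports
failure with a positive error number, so a (ret < 0) test would never trigger
for it. The names Queue, queue_alloc and init_sketch are illustrative only,
and -ret stands in for AVERROR(ret).

```c
#include <errno.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the message queue. */
typedef struct {
    int has_free_func;
} Queue;

/* Returns 0 on success or a negative error code. */
static int queue_alloc(Queue *q, int simulate_failure)
{
    if (simulate_failure)
        return -ENOMEM;
    q->has_free_func = 0;
    return 0;
}

static int init_sketch(Queue *q, pthread_mutex_t *lock, int simulate_failure)
{
    int ret = queue_alloc(q, simulate_failure);
    if (ret < 0)            /* bail out on failure first */
        return ret;
    q->has_free_func = 1;   /* only reached when the allocation succeeded */

    ret = pthread_mutex_init(lock, NULL);
    if (ret)                /* POSIX: 0 on success, positive errno on failure */
        return -ret;

    return 0;
}
```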

> +        av_thread_message_queue_set_free_func(fifo->queue, free_message);
> +
> +    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
> +    if (ret < 0)
> +        return AVERROR(ret);
> +
> +    return 0;
> +}
> +

[...]

> +static void fifo_deinit(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +
> +    if (fifo->format_options)
> +        av_dict_free(&fifo->format_options);
> +
> +    if (avf)

is this supposed to be if (fifo->avf) ?

> +        avformat_free_context(fifo->avf);
> +
> +    if (fifo->queue) {
> +        av_thread_message_flush(fifo->queue);
> +        av_thread_message_queue_free(&fifo->queue);
> +    }
> +
> +    pthread_mutex_destroy(&fifo->overflow_flag_lock);
> +}
> +

[...]

Thanks for your work, and sorry for the delay in the review.

Regards,
Marton
sebechlebskyjan@gmail.com Aug. 9, 2016, 11:22 a.m. UTC | #2
On 08/08/2016 11:55 PM, Marton Balint wrote:

>
> Thanks for your work, and sorry for the delay in the review.
>
> Regards,
> Marton
Thanks for the review! Hopefully I've fixed all the mentioned issues.
I'm resending the patch along with the other affected patches.

Regards,
Jan

Patch

diff --git a/Changelog b/Changelog
index 0f9b4cf..3f858f1 100644
--- a/Changelog
+++ b/Changelog
@@ -12,6 +12,7 @@  version <next>:
 - 16-bit support in selectivecolor filter
 - OpenH264 decoder wrapper
 - MediaCodec hwaccel
+- fifo muxer
 
 
 version 3.1:
diff --git a/configure b/configure
index 9f5b31f..4651f5f 100755
--- a/configure
+++ b/configure
@@ -2834,6 +2834,7 @@  dv_muxer_select="dvprofile"
 dxa_demuxer_select="riffdec"
 eac3_demuxer_select="ac3_parser"
 f4v_muxer_select="mov_muxer"
+fifo_muxer_deps="pthreads"
 flac_demuxer_select="flac_parser"
 hds_muxer_select="flv_muxer"
 hls_muxer_select="mpegts_muxer"
diff --git a/doc/muxers.texi b/doc/muxers.texi
index 5873269..e2bc290 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -1436,6 +1436,96 @@  Specify whether to remove all fragments when finished. Default 0 (do not remove)
 
 @end table
 
+@section fifo
+
+The fifo pseudo-muxer allows to separate encoding from any other muxer by using
+first-in-first-out queue and running the actual muxer in a separate thread. This
+is especially useful in combination with the @ref{tee} muxer and output to
+several destinations with different reliability/writing speed/latency.
+
+The behavior of fifo muxer in case of failure can be configured:
+@itemize @bullet
+
+@item
+output can be transparently restarted with configurable delay between retries
+based on real time or time of the processed stream.
+
+@item
+encoding can be blocked during temporary failure, or continue transparently
+dropping packets in case fifo queue fills up.
+
+@end itemize
+
+@table @option
+
+@item fifo_format
+Specify the format name. Useful if it cannot be guessed from the
+output name suffix.
+
+@item queue_size
+Specify size of the queue (number of packets). Default value is 60.
+
+@item format_opts
+Specify format options for the underlying muxer. Muxer options can be specified
+as a list of @var{key}=@var{value} pairs separated by ':'.
+
+@item drop_pkts_on_overflow @var{bool}
+If set to 1 (true), in case the fifo queue fills up, packets will be dropped
+rather than blocking the encoder. This allows to continue streaming without
+delaying the output, at the cost of ommiting part of the stream. By default
+this option is set to 0 (false), so in such cases the encoder will be blocked
+until the muxer processes some of the packets and none of them is lost.
+
+@item attempt_recovery @var{bool}
+If failure occurs, attempt to recover the output. This is especially useful
+when used with network output, allows to restart streaming transparently.
+By default this option set to 0 (false).
+
+@item max_recovery_attempts
+Sets maximum number of successive unsucessful recovery attempts after which
+the output fails permanently. Unlimited if set to zero. Default value is 16.
+
+@item recovery_wait_time @var{duration}
+Waiting time before the next recovery attempt after previous unsuccessfull
+recovery attempt. Default value is 5 seconds.
+
+s@item recovery_wait_streamtime @var{bool}
+If set to 0 (false), the real time is used when waiting for the recovery
+attempt (i.e. the recovery will be attempted after at least
+recovery_wait_time seconds).
+If set to 1 (true), the time of the processed stream is taken into account
+instead (i.e. the recovery will be attempted after at least recovery_wait_time
+seconds of the stream is omitted).
+By default, this option is set to 0 (false).
+
+@item recover_any_error @var{bool}
+If set to 1 (true), recovery will be attempted regardless of type of the error
+causing the failure. By default this option is set to 0 (false) and in case of
+certain errors the recovery is not attempted even when @ref{attempt_recovery}
+is set to 1.
+
+@item restart_with_keyframe @var{bool}
+Specify whether to wait for the keyframe after recovering from
+queue overflow or failure. This option is set to 0 (false) by default.
+
+@end table
+
+@subsection Examples
+
+@itemize
+
+@item
+Stream something to rtmp server, continue processing the stream at real-time
+rate even in case of temporary failure (network outage) and attempt to recover
+streaming every second indefinitely.
+@example
+ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
+  -block_on_overflow 0 -attempt_recovery 1 -recovery_wait_time 1
+  -max_recovery_attempts 0 rtmp://example.com/live/stream_name
+@end example
+
+@end itemize
+
 @section tee
 
 The tee muxer can be used to write the same data to several files or any
diff --git a/libavformat/Makefile b/libavformat/Makefile
index e2cb474..3c5e509 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -162,6 +162,7 @@  OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
 OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
 OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
 OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
+OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
 OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
 OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
 OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
diff --git a/libavformat/allformats.c b/libavformat/allformats.c
index 10c9bcc..fd93dd3 100644
--- a/libavformat/allformats.c
+++ b/libavformat/allformats.c
@@ -123,6 +123,7 @@  void av_register_all(void)
     REGISTER_MUXER   (F4V,              f4v);
     REGISTER_MUXDEMUX(FFM,              ffm);
     REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
+    REGISTER_MUXER   (FIFO,             fifo);
     REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
     REGISTER_MUXDEMUX(FLAC,             flac);
     REGISTER_DEMUXER (FLIC,             flic);
diff --git a/libavformat/fifo.c b/libavformat/fifo.c
new file mode 100644
index 0000000..bd9d934
--- /dev/null
+++ b/libavformat/fifo.c
@@ -0,0 +1,674 @@ 
+/*
+ * FIFO pseudo-muxer
+ * Copyright (c) 2016 Jan Sebechlebsky
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "libavutil/threadmessage.h"
+#include "avformat.h"
+#include "internal.h"
+#include "pthread.h"
+
+#define FIFO_DEFAULT_QUEUE_SIZE              60
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
+
+typedef struct FifoContext {
+    const AVClass *class;
+    AVFormatContext *avf;
+
+    char *format;
+    AVOutputFormat *oformat;
+
+    char *format_options_str;
+    AVDictionary *format_options;
+
+    int queue_size;
+    AVThreadMessageQueue *queue;
+    pthread_t writer_thread;
+
+    /* Return value of last write_trailer_call */
+    int write_trailer_ret;
+
+    /* Time to wait before next recovery attempt
+     * This can refer to the time in processed stream,
+     * or real time. */
+    int64_t recovery_wait_time;
+
+    /* Maximal number of unsuccessful successive recovery attempts */
+    int max_recovery_attempts;
+
+    /* Whether to attempt recovery from failure */
+    int attempt_recovery;
+
+    /* If >0 stream time will be used when waiting
+     * for the recovery attempt instead of real time */
+    int recovery_wait_streamtime;
+
+    /* If >0 recovery will be attempted regardless of error code
+     * (except AVERROR_EXIT, so exit request is never ignored) */
+    int recover_any_error;
+
+    /* Whether to drop packets in case the queue is full. */
+    int drop_pkts_on_overflow;
+
+    /* Whether to wait for keyframe when recovering
+     * from failure or queue overflow */
+    int restart_with_keyframe;
+
+    pthread_mutex_t overflow_flag_lock;
+    /* Value > 0 signalizes queue overflow */
+    volatile uint8_t overflow_flag;
+
+} FifoContext;
+
+typedef struct FifoThreadContext {
+    AVFormatContext *avf;
+
+    /* Timestamp of last failure.
+     * This is either pts in case stream time is used,
+     * or microseconds as returned by av_getttime_relative() */
+    int64_t last_recovery_ts;
+
+    /* Number of current recovery process
+     * Value > 0 means we are in recovery process */
+    int recovery_nr;
+
+    /* If > 0 all frames will be dropped until keyframe is received */
+    uint8_t drop_until_keyframe;
+
+    /* Value > 0 means that the previous write_header call was successful
+     * so finalization by calling write_trailer and ff_io_close must be done
+     * before exiting / reinitialization of underlying muxer */
+    uint8_t header_written;
+} FifoThreadContext;
+
+typedef enum FifoMessageType {
+    FIFO_WRITE_PACKET,
+    FIFO_FLUSH_OUTPUT
+} FifoMessageType;
+
+typedef struct FifoMessage {
+    FifoMessageType type;
+    AVPacket pkt;
+} FifoMessage;
+
+static int fifo_thread_write_header(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVDictionary *format_options = NULL;
+    int ret, i;
+
+    ret = av_dict_copy(&format_options, fifo->format_options, 0);
+    if (ret < 0)
+        return ret;
+
+    ret = ff_format_output_open(avf2, avf->filename, &format_options);
+    if (ret < 0) {
+        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
+               av_err2str(ret));
+        goto end;
+    }
+
+    for (i = 0;i < avf2->nb_streams; i++)
+        avf2->streams[i]->cur_dts = 0;
+
+    ret = avformat_write_header(avf2, &format_options);
+    if (!ret)
+        ctx->header_written = 1;
+
+    // Check for options unrecognized by underlying muxer
+    if (format_options) {
+        AVDictionaryEntry *entry = NULL;
+        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
+            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
+        ret = AVERROR(EINVAL);
+    }
+
+end:
+    av_dict_free(&format_options);
+    return ret;
+}
+
+static int fifo_thread_flush_output(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+
+    return av_write_frame(avf2, NULL);
+}
+
+static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVRational src_tb, dst_tb;
+    int ret, s_idx;
+
+    if (ctx->drop_until_keyframe) {
+        if (pkt->flags & AV_PKT_FLAG_KEY) {
+            ctx->drop_until_keyframe = 0;
+            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
+        } else {
+            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
+            av_packet_unref(pkt);
+            return 0;
+        }
+    }
+
+    s_idx = pkt->stream_index;
+    src_tb = avf->streams[s_idx]->time_base;
+    dst_tb = avf2->streams[s_idx]->time_base;
+    av_packet_rescale_ts(pkt, src_tb, dst_tb);
+
+    ret = av_write_frame(avf2, pkt);
+    if (ret >= 0)
+        av_packet_unref(pkt);
+    return ret;
+}
+
+static int fifo_thread_write_trailer(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    int ret;
+
+    if (!ctx->header_written)
+        return 0;
+
+    ret = av_write_trailer(avf2);
+    ff_format_io_close(avf2, &avf2->pb);
+
+    return ret;
+}
+
+static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
+{
+    int ret;
+
+    if (!ctx->header_written) {
+        ret = fifo_thread_write_header(ctx);
+        if (ret < 0)
+            return ret;
+    }
+
+    if (msg->type == FIFO_WRITE_PACKET) {
+        return fifo_thread_write_packet(ctx, &msg->pkt);
+    } else if (msg->type == FIFO_FLUSH_OUTPUT) {
+        return fifo_thread_flush_output(ctx);
+    }
+
+    return AVERROR(EINVAL);
+}
+
+static int is_recoverable(const FifoContext *fifo, int err_no) {
+    if (!fifo->attempt_recovery)
+        return 0;
+
+    if (fifo->recover_any_error)
+        return err_no != AVERROR_EXIT;
+
+    switch (err_no) {
+    case AVERROR(EINVAL):
+    case AVERROR(ENOSYS):
+    case AVERROR_EOF:
+    case AVERROR_EXIT:
+    case AVERROR_PATCHWELCOME:
+        return 0;
+    default:
+        return 1;
+    }
+}
+
+static void free_message(void *msg)
+{
+    FifoMessage *fifo_msg = msg;
+
+    if (!fifo_msg)
+        return;
+
+    if (fifo_msg->type == FIFO_WRITE_PACKET)
+        av_packet_unref(&fifo_msg->pkt);
+}
+
+static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
+                                                int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
+           av_err2str(err_no));
+
+    if (fifo->recovery_wait_streamtime) {
+        if (pkt->pts == AV_NOPTS_VALUE)
+            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
+                   " timestamp, recovery will be attempted immediately");
+        ctx->last_recovery_ts = pkt->pts;
+    } else {
+        ctx->last_recovery_ts = av_gettime_relative();
+    }
+
+    if (fifo->max_recovery_attempts &&
+        ctx->recovery_nr >= fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_ERROR,
+               "Maximal number of %d recovery attempts reached.\n",
+               fifo->max_recovery_attempts);
+        ret = err_no;
+    } else {
+        ret = AVERROR(EAGAIN);
+    }
+
+    return ret;
+}
+
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVPacket *pkt = &msg->pkt;
+    int64_t time_since_recovery;
+    int ret;
+
+    if (!is_recoverable(fifo, err_no)) {
+        ret = err_no;
+        goto fail;
+    }
+
+    if (ctx->header_written) {
+        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
+        ctx->header_written = 0;
+    }
+
+    if (!ctx->recovery_nr) {
+        ctx->last_recovery_ts = 0;
+    } else {
+        if (fifo->recovery_wait_streamtime) {
+            AVRational tb = avf->streams[pkt->stream_index]->time_base;
+            time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
+                                               tb, AV_TIME_BASE_Q);
+        } else {
+            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+        }
+
+        if (time_since_recovery < fifo->recovery_wait_time)
+            return AVERROR(EAGAIN);
+    }
+
+    ctx->recovery_nr++;
+
+    if (fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d/%d\n",
+               ctx->recovery_nr, fifo->max_recovery_attempts);
+    } else {
+        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d\n",
+               ctx->recovery_nr);
+    }
+
+    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
+        ctx->drop_until_keyframe = 1;
+
+    if (msg) {
+        ret = fifo_thread_dispatch_message(ctx, msg);
+    } else {
+        ret = fifo_thread_write_header(ctx);
+    }
+    if (ret < 0) {
+        if (is_recoverable(fifo, ret)) {
+            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+        } else {
+            goto fail;
+        }
+    } else {
+        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
+        ctx->recovery_nr = 0;
+    }
+
+    return 0;
+
+fail:
+    free_message(msg);
+    return ret;
+}
+
+static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    do {
+        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
+            int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
+            if (time_to_wait)
+                av_usleep(FFMIN(10000, time_to_wait));
+        }
+
+        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
+    } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
+
+    if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
+        if (msg->type == FIFO_WRITE_PACKET)
+            av_packet_unref(&msg->pkt);
+        ret = 0;
+    }
+
+    return ret;
+}
+
+static void *fifo_consumer_thread(void *data)
+{
+    AVFormatContext *avf = data;
+    FifoContext *fifo = avf->priv_data;
+    AVThreadMessageQueue *queue = fifo->queue;
+    FifoMessage msg;
+    int ret;
+
+    FifoThreadContext fifo_thread_ctx;
+    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
+    fifo_thread_ctx.avf = avf;
+
+    ret = fifo_thread_write_header(&fifo_thread_ctx);
+    if (ret < 0) {
+        int rec_ret = fifo_thread_recover(&fifo_thread_ctx, NULL, ret);
+        if (rec_ret < 0) {
+            av_thread_message_queue_set_err_send(queue, rec_ret);
+            return NULL;
+        }
+    }
+
+    while (1) {
+        uint8_t just_flushed = 0;
+
+        /* If the queue is full at the moment when fifo_write_packet
+         * attempts to insert new message (packet) to the queue,
+         * it sets the fifo->overflow_flag to 1 and drops packet.
+         * Here in consumer thread, the flag is checked and if it is
+         * set, the queue is flushed and flag cleared. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (fifo->overflow_flag) {
+            av_thread_message_flush(queue);
+            if (fifo->restart_with_keyframe)
+                fifo_thread_ctx.drop_until_keyframe = 1;
+            fifo->overflow_flag = 0;
+            just_flushed = 1;
+        }
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (just_flushed)
+            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+
+        ret = av_thread_message_queue_recv(queue, &msg, 0);
+        if (ret < 0) {
+            av_thread_message_queue_set_err_send(queue, ret);
+            break;
+        }
+
+        if (!fifo_thread_ctx.recovery_nr)
+            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
+
+        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
+            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
+            if (rec_ret < 0) {
+                av_thread_message_queue_set_err_send(queue, rec_ret);
+                break;
+            }
+        }
+    }
+
+    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
+
+    return NULL;
+}
+
+static int fifo_mux_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2;
+    int ret = 0, i;
+
+    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
+    if (ret < 0) {
+        return ret;
+    }
+
+    fifo->avf = avf2;
+
+    avf2->interrupt_callback = avf->interrupt_callback;
+    avf2->max_delay = avf->max_delay;
+    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
+    if (ret < 0)
+        return ret;
+    avf2->opaque = avf->opaque;
+    avf2->io_close = avf->io_close;
+    avf2->io_open = avf->io_open;
+    avf2->flags = avf->flags;
+
+    for (i = 0; i < avf->nb_streams; ++i) {
+        AVStream *st = avformat_new_stream(avf2, NULL);
+        if (!st)
+            return AVERROR(ENOMEM);
+
+        ret = ff_stream_encode_params_copy(st, avf->streams[i]);
+        if (ret < 0)
+            return ret;
+    }
+
+    return 0;
+}
+
+static int fifo_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    int ret = 0;
+
+    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
+        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
+               " only when block_on_overflow is turned off\n");
+        return AVERROR(EINVAL);
+    }
+
+    if (fifo->format_options_str) {
+        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
+                                   "=", ":", 0);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
+                   fifo->format_options_str);
+            return ret;
+        }
+    }
+
+    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
+    if (!fifo->oformat) {
+        ret = AVERROR_MUXER_NOT_FOUND;
+        return ret;
+    }
+
+    ret = fifo_mux_init(avf);
+    if (ret < 0)
+        return ret;
+
+    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
+                                        sizeof(FifoMessage));
+    if (ret < 0)
+        return ret;
+    av_thread_message_queue_set_free_func(fifo->queue, free_message);
+    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
+    if (ret)
+        return AVERROR(ret);
+
+    return 0;
+}
+
+static int fifo_write_header(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
+    if (ret) {
+        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
+               av_err2str(AVERROR(ret)));
+        ret = AVERROR(ret);
+    }
+
+    return ret;
+}
+
+static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
+{
+    FifoContext *fifo = avf->priv_data;
+    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
+    int ret;
+
+    if (pkt) {
+        av_init_packet(&msg.pkt);
+        ret = av_packet_ref(&msg.pkt, pkt);
+        if (ret < 0)
+            return ret;
+    }
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg,
+                                       fifo->drop_pkts_on_overflow ?
+                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (ret == AVERROR(EAGAIN)) {
+        uint8_t overflow_set = 0;
+
+        /* Queue is full, set fifo->overflow_flag to 1
+         * to let consumer thread know the queue should
+         * be flushed. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (!fifo->overflow_flag)
+            fifo->overflow_flag = overflow_set = 1;
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (overflow_set)
+            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
+        ret = 0;
+        goto fail;
+    } else if (ret < 0) {
+        goto fail;
+    }
+
+    return ret;
+fail:
+    if (pkt)
+        av_packet_unref(&msg.pkt);
+    return ret;
+}
+
+static int fifo_write_trailer(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+
+    ret = pthread_join(fifo->writer_thread, NULL);
+    if (ret) {
+        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+               av_err2str(AVERROR(ret)));
+        return AVERROR(ret);
+    }
+
+    ret = fifo->write_trailer_ret;
+    return ret;
+}
+
+static void fifo_deinit(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+
+    if (fifo->format_options)
+        av_dict_free(&fifo->format_options);
+
+    if (fifo->avf)
+        avformat_free_context(fifo->avf);
+
+    if (fifo->queue) {
+        av_thread_message_flush(fifo->queue);
+        av_thread_message_queue_free(&fifo->queue);
+    }
+
+    pthread_mutex_destroy(&fifo->overflow_flag_lock);
+}
+
+#define OFFSET(x) offsetof(FifoContext, x)
+static const AVOption options[] = {
+        {"fifo_format", "Target muxer", OFFSET(format),
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
+         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
+         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {NULL},
+};
+
+static const AVClass fifo_muxer_class = {
+    .class_name = "Fifo muxer",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+AVOutputFormat ff_fifo_muxer = {
+    .name           = "fifo",
+    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
+    .priv_data_size = sizeof(FifoContext),
+    .init           = fifo_init,
+    .write_header   = fifo_write_header,
+    .write_packet   = fifo_write_packet,
+    .write_trailer  = fifo_write_trailer,
+    .deinit         = fifo_deinit,
+    .priv_class     = &fifo_muxer_class,
+    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
+};
+
diff --git a/libavformat/version.h b/libavformat/version.h
index 5630808..8899bfd 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,7 +32,7 @@ 
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
 // Also please add any ticket numbers that you belive might be affected here
 #define LIBAVFORMAT_VERSION_MAJOR  57
-#define LIBAVFORMAT_VERSION_MINOR  44
+#define LIBAVFORMAT_VERSION_MINOR  45
 #define LIBAVFORMAT_VERSION_MICRO 100
 
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \