diff mbox

[FFmpeg-devel,v8,01/11] avformat: Add fifo pseudo-muxer

Message ID 1471209308-25838-1-git-send-email-sebechlebskyjan@gmail.com
State Superseded
Headers show

Commit Message

sebechlebskyjan@gmail.com Aug. 14, 2016, 9:15 p.m. UTC
From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
---
 Changes since the last version of the patch:
 - Fixed documentation (apart from the Marton's suggestions I've
   also changed example, since it used block_on_overflow option
   from the earlier version of patch)
 - Changed max_recovery_attempts default value to 0 (unlimited)
 - Removed unnecessary check for null ptr in free_message
 - Changed log level when loggin recovery attempt to AV_LOG_VERBOSE
 
 Changelog                |   1 +
 configure                |   1 +
 doc/muxers.texi          |  92 +++++++
 libavformat/Makefile     |   1 +
 libavformat/allformats.c |   1 +
 libavformat/fifo.c       | 662 +++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/version.h    |   2 +-
 7 files changed, 759 insertions(+), 1 deletion(-)
 create mode 100644 libavformat/fifo.c

Comments

Nicolas George Aug. 15, 2016, 9:50 p.m. UTC | #1
L'octidi 28 thermidor, an CCXXIV, sebechlebskyjan@gmail.com a écrit :
> From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> 
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> ---
>  Changes since the last version of the patch:
>  - Fixed documentation (apart from the Marton's suggestions I've
>    also changed example, since it used block_on_overflow option
>    from the earlier version of patch)
>  - Changed max_recovery_attempts default value to 0 (unlimited)
>  - Removed unnecessary check for null ptr in free_message
>  - Changed log level when loggin recovery attempt to AV_LOG_VERBOSE
>  
>  Changelog                |   1 +
>  configure                |   1 +
>  doc/muxers.texi          |  92 +++++++
>  libavformat/Makefile     |   1 +
>  libavformat/allformats.c |   1 +
>  libavformat/fifo.c       | 662 +++++++++++++++++++++++++++++++++++++++++++++++
>  libavformat/version.h    |   2 +-
>  7 files changed, 759 insertions(+), 1 deletion(-)
>  create mode 100644 libavformat/fifo.c
> 
> diff --git a/Changelog b/Changelog
> index b903e31..c693d34 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -15,6 +15,7 @@ version <next>:
>  - True Audio (TTA) muxer
>  - crystalizer audio filter
>  - acrusher audio filter
> +- fifo muxer
>  
>  
>  version 3.1:
> diff --git a/configure b/configure
> index bff8159..a252354 100755
> --- a/configure
> +++ b/configure
> @@ -2834,6 +2834,7 @@ dv_muxer_select="dvprofile"
>  dxa_demuxer_select="riffdec"
>  eac3_demuxer_select="ac3_parser"
>  f4v_muxer_select="mov_muxer"
> +fifo_muxer_deps="pthreads"
>  flac_demuxer_select="flac_parser"
>  hds_muxer_select="flv_muxer"
>  hls_muxer_select="mpegts_muxer"
> diff --git a/doc/muxers.texi b/doc/muxers.texi
> index 5873269..b4c3886 100644
> --- a/doc/muxers.texi
> +++ b/doc/muxers.texi
> @@ -1436,6 +1436,98 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
>  
>  @end table
>  
> +@section fifo
> +
> +The fifo pseudo-muxer allows the separation of encoding and muxing by using
> +first-in-first-out queue and running the actual muxer in a separate thread. This
> +is especially useful in combination with the @ref{tee} muxer and can be used to
> +send data to several destinations with different reliability/writing speed/latency.
> +
> +The behavior of the fifo muxer if the queue fills up or if the output fails is
> +selectable,
> +
> +@itemize @bullet
> +
> +@item
> +output can be transparently restarted with configurable delay between retries
> +based on real time or time of the processed stream.
> +
> +@item
> +encoding can be blocked during temporary failure, or continue transparently
> +dropping packets in case fifo queue fills up.
> +
> +@end itemize
> +
> +@table @option
> +
> +@item fifo_format
> +Specify the format name. Useful if it cannot be guessed from the
> +output name suffix.
> +
> +@item queue_size
> +Specify size of the queue (number of packets). Default value is 60.
> +
> +@item format_opts
> +Specify format options for the underlying muxer. Muxer options can be specified
> +as a list of @var{key}=@var{value} pairs separated by ':'.
> +
> +@item drop_pkts_on_overflow @var{bool}
> +If set to 1 (true), in case the fifo queue fills up, packets will be dropped
> +rather than blocking the encoder. This allows to continue streaming without
> +delaying the input, at the cost of ommiting part of the stream. By default
> +this option is set to 0 (false), so in such cases the encoder will be blocked
> +until the muxer processes some of the packets and none of them is lost.
> +
> +@item attempt_recovery @var{bool}
> +If failure occurs, attempt to recover the output. This is especially useful
> +when used with network output, allows to restart streaming transparently.
> +By default this option set to 0 (false).
> +
> +@item max_recovery_attempts
> +Sets maximum number of successive unsucessful recovery attempts after which
> +the output fails permanently. By default this option is set to 0 (unlimited).
> +
> +@item recovery_wait_time @var{duration}
> +Waiting time before the next recovery attempt after previous unsuccessfull
> +recovery attempt. Default value is 5 seconds.
> +

> +s@item recovery_wait_streamtime @var{bool}
   ^
Strange.

> +If set to 0 (false), the real time is used when waiting for the recovery
> +attempt (i.e. the recovery will be attempted after at least
> +recovery_wait_time seconds).
> +If set to 1 (true), the time of the processed stream is taken into account
> +instead (i.e. the recovery will be attempted after at least @var{recovery_wait_time}
> +seconds of the stream is omitted).
> +By default, this option is set to 0 (false).
> +
> +@item recover_any_error @var{bool}
> +If set to 1 (true), recovery will be attempted regardless of type of the error
> +causing the failure. By default this option is set to 0 (false) and in case of
> +certain (usually permanent) errors the recovery is not attempted even when
> +@var{attempt_recovery} is set to 1.
> +
> +@item restart_with_keyframe @var{bool}
> +Specify whether to wait for the keyframe after recovering from
> +queue overflow or failure. This option is set to 0 (false) by default.
> +
> +@end table
> +
> +@subsection Examples
> +
> +@itemize
> +
> +@item
> +Stream something to rtmp server, continue processing the stream at real-time
> +rate even in case of temporary failure (network outage) and attempt to recover
> +streaming every second indefinitely.
> +@example
> +ffmpeg -re -i ... -c:v libx264 -c:a aac -f fifo -fifo_format flv -map 0:v -map 0:a
> +  -drop_pkts_on_overflow 1 -attempt_recovery 1 -recovery_wait_time 1 rtmp://example.com/live/stream_name
> +@end example
> +
> +@end itemize
> +
> +@anchor{tee}
>  @section tee
>  
>  The tee muxer can be used to write the same data to several files or any
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index fda1e17..2d2b78c 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
>  OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
>  OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
>  OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
> +OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
>  OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
>  OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
>  OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
> diff --git a/libavformat/allformats.c b/libavformat/allformats.c
> index a69195e..6a216ef 100644
> --- a/libavformat/allformats.c
> +++ b/libavformat/allformats.c
> @@ -123,6 +123,7 @@ void av_register_all(void)
>      REGISTER_MUXER   (F4V,              f4v);
>      REGISTER_MUXDEMUX(FFM,              ffm);
>      REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
> +    REGISTER_MUXER   (FIFO,             fifo);
>      REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
>      REGISTER_MUXDEMUX(FLAC,             flac);
>      REGISTER_DEMUXER (FLIC,             flic);
> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
> new file mode 100644
> index 0000000..3748fa9
> --- /dev/null
> +++ b/libavformat/fifo.c
> @@ -0,0 +1,662 @@
> +/*
> + * FIFO pseudo-muxer
> + * Copyright (c) 2016 Jan Sebechlebsky
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public License
> + * as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
> + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "libavutil/opt.h"
> +#include "libavutil/time.h"
> +#include "libavutil/thread.h"
> +#include "libavutil/threadmessage.h"
> +#include "avformat.h"
> +#include "internal.h"
> +

> +#define FIFO_DEFAULT_QUEUE_SIZE              60
> +#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
> +#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second

Not very useful, but not harmful either.

Nit: plural for seconds.

> +
> +typedef struct FifoContext {
> +    const AVClass *class;
> +    AVFormatContext *avf;
> +
> +    char *format;

> +    AVOutputFormat *oformat;

Does not seem to be needed in the context, could be a local variable.

> +
> +    char *format_options_str;
> +    AVDictionary *format_options;
> +
> +    int queue_size;
> +    AVThreadMessageQueue *queue;
> +    pthread_t writer_thread;
> +
> +    /* Return value of last write_trailer_call */
> +    int write_trailer_ret;
> +
> +    /* Time to wait before next recovery attempt
> +     * This can refer to the time in processed stream,
> +     * or real time. */
> +    int64_t recovery_wait_time;
> +
> +    /* Maximal number of unsuccessful successive recovery attempts */
> +    int max_recovery_attempts;
> +
> +    /* Whether to attempt recovery from failure */
> +    int attempt_recovery;
> +
> +    /* If >0 stream time will be used when waiting
> +     * for the recovery attempt instead of real time */
> +    int recovery_wait_streamtime;
> +
> +    /* If >0 recovery will be attempted regardless of error code
> +     * (except AVERROR_EXIT, so exit request is never ignored) */
> +    int recover_any_error;
> +
> +    /* Whether to drop packets in case the queue is full. */
> +    int drop_pkts_on_overflow;
> +
> +    /* Whether to wait for keyframe when recovering
> +     * from failure or queue overflow */
> +    int restart_with_keyframe;
> +
> +    pthread_mutex_t overflow_flag_lock;
> +    /* Value > 0 signals queue overflow */
> +    volatile uint8_t overflow_flag;
> +
> +} FifoContext;
> +
> +typedef struct FifoThreadContext {
> +    AVFormatContext *avf;
> +
> +    /* Timestamp of last failure.
> +     * This is either pts in case stream time is used,
> +     * or microseconds as returned by av_getttime_relative() */
> +    int64_t last_recovery_ts;
> +
> +    /* Number of current recovery process
> +     * Value > 0 means we are in recovery process */
> +    int recovery_nr;
> +
> +    /* If > 0 all frames will be dropped until keyframe is received */
> +    uint8_t drop_until_keyframe;
> +
> +    /* Value > 0 means that the previous write_header call was successful
> +     * so finalization by calling write_trailer and ff_io_close must be done
> +     * before exiting / reinitialization of underlying muxer */
> +    uint8_t header_written;
> +} FifoThreadContext;
> +
> +typedef enum FifoMessageType {
> +    FIFO_WRITE_HEADER,
> +    FIFO_WRITE_PACKET,
> +    FIFO_FLUSH_OUTPUT
> +} FifoMessageType;
> +
> +typedef struct FifoMessage {
> +    FifoMessageType type;
> +    AVPacket pkt;
> +} FifoMessage;
> +
> +static int fifo_thread_write_header(FifoThreadContext *ctx)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +    AVDictionary *format_options = NULL;
> +    int ret, i;
> +
> +    ret = av_dict_copy(&format_options, fifo->format_options, 0);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = ff_format_output_open(avf2, avf->filename, &format_options);
> +    if (ret < 0) {
> +        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
> +               av_err2str(ret));
> +        goto end;
> +    }
> +
> +    for (i = 0;i < avf2->nb_streams; i++)
> +        avf2->streams[i]->cur_dts = 0;
> +
> +    ret = avformat_write_header(avf2, &format_options);
> +    if (!ret)
> +        ctx->header_written = 1;
> +

> +    // Check for options unrecognized by underlying muxer
> +    if (format_options) {
> +        AVDictionaryEntry *entry = NULL;
> +        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
> +            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
> +        ret = AVERROR(EINVAL);
> +    }

This snippet seems to pop at quite a few places, it could become a helper
function. But do not consider it necessary for now.

> +
> +end:
> +    av_dict_free(&format_options);
> +    return ret;
> +}
> +
> +static int fifo_thread_flush_output(FifoThreadContext *ctx)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +
> +    return av_write_frame(avf2, NULL);
> +}
> +
> +static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +    AVRational src_tb, dst_tb;
> +    int ret, s_idx;
> +
> +    if (ctx->drop_until_keyframe) {
> +        if (pkt->flags & AV_PKT_FLAG_KEY) {
> +            ctx->drop_until_keyframe = 0;
> +            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
> +        } else {
> +            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
> +            av_packet_unref(pkt);
> +            return 0;
> +        }
> +    }
> +

> +    s_idx = pkt->stream_index;
> +    src_tb = avf->streams[s_idx]->time_base;
> +    dst_tb = avf2->streams[s_idx]->time_base;
> +    av_packet_rescale_ts(pkt, src_tb, dst_tb);
> +
> +    ret = av_write_frame(avf2, pkt);
> +    if (ret >= 0)
> +        av_packet_unref(pkt);
> +    return ret;
> +}
> +
> +static int fifo_thread_write_trailer(FifoThreadContext *ctx)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +    int ret;
> +
> +    if (!ctx->header_written)
> +        return 0;
> +
> +    ret = av_write_trailer(avf2);
> +    ff_format_io_close(avf2, &avf2->pb);
> +
> +    return ret;
> +}
> +
> +static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
> +{
> +    int ret;
> +
> +    if (!ctx->header_written) {
> +        ret = fifo_thread_write_header(ctx);
> +        if (ret < 0)
> +            return ret;
> +    }
> +
> +    switch(msg->type) {
> +    case FIFO_WRITE_HEADER:
> +        return ret;
> +    case FIFO_WRITE_PACKET:
> +        return fifo_thread_write_packet(ctx, &msg->pkt);
> +    case FIFO_FLUSH_OUTPUT:
> +        return fifo_thread_flush_output(ctx);
> +    }
> +
> +    return AVERROR(EINVAL);
> +}
> +
> +static int is_recoverable(const FifoContext *fifo, int err_no) {
> +    if (!fifo->attempt_recovery)
> +        return 0;
> +
> +    if (fifo->recover_any_error)
> +        return err_no != AVERROR_EXIT;
> +
> +    switch (err_no) {
> +    case AVERROR(EINVAL):
> +    case AVERROR(ENOSYS):
> +    case AVERROR_EOF:
> +    case AVERROR_EXIT:
> +    case AVERROR_PATCHWELCOME:
> +        return 0;
> +    default:
> +        return 1;
> +    }
> +}
> +
> +static void free_message(void *msg)
> +{
> +    FifoMessage *fifo_msg = msg;
> +
> +    if (fifo_msg->type == FIFO_WRITE_PACKET)
> +        av_packet_unref(&fifo_msg->pkt);
> +}
> +
> +static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
> +                                                int err_no)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    int ret;
> +
> +    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
> +           av_err2str(err_no));
> +
> +    if (fifo->recovery_wait_streamtime) {
> +        if (pkt->pts == AV_NOPTS_VALUE)
> +            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
> +                   " timestamp, recovery will be attempted immediately");

> +        ctx->last_recovery_ts = pkt->pts;
> +    } else {
> +        ctx->last_recovery_ts = av_gettime_relative();

In my experience, this kind of construct is slightly simpler if you compute
the next timestamp immediately instead of keeping the last one. I mean,
instead of this:

	last_ts = now();
	...
	sleep(last_ts + delay - now());

you can write this:

	next_ts = now() + delay;
	...
	sleep(next_ts - now());

> +    }
> +
> +    if (fifo->max_recovery_attempts &&
> +        ctx->recovery_nr >= fifo->max_recovery_attempts) {
> +        av_log(avf, AV_LOG_ERROR,
> +               "Maximal number of %d recovery attempts reached.\n",
> +               fifo->max_recovery_attempts);
> +        ret = err_no;
> +    } else {
> +        ret = AVERROR(EAGAIN);
> +    }
> +
> +    return ret;
> +}
> +
> +static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    AVPacket *pkt = &msg->pkt;
> +    int64_t time_since_recovery;
> +    int ret;
> +
> +    if (!is_recoverable(fifo, err_no)) {
> +        ret = err_no;
> +        goto fail;
> +    }
> +
> +    if (ctx->header_written) {
> +        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
> +        ctx->header_written = 0;
> +    }
> +
> +    if (!ctx->recovery_nr) {

> +        ctx->last_recovery_ts = 0;

AV_NOPTS_VALUE? And maybe an av_assert1() when you use it to be sure it was
actually set.

> +    } else {
> +        if (fifo->recovery_wait_streamtime) {
> +            AVRational tb = avf->streams[pkt->stream_index]->time_base;
> +            time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
> +                                               tb, AV_TIME_BASE_Q);
> +        } else {
> +            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
> +        }
> +
> +        if (time_since_recovery < fifo->recovery_wait_time)
> +            return AVERROR(EAGAIN);
> +    }
> +
> +    ctx->recovery_nr++;
> +
> +    if (fifo->max_recovery_attempts) {
> +        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
> +               ctx->recovery_nr, fifo->max_recovery_attempts);
> +    } else {
> +        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
> +               ctx->recovery_nr);
> +    }
> +
> +    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
> +        ctx->drop_until_keyframe = 1;
> +
> +    ret = fifo_thread_dispatch_message(ctx, msg);
> +    if (ret < 0) {
> +        if (is_recoverable(fifo, ret)) {
> +            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
> +        } else {
> +            goto fail;
> +        }
> +    } else {
> +        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
> +        ctx->recovery_nr = 0;
> +    }
> +
> +    return 0;
> +
> +fail:
> +    free_message(msg);
> +    return ret;
> +}
> +
> +static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
> +{
> +    AVFormatContext *avf = ctx->avf;
> +    FifoContext *fifo = avf->priv_data;
> +    int ret;
> +
> +    do {
> +        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
> +            int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
> +            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);

> +            if (time_to_wait)
> +                av_usleep(FFMIN(10000, time_to_wait));

This is why I really do not like threads: they are supposed to make things
simpler and avoid all the hassle of event loops and polling, but as soon as
the task becomes non-trivial, it becomes necessary to implement an event
loop or polling in the thread.

End of rant, you can resume your normal occupations.

> +        }
> +
> +        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
> +    } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
> +
> +    if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
> +        if (msg->type == FIFO_WRITE_PACKET)
> +            av_packet_unref(&msg->pkt);
> +        ret = 0;
> +    }
> +
> +    return ret;
> +}
> +
> +static void *fifo_consumer_thread(void *data)
> +{
> +    AVFormatContext *avf = data;
> +    FifoContext *fifo = avf->priv_data;
> +    AVThreadMessageQueue *queue = fifo->queue;
> +    FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
> +    int ret;
> +
> +    FifoThreadContext fifo_thread_ctx;
> +    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
> +    fifo_thread_ctx.avf = avf;
> +
> +    while (1) {
> +        uint8_t just_flushed = 0;
> +
> +        if (!fifo_thread_ctx.recovery_nr)
> +            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
> +
> +        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
> +            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
> +            if (rec_ret < 0) {
> +                av_thread_message_queue_set_err_send(queue, rec_ret);
> +                break;
> +            }
> +        }
> +
> +        /* If the queue is full at the moment when fifo_write_packet
> +         * attempts to insert new message (packet) to the queue,
> +         * it sets the fifo->overflow_flag to 1 and drops packet.
> +         * Here in consumer thread, the flag is checked and if it is
> +         * set, the queue is flushed and flag cleared. */

> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (fifo->overflow_flag) {
> +            av_thread_message_flush(queue);
> +            if (fifo->restart_with_keyframe)
> +                fifo_thread_ctx.drop_until_keyframe = 1;
> +            fifo->overflow_flag = 0;
> +            just_flushed = 1;
> +        }
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);

I wonder if this whole thing could not be made simpler. This is just a
suggestion, so do not feel obligated to act on it if you lack time.

This has the feel of an out-of-band message. This is a rather useful
feature. This could be added to the thread message queue rather easily.

But in fact, I think it could be made even simpler. See below where
overflow_flag is set.

> +
> +        if (just_flushed)
> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
> +
> +        ret = av_thread_message_queue_recv(queue, &msg, 0);

> +        if (ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, ret);
> +            break;

I do not think this is necessary: the only error possible here is the one
you sent yourself using av_thread_message_queue_set_err_recv().

> +        }
> +    }
> +
> +    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
> +
> +    return NULL;
> +}
> +
> +static int fifo_mux_init(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2;
> +    int ret = 0, i;
> +
> +    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    fifo->avf = avf2;
> +

> +    avf2->interrupt_callback = avf->interrupt_callback;

You have not documented the fact that the interrupt callback needs to be
thread-safe. And if users can select the fifo muxer, this becomes a problem.

> +    avf2->max_delay = avf->max_delay;
> +    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
> +    if (ret < 0)
> +        return ret;
> +    avf2->opaque = avf->opaque;
> +    avf2->io_close = avf->io_close;
> +    avf2->io_open = avf->io_open;
> +    avf2->flags = avf->flags;
> +
> +    for (i = 0; i < avf->nb_streams; ++i) {
> +        AVStream *st = avformat_new_stream(avf2, NULL);
> +        if (!st)
> +            return AVERROR(ENOMEM);
> +
> +        ret = ff_stream_encode_params_copy(st, avf->streams[i]);
> +        if (ret < 0)
> +            return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +static int fifo_init(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +    int ret = 0;
> +
> +    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
> +        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
> +               " only when drop_pkts_on_overflow is also turned on\n");
> +        return AVERROR(EINVAL);
> +    }
> +
> +    if (fifo->format_options_str) {
> +        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
> +                                   "=", ":", 0);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
> +                   fifo->format_options_str);
> +            return ret;
> +        }
> +    }
> +
> +    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
> +    if (!fifo->oformat) {
> +        ret = AVERROR_MUXER_NOT_FOUND;
> +        return ret;
> +    }
> +
> +    ret = fifo_mux_init(avf);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
> +                                        sizeof(FifoMessage));
> +    if (ret < 0)
> +        return ret;
> +
> +    av_thread_message_queue_set_free_func(fifo->queue, free_message);
> +
> +    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
> +    if (ret < 0)
> +        return AVERROR(ret);
> +
> +    return 0;
> +}
> +
> +static int fifo_write_header(AVFormatContext *avf)
> +{
> +    FifoContext * fifo = avf->priv_data;
> +    int ret;
> +
> +    ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
> +    if (ret) {
> +        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
> +               av_err2str(AVERROR(ret)));
> +        ret = AVERROR(ret);
> +    }
> +
> +    return 0;
> +}
> +
> +static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
> +    int ret;
> +
> +    if (pkt) {
> +        av_init_packet(&msg.pkt);
> +        ret = av_packet_ref(&msg.pkt,pkt);
> +        if (ret < 0)
> +            return ret;
> +    }
> +
> +    ret = av_thread_message_queue_send(fifo->queue, &msg,
> +                                       fifo->drop_pkts_on_overflow ?
> +                                       AV_THREAD_MESSAGE_NONBLOCK : 0);

> +    if (ret == AVERROR(EAGAIN)) {
> +        uint8_t overflow_set = 0;
> +
> +        /* Queue is full, set fifo->overflow_flag to 1
> +         * to let consumer thread know the queue should
> +         * be flushed. */
> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (!fifo->overflow_flag)
> +            fifo->overflow_flag = overflow_set = 1;
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> +
> +        if (overflow_set)
> +            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
> +        ret = 0;
> +        goto fail;

Can you explain why you do not just drop the packet here and now? If you do
that and move the discard until keyframe here, you can dispense with the
overflow flag and the lock. The consumer thread becomes much simpler and can
focus on the retries themselves. Plus, the whole process becomes slightly
more efficient since packets are discarded earlier.

Also, why do you flush the whole 60 packets when it blocks? If the output is
only lagging a little, discarding a single packet could be enough. And if
not, it will discard the next packets.

> +    } else if (ret < 0) {
> +        goto fail;
> +    }
> +
> +    return ret;
> +fail:
> +    if (pkt)
> +        av_packet_unref(&msg.pkt);
> +    return ret;
> +}
> +
> +static int fifo_write_trailer(AVFormatContext *avf)
> +{
> +    FifoContext *fifo= avf->priv_data;
> +    int ret;
> +
> +    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
> +

> +    ret = pthread_join( fifo->writer_thread, NULL);

Nit: stray space.

> +    if (ret < 0) {
> +        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
> +               av_err2str(AVERROR(ret)));
> +        return AVERROR(ret);
> +    }
> +
> +    ret = fifo->write_trailer_ret;
> +    return ret;
> +}
> +
> +static void fifo_deinit(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +

> +    if (fifo->format_options)

Superfluous test.

> +        av_dict_free(&fifo->format_options);
> +
> +    avformat_free_context(fifo->avf);
> +

> +    if (fifo->queue) {
> +        av_thread_message_flush(fifo->queue);
> +        av_thread_message_queue_free(&fifo->queue);
> +    }

Superfluous test and flush: all taken care by free.

> +
> +    pthread_mutex_destroy(&fifo->overflow_flag_lock);
> +}
> +
> +#define OFFSET(x) offsetof(FifoContext, x)
> +static const AVOption options[] = {
> +        {"fifo_format", "Target muxer", OFFSET(format),
> +         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"queue_size", "Size of fifo queue", OFFSET(queue_size),

> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},

1024 seems a bit arbitrary.

> +
> +        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
> +         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
> +         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
> +         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
> +        AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
> +         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
> +         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
> +         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {NULL},
> +};
> +
> +static const AVClass fifo_muxer_class = {
> +    .class_name = "Fifo muxer",
> +    .item_name  = av_default_item_name,
> +    .option     = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +AVOutputFormat ff_fifo_muxer = {
> +    .name           = "fifo",
> +    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
> +    .priv_data_size = sizeof(FifoContext),
> +    .init           = fifo_init,
> +    .write_header   = fifo_write_header,
> +    .write_packet   = fifo_write_packet,
> +    .write_trailer  = fifo_write_trailer,
> +    .deinit         = fifo_deinit,
> +    .priv_class     = &fifo_muxer_class,
> +    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
> +};
> diff --git a/libavformat/version.h b/libavformat/version.h
> index 6f47a2f..647c39c 100644
> --- a/libavformat/version.h
> +++ b/libavformat/version.h
> @@ -32,7 +32,7 @@
>  // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
>  // Also please add any ticket numbers that you belive might be affected here
>  #define LIBAVFORMAT_VERSION_MAJOR  57
> -#define LIBAVFORMAT_VERSION_MINOR  46

> +#define LIBAVFORMAT_VERSION_MINOR  47
>  #define LIBAVFORMAT_VERSION_MICRO 101

Reset micro when bumping minor.

>  
>  #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \

Regards,
sebechlebskyjan@gmail.com Aug. 15, 2016, 11:05 p.m. UTC | #2
On 08/15/2016 11:50 PM, Nicolas George wrote:

> L'octidi 28 thermidor, an CCXXIV, sebechlebskyjan@gmail.com a écrit :
[...]
>> +s@item recovery_wait_streamtime @var{bool}
>     ^
> Strange.
>
Sorry, that is obviously a typo. Strange thing is it had not produced 
any kind of warning/error.
[...]
>> +#define FIFO_DEFAULT_QUEUE_SIZE              60
>> +#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
>> +#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
> Not very useful, but not harmful either.
>
> Nit: plural for seconds.
>
>> +
>> +typedef struct FifoContext {
>> +    const AVClass *class;
>> +    AVFormatContext *avf;
>> +
>> +    char *format;
>> +    AVOutputFormat *oformat;
> Does not seem to be needed in the context, could be a local variable.
You're probably right, I'll check it and make it local variable if so.
>> +    // Check for options unrecognized by underlying muxer
>> +    if (format_options) {
>> +        AVDictionaryEntry *entry = NULL;
>> +        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
>> +            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
>> +        ret = AVERROR(EINVAL);
>> +    }
> This snippet seems to pop at quite a few places, it could become a helper
> function. But do not consider it necessary for now.
I'll try to keep it in mind and add helper function later.
> [...]
>> +        ctx->last_recovery_ts = pkt->pts;
>> +    } else {
>> +        ctx->last_recovery_ts = av_gettime_relative();
> In my experience, this kind of construct is slightly simpler if you compute
> the next timestamp immediately instead of keeping the last one. I mean,
> instead of this:
>
> 	last_ts = now();
> 	...
> 	sleep(last_ts + delay - now());
>
> you can write this:
>
> 	next_ts = now() + delay;
> 	...
> 	sleep(next_ts - now());
I'll give it a try and see if it is more cleaner in context of this code.
>> +    }
>> +
>> +    if (fifo->max_recovery_attempts &&
>> +        ctx->recovery_nr >= fifo->max_recovery_attempts) {
>> +        av_log(avf, AV_LOG_ERROR,
>> +               "Maximal number of %d recovery attempts reached.\n",
>> +               fifo->max_recovery_attempts);
>> +        ret = err_no;
>> +    } else {
>> +        ret = AVERROR(EAGAIN);
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
>> +{
>> +    AVFormatContext *avf = ctx->avf;
>> +    FifoContext *fifo = avf->priv_data;
>> +    AVPacket *pkt = &msg->pkt;
>> +    int64_t time_since_recovery;
>> +    int ret;
>> +
>> +    if (!is_recoverable(fifo, err_no)) {
>> +        ret = err_no;
>> +        goto fail;
>> +    }
>> +
>> +    if (ctx->header_written) {
>> +        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
>> +        ctx->header_written = 0;
>> +    }
>> +
>> +    if (!ctx->recovery_nr) {
>> +        ctx->last_recovery_ts = 0;
> AV_NOPTS_VALUE? And maybe an av_assert1() when you use it to be sure it was
> actually set.
I'll take a look at it. Using zero will delay the initial recovery 
attempt if failure happens at
the pts = 0, so treating it specially seems like a good idea.
> [...]
>> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
>> +        if (fifo->overflow_flag) {
>> +            av_thread_message_flush(queue);
>> +            if (fifo->restart_with_keyframe)
>> +                fifo_thread_ctx.drop_until_keyframe = 1;
>> +            fifo->overflow_flag = 0;
>> +            just_flushed = 1;
>> +        }
>> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> I wonder if this whole thing could not be made simpler. This is just a
> suggestion, so do not feel obligated to act on it if you lack time.
>
> This has the feel of an out-of-band message. This is a rather useful
> feature. This could be added to the thread message queue rather easily.
I am not sure if I understand this. Do you mean thread queue function 
which would set
certain flag that the queue should be flushed and flushed the queue at 
certain point in time (next receive call?)?
> But in fact, I think it could be made even simpler. See below where
> overflow_flag is set.
>
>> +
>> +        if (just_flushed)
>> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>> +
>> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
> [...]
>> +        if (ret < 0) {
>> +            av_thread_message_queue_set_err_send(queue, ret);
>> +            break;
> I do not think this is necessary: the only error possible here is the one
> you sent yourself using av_thread_message_queue_set_err_recv().
You're right. I'll remove it.
>> +        }
>> +    }
>> +
>> +    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
>> +
>> +    return NULL;
>> +}
>> +
>> +static int fifo_mux_init(AVFormatContext *avf)
>> +{
>> +    FifoContext *fifo = avf->priv_data;
>> +    AVFormatContext *avf2;
>> +    int ret = 0, i;
>> +
>> +    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    fifo->avf = avf2;
>> +
>> +    avf2->interrupt_callback = avf->interrupt_callback;
> You have not documented the fact that the interrupt callback needs to be
> thread-safe. And if users can select the fifo muxer, this becomes a problem.
I will submit another patch adding that to the API documentation. Thanks 
for reminding.
[...]
>> +    if (ret == AVERROR(EAGAIN)) {
>> +        uint8_t overflow_set = 0;
>> +
>> +        /* Queue is full, set fifo->overflow_flag to 1
>> +         * to let consumer thread know the queue should
>> +         * be flushed. */
>> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
>> +        if (!fifo->overflow_flag)
>> +            fifo->overflow_flag = overflow_set = 1;
>> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
>> +
>> +        if (overflow_set)
>> +            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
>> +        ret = 0;
>> +        goto fail;
> Can you explain why you do not just drop the packet here and now? If you do
> that and move the discard until keyframe here, you can dispense with the
> overflow flag and the lock. The consumer thread becomes much simpler and can
> focus on the retries themselves. Plus, the whole process becomes slightly
> more efficient since packets are discarded earlier.
>
> Also, why do you flush the whole 60 packets when it blocks? If the output is
> only lagging a little, discarding a single packet could be enough. And if
> not, it will discard the next packets.
The current packet is discarded here. We discussed with Marton how to 
drop packets and agreed that flushing the whole queue should be OK - it 
is done within single critical section in av_thread_queue_flush(), 
flushing just single packet might not be enough in some situations. The 
reason why is it done in worker thread and not in this call is that 
flushing the queue can get quite costly -> dereferencing larger number 
of packets can lead to many free() calls which may be costly, the idea 
was to move potentially costly operation to worker thread.
     It may be a good thing to make this configurable (allow user to set 
how many messages to flush on overflow) and add function to thread queue 
which would flush certain number of messages in single critical section.
> [...]
>> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
> 1024 seems a bit arbitrary.
I'll change that to INT_MAX.
> [...]
>> +#define LIBAVFORMAT_VERSION_MINOR  47
>>   #define LIBAVFORMAT_VERSION_MICRO 101
> Reset micro when bumping minor.
I'll fix that.

Thanks for review! I'll fix the issues and resend the patch soon.

Regards,
Jan
Marton Balint Aug. 15, 2016, 11:14 p.m. UTC | #3
>>> +        if (!fifo->overflow_flag)
>>> +            fifo->overflow_flag = overflow_set = 1;
>>> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
>>> +
>>> +        if (overflow_set)
>>> +            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
>>> +        ret = 0;
>>> +        goto fail;

>> Can you explain why you do not just drop the packet here and now? If you do
>> that and move the discard until keyframe here, you can dispense with the
>> overflow flag and the lock. The consumer thread becomes much simpler and can
>> focus on the retries themselves. Plus, the whole process becomes slightly
>> more efficient since packets are discarded earlier.
>>
>> Also, why do you flush the whole 60 packets when it blocks? If the output is
>> only lagging a little, discarding a single packet could be enough. And if
>> not, it will discard the next packets.

> The current packet is discarded here. We discussed with Marton how to 
> drop packets and agreed that flushing the whole queue should be OK - it 
> is done within single critical section in av_thread_queue_flush(), 
> flushing just single packet might not be enough in some situations. The 
> reason why is it done in worker thread and not in this call is that 
> flushing the queue can get quite costly -> dereferencing larger number 
> of packets can lead to many free() calls which may be costly, the idea 
> was to move potentially costly operation to worker thread.

I remember some other advantages of flushing in blocks as well, which made 
me suggest it to Jan:

- It is a good thing if the consumer knows that there was a packet
discontinuity, it can decide what to do. If you drop packets early, the
consumer has no chance of knowing if there was a packet discontinuity.

- Dropping packets in continous blocks, rather than one by one, will 
probably cause less artifacts for the user, for example if you drop a 
reference frame from every GOP, it can make the video glitchy for the 
whole GOP. And exactly this can happen, if the output is only lagging a 
little.

- When the output blocks for some reason, then starts to work, then 
instead of one continous drop of packets you would get a continous drop, 
then packets and drops alternating, and finally when there is enough space 
in the fifo, continous packets. This is also ugly.

- Also it is not healthy to operate with an almost full fifo, because of
the additional latency it causes. So if the queue fills up for any reason,
let's give us a fresh chance to operate normally, with as small latency as
possible.

Regards,
Marton
sebechlebskyjan@gmail.com Aug. 15, 2016, 11:59 p.m. UTC | #4
On 08/16/2016 01:05 AM, Jan Sebechlebsky wrote:
> On 08/15/2016 11:50 PM, Nicolas George wrote:
>>
>>> +
>>> +        if (just_flushed)
>>> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>>> +
>>> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
>> [...]
>>> +        if (ret < 0) {
>>> +            av_thread_message_queue_set_err_send(queue, ret);
>>> +            break;
>> I do not think this is necessary: the only error possible here is the 
>> one
>> you sent yourself using av_thread_message_queue_set_err_recv().
> You're right. I'll remove it.
Actually on second thought I think it I'll leave it there - it ensures 
that all subsequent calls to write_frame will return error code.

Regards,
Jan
Nicolas George Aug. 18, 2016, 11:12 a.m. UTC | #5
Le decadi 30 thermidor, an CCXXIV, Jan Sebechlebsky a écrit :
> I am not sure if I understand this. Do you mean thread queue function which
> would set
> certain flag that the queue should be flushed and flushed the queue at
> certain point in time (next receive call?)?

That is a bit too specific but you have the gist of it.

As a rule, the more communication mechanisms are mixed together, the harder
it becomes to get things right and efficient. Furthermore, using low-level
functions is often more tricky than high-level functions. In this code, you
are doing both: mixing a message queue with a low-level flag and mutex. It
would be more efficient if the message queue could take care of all your
needs with a simple API.

In that particular case, we can copy what other protocols do, because they
certainly gave it enough thought: allow a single out-of-band message that
jumps directly in the front of the queue. Then you just need to make that
message need "queue needs flushing" to achieve your goal.

Adding the out-of-band message could be done with a flag,
AV_THREAD_MESSAGE_OOB or AV_THREAD_MESSAGE_IMMEDIATE.

The tricky part is to make sure that it can not fail or block. With regard
to normal in-band messages, it can be done by having an extra, separate,
slot for the out-of-band message. But it still blocks if a second
out-of-band is added before the first one is consumed. I think the best
solution for that is to require a combine callback function:

int combine_messages(void *cur_v, const void *new_v, void *opaque)
{
    MessageType *cur = cur_v, const *new = new_v;

    cur->flags |= new->flags;
    cur->count += new->count;
    //cur->first_ts = cur->first_ts;
    cur->last_ts = new->last_ts;
    return 0;
}

If the combine callback can not fail, then the whole process can not fail
either, and we have won.

But see below.

> The current packet is discarded here. We discussed with Marton how to drop
> packets and agreed that flushing the whole queue should be OK - it is done
> within single critical section in av_thread_queue_flush(), flushing just
> single packet might not be enough in some situations. The reason why is it
> done in worker thread and not in this call is that flushing the queue can
> get quite costly -> dereferencing larger number of packets can lead to many
> free() calls which may be costly, the idea was to move potentially costly
> operation to worker thread.
>     It may be a good thing to make this configurable (allow user to set how
> many messages to flush on overflow) and add function to thread queue which
> would flush certain number of messages in single critical section.

I am really not convinced by these arguments, especially with the code
complexity their conclusion causes.

First of all: "might in some situations": as you point in the next
paragraph, it needs to be configurable. Eventually, the feature can go in
without. But the design should make configuration easy. In this particular
case, it does not, because it structurally confuses the size of the queue
and the number of messages to discard to smooth an overrun.

Furthermore, there is somewhat of a contradiction in the arguments between
the requirement of a single critical section and the worry about the cost of
the free()s. If the free()s are expensive, then they should not happen in a
critical section.

As it happens, I think my proposal takes care of all four points in a single
sweep. I could say, wondering if someone will catch the reference, that my
plan is sheer elegance in its simplicity:

In the thread message queue, focus on the actual muxing and the recovering.
That is already a full task. Leave the handling of overruns to the main
thread -> takes care of the code complexity.

In the main thread, discard the packets one at a time, as they arrive -> a
single free() per packet, which is considered normal handling cost in a
muxer -> takes care of the cost of free()s.

Still in the main thread, set a field "discard_n_next_packets" -> takes care
of discarding several consecutive packets to reduce artifacts (which may not
always for the best; I think speech audio codecs are better with sparse
missing packets, for example). Logically, the "discard_until_keyframe"
belongs at the same place; and the logic could also include some kind of
"discard_until_less_than_n_packets_in_queue".

And last of the four: nothing above requires any critical section.

Regards,
Nicolas George Aug. 18, 2016, 11:19 a.m. UTC | #6
Le decadi 30 thermidor, an CCXXIV, Marton Balint a écrit :
> I remember some other advantages of flushing in blocks as well, which made
> me suggest it to Jan:
> 
> - It is a good thing if the consumer knows that there was a packet
> discontinuity, it can decide what to do. If you drop packets early, the
> consumer has no chance of knowing if there was a packet discontinuity.

This is an interesting argument, but there is plenty of room in the message
structure to add a boolean that says "this message is the first one after an
overrun", or even a counter "n packets were discarded just before this one".

> - Dropping packets in continous blocks, rather than one by one, will
> probably cause less artifacts for the user, for example if you drop a
> reference frame from every GOP, it can make the video glitchy for the whole
> GOP. And exactly this can happen, if the output is only lagging a little.
> 
> - When the output blocks for some reason, then starts to work, then instead
> of one continous drop of packets you would get a continous drop, then
> packets and drops alternating, and finally when there is enough space in the
> fifo, continous packets. This is also ugly.
> 
> - Also it is not healthy to operate with an almost full fifo, because of
> the additional latency it causes. So if the queue fills up for any reason,
> let's give us a fresh chance to operate normally, with as small latency as
> possible.

I think the proposal in my previous mail works for all these issue (which
are, I believe, actually three wordings of the same one).

Regards,
Marton Balint Aug. 18, 2016, 6:28 p.m. UTC | #7
On Thu, 18 Aug 2016, Nicolas George wrote:

> Le decadi 30 thermidor, an CCXXIV, Marton Balint a écrit :
>> I remember some other advantages of flushing in blocks as well, which made
>> me suggest it to Jan:
>>
>> - It is a good thing if the consumer knows that there was a packet
>> discontinuity, it can decide what to do. If you drop packets early, the
>> consumer has no chance of knowing if there was a packet discontinuity.
>
> This is an interesting argument, but there is plenty of room in the message
> structure to add a boolean that says "this message is the first one after an
> overrun", or even a counter "n packets were discarded just before this one".
>
>> - Dropping packets in continous blocks, rather than one by one, will
>> probably cause less artifacts for the user, for example if you drop a
>> reference frame from every GOP, it can make the video glitchy for the whole
>> GOP. And exactly this can happen, if the output is only lagging a little.
>>
>> - When the output blocks for some reason, then starts to work, then instead
>> of one continous drop of packets you would get a continous drop, then
>> packets and drops alternating, and finally when there is enough space in the
>> fifo, continous packets. This is also ugly.
>>
>> - Also it is not healthy to operate with an almost full fifo, because of
>> the additional latency it causes. So if the queue fills up for any reason,
>> let's give us a fresh chance to operate normally, with as small latency as
>> possible.
>
> I think the proposal in my previous mail works for all these issue (which
> are, I believe, actually three wordings of the same one).

So you want to do the block flushes in the producer thread. That would 
probably cause half-full fifos when recovering from a full fifo, 
because the producer thread has no knowledge when the consumer actually 
returned from a single long blockage at the writing of one packet.

On the other hand, if the the flush happen in the consumer thread, it will 
happen exactly at the time, when the consumer thread is no longer blocked, 
therefore I find it superior. (and also cleaner, because the consumer does 
the consuming...)

An early version of Jan's code used flushes both at the producer and the 
consumer to work around this issue, but it become more complicated than 
doing the flushes in one place, in the consumer.

I hope I addressed your concerns, I'd like to commit the code as it is. 
Even if you still don't agree with everything, I think we should commit 
the code, because it works, and improvements can be done later.

Regards,
Marton
diff mbox

Patch

diff --git a/Changelog b/Changelog
index b903e31..c693d34 100644
--- a/Changelog
+++ b/Changelog
@@ -15,6 +15,7 @@  version <next>:
 - True Audio (TTA) muxer
 - crystalizer audio filter
 - acrusher audio filter
+- fifo muxer
 
 
 version 3.1:
diff --git a/configure b/configure
index bff8159..a252354 100755
--- a/configure
+++ b/configure
@@ -2834,6 +2834,7 @@  dv_muxer_select="dvprofile"
 dxa_demuxer_select="riffdec"
 eac3_demuxer_select="ac3_parser"
 f4v_muxer_select="mov_muxer"
+fifo_muxer_deps="pthreads"
 flac_demuxer_select="flac_parser"
 hds_muxer_select="flv_muxer"
 hls_muxer_select="mpegts_muxer"
diff --git a/doc/muxers.texi b/doc/muxers.texi
index 5873269..b4c3886 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -1436,6 +1436,98 @@  Specify whether to remove all fragments when finished. Default 0 (do not remove)
 
 @end table
 
+@section fifo
+
+The fifo pseudo-muxer allows the separation of encoding and muxing by using
+first-in-first-out queue and running the actual muxer in a separate thread. This
+is especially useful in combination with the @ref{tee} muxer and can be used to
+send data to several destinations with different reliability/writing speed/latency.
+
+The behavior of the fifo muxer if the queue fills up or if the output fails is
+selectable,
+
+@itemize @bullet
+
+@item
+output can be transparently restarted with configurable delay between retries
+based on real time or time of the processed stream.
+
+@item
+encoding can be blocked during temporary failure, or continue transparently
+dropping packets in case fifo queue fills up.
+
+@end itemize
+
+@table @option
+
+@item fifo_format
+Specify the format name. Useful if it cannot be guessed from the
+output name suffix.
+
+@item queue_size
+Specify size of the queue (number of packets). Default value is 60.
+
+@item format_opts
+Specify format options for the underlying muxer. Muxer options can be specified
+as a list of @var{key}=@var{value} pairs separated by ':'.
+
+@item drop_pkts_on_overflow @var{bool}
+If set to 1 (true), in case the fifo queue fills up, packets will be dropped
+rather than blocking the encoder. This allows to continue streaming without
+delaying the input, at the cost of ommiting part of the stream. By default
+this option is set to 0 (false), so in such cases the encoder will be blocked
+until the muxer processes some of the packets and none of them is lost.
+
+@item attempt_recovery @var{bool}
+If failure occurs, attempt to recover the output. This is especially useful
+when used with network output, allows to restart streaming transparently.
+By default this option set to 0 (false).
+
+@item max_recovery_attempts
+Sets maximum number of successive unsucessful recovery attempts after which
+the output fails permanently. By default this option is set to 0 (unlimited).
+
+@item recovery_wait_time @var{duration}
+Waiting time before the next recovery attempt after previous unsuccessfull
+recovery attempt. Default value is 5 seconds.
+
+s@item recovery_wait_streamtime @var{bool}
+If set to 0 (false), the real time is used when waiting for the recovery
+attempt (i.e. the recovery will be attempted after at least
+recovery_wait_time seconds).
+If set to 1 (true), the time of the processed stream is taken into account
+instead (i.e. the recovery will be attempted after at least @var{recovery_wait_time}
+seconds of the stream is omitted).
+By default, this option is set to 0 (false).
+
+@item recover_any_error @var{bool}
+If set to 1 (true), recovery will be attempted regardless of type of the error
+causing the failure. By default this option is set to 0 (false) and in case of
+certain (usually permanent) errors the recovery is not attempted even when
+@var{attempt_recovery} is set to 1.
+
+@item restart_with_keyframe @var{bool}
+Specify whether to wait for the keyframe after recovering from
+queue overflow or failure. This option is set to 0 (false) by default.
+
+@end table
+
+@subsection Examples
+
+@itemize
+
+@item
+Stream something to rtmp server, continue processing the stream at real-time
+rate even in case of temporary failure (network outage) and attempt to recover
+streaming every second indefinitely.
+@example
+ffmpeg -re -i ... -c:v libx264 -c:a aac -f fifo -fifo_format flv -map 0:v -map 0:a
+  -drop_pkts_on_overflow 1 -attempt_recovery 1 -recovery_wait_time 1 rtmp://example.com/live/stream_name
+@end example
+
+@end itemize
+
+@anchor{tee}
 @section tee
 
 The tee muxer can be used to write the same data to several files or any
diff --git a/libavformat/Makefile b/libavformat/Makefile
index fda1e17..2d2b78c 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -162,6 +162,7 @@  OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
 OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
 OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
 OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
+OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
 OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
 OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
 OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
diff --git a/libavformat/allformats.c b/libavformat/allformats.c
index a69195e..6a216ef 100644
--- a/libavformat/allformats.c
+++ b/libavformat/allformats.c
@@ -123,6 +123,7 @@  void av_register_all(void)
     REGISTER_MUXER   (F4V,              f4v);
     REGISTER_MUXDEMUX(FFM,              ffm);
     REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
+    REGISTER_MUXER   (FIFO,             fifo);
     REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
     REGISTER_MUXDEMUX(FLAC,             flac);
     REGISTER_DEMUXER (FLIC,             flic);
diff --git a/libavformat/fifo.c b/libavformat/fifo.c
new file mode 100644
index 0000000..3748fa9
--- /dev/null
+++ b/libavformat/fifo.c
@@ -0,0 +1,662 @@ 
+/*
+ * FIFO pseudo-muxer
+ * Copyright (c) 2016 Jan Sebechlebsky
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "libavutil/thread.h"
+#include "libavutil/threadmessage.h"
+#include "avformat.h"
+#include "internal.h"
+
+#define FIFO_DEFAULT_QUEUE_SIZE              60
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
+
+typedef struct FifoContext {
+    const AVClass *class;
+    AVFormatContext *avf;
+
+    char *format;
+    AVOutputFormat *oformat;
+
+    char *format_options_str;
+    AVDictionary *format_options;
+
+    int queue_size;
+    AVThreadMessageQueue *queue;
+    pthread_t writer_thread;
+
+    /* Return value of last write_trailer_call */
+    int write_trailer_ret;
+
+    /* Time to wait before next recovery attempt
+     * This can refer to the time in processed stream,
+     * or real time. */
+    int64_t recovery_wait_time;
+
+    /* Maximal number of unsuccessful successive recovery attempts */
+    int max_recovery_attempts;
+
+    /* Whether to attempt recovery from failure */
+    int attempt_recovery;
+
+    /* If >0 stream time will be used when waiting
+     * for the recovery attempt instead of real time */
+    int recovery_wait_streamtime;
+
+    /* If >0 recovery will be attempted regardless of error code
+     * (except AVERROR_EXIT, so exit request is never ignored) */
+    int recover_any_error;
+
+    /* Whether to drop packets in case the queue is full. */
+    int drop_pkts_on_overflow;
+
+    /* Whether to wait for keyframe when recovering
+     * from failure or queue overflow */
+    int restart_with_keyframe;
+
+    pthread_mutex_t overflow_flag_lock;
+    /* Value > 0 signals queue overflow */
+    volatile uint8_t overflow_flag;
+
+} FifoContext;
+
+typedef struct FifoThreadContext {
+    AVFormatContext *avf;
+
+    /* Timestamp of last failure.
+     * This is either pts in case stream time is used,
+     * or microseconds as returned by av_getttime_relative() */
+    int64_t last_recovery_ts;
+
+    /* Number of current recovery process
+     * Value > 0 means we are in recovery process */
+    int recovery_nr;
+
+    /* If > 0 all frames will be dropped until keyframe is received */
+    uint8_t drop_until_keyframe;
+
+    /* Value > 0 means that the previous write_header call was successful
+     * so finalization by calling write_trailer and ff_io_close must be done
+     * before exiting / reinitialization of underlying muxer */
+    uint8_t header_written;
+} FifoThreadContext;
+
+typedef enum FifoMessageType {
+    FIFO_WRITE_HEADER,
+    FIFO_WRITE_PACKET,
+    FIFO_FLUSH_OUTPUT
+} FifoMessageType;
+
+typedef struct FifoMessage {
+    FifoMessageType type;
+    AVPacket pkt;
+} FifoMessage;
+
+static int fifo_thread_write_header(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVDictionary *format_options = NULL;
+    int ret, i;
+
+    ret = av_dict_copy(&format_options, fifo->format_options, 0);
+    if (ret < 0)
+        return ret;
+
+    ret = ff_format_output_open(avf2, avf->filename, &format_options);
+    if (ret < 0) {
+        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
+               av_err2str(ret));
+        goto end;
+    }
+
+    for (i = 0;i < avf2->nb_streams; i++)
+        avf2->streams[i]->cur_dts = 0;
+
+    ret = avformat_write_header(avf2, &format_options);
+    if (!ret)
+        ctx->header_written = 1;
+
+    // Check for options unrecognized by underlying muxer
+    if (format_options) {
+        AVDictionaryEntry *entry = NULL;
+        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
+            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
+        ret = AVERROR(EINVAL);
+    }
+
+end:
+    av_dict_free(&format_options);
+    return ret;
+}
+
+static int fifo_thread_flush_output(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+
+    return av_write_frame(avf2, NULL);
+}
+
+static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVRational src_tb, dst_tb;
+    int ret, s_idx;
+
+    if (ctx->drop_until_keyframe) {
+        if (pkt->flags & AV_PKT_FLAG_KEY) {
+            ctx->drop_until_keyframe = 0;
+            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
+        } else {
+            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
+            av_packet_unref(pkt);
+            return 0;
+        }
+    }
+
+    s_idx = pkt->stream_index;
+    src_tb = avf->streams[s_idx]->time_base;
+    dst_tb = avf2->streams[s_idx]->time_base;
+    av_packet_rescale_ts(pkt, src_tb, dst_tb);
+
+    ret = av_write_frame(avf2, pkt);
+    if (ret >= 0)
+        av_packet_unref(pkt);
+    return ret;
+}
+
+static int fifo_thread_write_trailer(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    int ret;
+
+    if (!ctx->header_written)
+        return 0;
+
+    ret = av_write_trailer(avf2);
+    ff_format_io_close(avf2, &avf2->pb);
+
+    return ret;
+}
+
+static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
+{
+    int ret;
+
+    if (!ctx->header_written) {
+        ret = fifo_thread_write_header(ctx);
+        if (ret < 0)
+            return ret;
+    }
+
+    switch(msg->type) {
+    case FIFO_WRITE_HEADER:
+        return ret;
+    case FIFO_WRITE_PACKET:
+        return fifo_thread_write_packet(ctx, &msg->pkt);
+    case FIFO_FLUSH_OUTPUT:
+        return fifo_thread_flush_output(ctx);
+    }
+
+    return AVERROR(EINVAL);
+}
+
+static int is_recoverable(const FifoContext *fifo, int err_no) {
+    if (!fifo->attempt_recovery)
+        return 0;
+
+    if (fifo->recover_any_error)
+        return err_no != AVERROR_EXIT;
+
+    switch (err_no) {
+    case AVERROR(EINVAL):
+    case AVERROR(ENOSYS):
+    case AVERROR_EOF:
+    case AVERROR_EXIT:
+    case AVERROR_PATCHWELCOME:
+        return 0;
+    default:
+        return 1;
+    }
+}
+
+static void free_message(void *msg)
+{
+    FifoMessage *fifo_msg = msg;
+
+    if (fifo_msg->type == FIFO_WRITE_PACKET)
+        av_packet_unref(&fifo_msg->pkt);
+}
+
+static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
+                                                int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
+           av_err2str(err_no));
+
+    if (fifo->recovery_wait_streamtime) {
+        if (pkt->pts == AV_NOPTS_VALUE)
+            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
+                   " timestamp, recovery will be attempted immediately");
+        ctx->last_recovery_ts = pkt->pts;
+    } else {
+        ctx->last_recovery_ts = av_gettime_relative();
+    }
+
+    if (fifo->max_recovery_attempts &&
+        ctx->recovery_nr >= fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_ERROR,
+               "Maximal number of %d recovery attempts reached.\n",
+               fifo->max_recovery_attempts);
+        ret = err_no;
+    } else {
+        ret = AVERROR(EAGAIN);
+    }
+
+    return ret;
+}
+
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVPacket *pkt = &msg->pkt;
+    int64_t time_since_recovery;
+    int ret;
+
+    if (!is_recoverable(fifo, err_no)) {
+        ret = err_no;
+        goto fail;
+    }
+
+    if (ctx->header_written) {
+        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
+        ctx->header_written = 0;
+    }
+
+    if (!ctx->recovery_nr) {
+        ctx->last_recovery_ts = 0;
+    } else {
+        if (fifo->recovery_wait_streamtime) {
+            AVRational tb = avf->streams[pkt->stream_index]->time_base;
+            time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
+                                               tb, AV_TIME_BASE_Q);
+        } else {
+            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+        }
+
+        if (time_since_recovery < fifo->recovery_wait_time)
+            return AVERROR(EAGAIN);
+    }
+
+    ctx->recovery_nr++;
+
+    if (fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
+               ctx->recovery_nr, fifo->max_recovery_attempts);
+    } else {
+        av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
+               ctx->recovery_nr);
+    }
+
+    if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
+        ctx->drop_until_keyframe = 1;
+
+    ret = fifo_thread_dispatch_message(ctx, msg);
+    if (ret < 0) {
+        if (is_recoverable(fifo, ret)) {
+            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+        } else {
+            goto fail;
+        }
+    } else {
+        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
+        ctx->recovery_nr = 0;
+    }
+
+    return 0;
+
+fail:
+    free_message(msg);
+    return ret;
+}
+
+static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    do {
+        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
+            int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
+            if (time_to_wait)
+                av_usleep(FFMIN(10000, time_to_wait));
+        }
+
+        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
+    } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
+
+    if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
+        if (msg->type == FIFO_WRITE_PACKET)
+            av_packet_unref(&msg->pkt);
+        ret = 0;
+    }
+
+    return ret;
+}
+
+static void *fifo_consumer_thread(void *data)
+{
+    AVFormatContext *avf = data;
+    FifoContext *fifo = avf->priv_data;
+    AVThreadMessageQueue *queue = fifo->queue;
+    FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
+    int ret;
+
+    FifoThreadContext fifo_thread_ctx;
+    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
+    fifo_thread_ctx.avf = avf;
+
+    while (1) {
+        uint8_t just_flushed = 0;
+
+        if (!fifo_thread_ctx.recovery_nr)
+            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
+
+        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
+            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
+            if (rec_ret < 0) {
+                av_thread_message_queue_set_err_send(queue, rec_ret);
+                break;
+            }
+        }
+
+        /* If the queue is full at the moment when fifo_write_packet
+         * attempts to insert new message (packet) to the queue,
+         * it sets the fifo->overflow_flag to 1 and drops packet.
+         * Here in consumer thread, the flag is checked and if it is
+         * set, the queue is flushed and flag cleared. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (fifo->overflow_flag) {
+            av_thread_message_flush(queue);
+            if (fifo->restart_with_keyframe)
+                fifo_thread_ctx.drop_until_keyframe = 1;
+            fifo->overflow_flag = 0;
+            just_flushed = 1;
+        }
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (just_flushed)
+            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+
+        ret = av_thread_message_queue_recv(queue, &msg, 0);
+        if (ret < 0) {
+            av_thread_message_queue_set_err_send(queue, ret);
+            break;
+        }
+    }
+
+    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
+
+    return NULL;
+}
+
+static int fifo_mux_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2;
+    int ret = 0, i;
+
+    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
+    if (ret < 0) {
+        return ret;
+    }
+
+    fifo->avf = avf2;
+
+    avf2->interrupt_callback = avf->interrupt_callback;
+    avf2->max_delay = avf->max_delay;
+    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
+    if (ret < 0)
+        return ret;
+    avf2->opaque = avf->opaque;
+    avf2->io_close = avf->io_close;
+    avf2->io_open = avf->io_open;
+    avf2->flags = avf->flags;
+
+    for (i = 0; i < avf->nb_streams; ++i) {
+        AVStream *st = avformat_new_stream(avf2, NULL);
+        if (!st)
+            return AVERROR(ENOMEM);
+
+        ret = ff_stream_encode_params_copy(st, avf->streams[i]);
+        if (ret < 0)
+            return ret;
+    }
+
+    return 0;
+}
+
+static int fifo_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    int ret = 0;
+
+    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
+        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
+               " only when drop_pkts_on_overflow is also turned on\n");
+        return AVERROR(EINVAL);
+    }
+
+    if (fifo->format_options_str) {
+        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
+                                   "=", ":", 0);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
+                   fifo->format_options_str);
+            return ret;
+        }
+    }
+
+    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
+    if (!fifo->oformat) {
+        ret = AVERROR_MUXER_NOT_FOUND;
+        return ret;
+    }
+
+    ret = fifo_mux_init(avf);
+    if (ret < 0)
+        return ret;
+
+    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
+                                        sizeof(FifoMessage));
+    if (ret < 0)
+        return ret;
+
+    av_thread_message_queue_set_free_func(fifo->queue, free_message);
+
+    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
+    if (ret < 0)
+        return AVERROR(ret);
+
+    return 0;
+}
+
+static int fifo_write_header(AVFormatContext *avf)
+{
+    FifoContext * fifo = avf->priv_data;
+    int ret;
+
+    ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
+    if (ret) {
+        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
+               av_err2str(AVERROR(ret)));
+        ret = AVERROR(ret);
+    }
+
+    return 0;
+}
+
+static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
+{
+    FifoContext *fifo = avf->priv_data;
+    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
+    int ret;
+
+    if (pkt) {
+        av_init_packet(&msg.pkt);
+        ret = av_packet_ref(&msg.pkt,pkt);
+        if (ret < 0)
+            return ret;
+    }
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg,
+                                       fifo->drop_pkts_on_overflow ?
+                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (ret == AVERROR(EAGAIN)) {
+        uint8_t overflow_set = 0;
+
+        /* Queue is full, set fifo->overflow_flag to 1
+         * to let consumer thread know the queue should
+         * be flushed. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (!fifo->overflow_flag)
+            fifo->overflow_flag = overflow_set = 1;
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (overflow_set)
+            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
+        ret = 0;
+        goto fail;
+    } else if (ret < 0) {
+        goto fail;
+    }
+
+    return ret;
+fail:
+    if (pkt)
+        av_packet_unref(&msg.pkt);
+    return ret;
+}
+
+static int fifo_write_trailer(AVFormatContext *avf)
+{
+    FifoContext *fifo= avf->priv_data;
+    int ret;
+
+    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+
+    ret = pthread_join( fifo->writer_thread, NULL);
+    if (ret < 0) {
+        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+               av_err2str(AVERROR(ret)));
+        return AVERROR(ret);
+    }
+
+    ret = fifo->write_trailer_ret;
+    return ret;
+}
+
+static void fifo_deinit(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+
+    if (fifo->format_options)
+        av_dict_free(&fifo->format_options);
+
+    avformat_free_context(fifo->avf);
+
+    if (fifo->queue) {
+        av_thread_message_flush(fifo->queue);
+        av_thread_message_queue_free(&fifo->queue);
+    }
+
+    pthread_mutex_destroy(&fifo->overflow_flag_lock);
+}
+
+#define OFFSET(x) offsetof(FifoContext, x)
+static const AVOption options[] = {
+        {"fifo_format", "Target muxer", OFFSET(format),
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
+         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
+        AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
+         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
+         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
+         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
+         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+        {NULL},
+};
+
+static const AVClass fifo_muxer_class = {
+    .class_name = "Fifo muxer",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+AVOutputFormat ff_fifo_muxer = {
+    .name           = "fifo",
+    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
+    .priv_data_size = sizeof(FifoContext),
+    .init           = fifo_init,
+    .write_header   = fifo_write_header,
+    .write_packet   = fifo_write_packet,
+    .write_trailer  = fifo_write_trailer,
+    .deinit         = fifo_deinit,
+    .priv_class     = &fifo_muxer_class,
+    .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
+};
diff --git a/libavformat/version.h b/libavformat/version.h
index 6f47a2f..647c39c 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,7 +32,7 @@ 
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
 // Also please add any ticket numbers that you belive might be affected here
 #define LIBAVFORMAT_VERSION_MAJOR  57
-#define LIBAVFORMAT_VERSION_MINOR  46
+#define LIBAVFORMAT_VERSION_MINOR  47
 #define LIBAVFORMAT_VERSION_MICRO 101
 
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \