From patchwork Tue Aug 16 09:48:11 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: sebechlebskyjan@gmail.com X-Patchwork-Id: 187 Delivered-To: ffmpegpatchwork@gmail.com Received: by 10.103.140.67 with SMTP id o64csp2023480vsd; Tue, 16 Aug 2016 02:48:40 -0700 (PDT) X-Received: by 10.28.49.198 with SMTP id x189mr5213629wmx.111.1471340919935; Tue, 16 Aug 2016 02:48:39 -0700 (PDT) Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id m8si24522093wje.287.2016.08.16.02.48.37; Tue, 16 Aug 2016 02:48:39 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@gmail.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE dis=NONE) header.from=gmail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 126B6689AF6; Tue, 16 Aug 2016 12:48:32 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wm0-f68.google.com (mail-wm0-f68.google.com [74.125.82.68]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 329EA689AF1 for ; Tue, 16 Aug 2016 12:48:21 +0300 (EEST) Received: by mail-wm0-f68.google.com with SMTP id q128so15444928wma.1 for ; Tue, 16 Aug 2016 02:48:23 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=8r9kd70kos1nrXrCYVZdRjXS4nBzxbTIX+I0UqEU5oE=; b=iHY4JXBWcQuN2j1sZwFkgIwQVXlpn/l8LllEAhjmtzPA2drEjVRGFtsitfGBrXYmIg 7aStDakpl24RwYX84L+VlfIvoIRRx4CUuFqj+LyjkO9y5IvM+tS2xgH764pazPsXpefD dTbsWAV2yjf8is0xHO9BHxLKs1as2Oj5Ll1XBtXDKUdDj7xjqULBmCoqWIvTH3eI8N2u UiSRgDDZ9/A6RFGmXJrosWVFdAyiMBhPJAm8ryDiB1GHAQ4UaB2eRTGJngp75nxfP6ek geVYaC3tVok0HfRacE8zdtWgjHysWG2kKrsY66JLCsMLRX1K6cOf4e9RTbHuD1ZOspqt It4Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=8r9kd70kos1nrXrCYVZdRjXS4nBzxbTIX+I0UqEU5oE=; b=XWz3lDBm9XfEDc3iuow6Iox/P1wE4JP9QoTwf4FmUEHKvkJdlnXEwU5P++ikviqUok pOsNitRuCCvxu4lhGZwGml61oyUOR6z8GPhZagWAfSBDuzgYhYI1Dfbx62sYZKtoF8Mz 0Yq9aqAXAYOxJFabmV2Y8v5TIcM15UsYqgppcjO5IubX6uyEPJZ9PWRqeYQDzdtHbeEH j+aFe+hzA0DgoagRH6AN8wEZjxtACFS8oO5PMSD3zTwMfUqChorOPeMzU20uTBvdKg95 JoQK5Gn8v9F/C/tUDbzUj+jsvlc2JwXt4KYlRTD9/Al/utt4IFmmp+oXvoCML2nV83Hm bYOw== X-Gm-Message-State: AEkoouv+24h41Z3pxrmHAzGmxaZlXgA0wvZAeajdypZL19syGEC/MMNtm5K2YjRXlQzaKA== X-Received: by 10.194.105.40 with SMTP id gj8mr21773880wjb.71.1471340902491; Tue, 16 Aug 2016 02:48:22 -0700 (PDT) Received: from localhost.localdomain (157.174.broadband3.iol.cz. [85.70.174.157]) by smtp.gmail.com with ESMTPSA id d8sm20908596wmi.0.2016.08.16.02.48.20 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Tue, 16 Aug 2016 02:48:21 -0700 (PDT) From: sebechlebskyjan@gmail.com To: ffmpeg-devel@ffmpeg.org Date: Tue, 16 Aug 2016 11:48:11 +0200 Message-Id: <1471340892-23123-1-git-send-email-sebechlebskyjan@gmail.com> X-Mailer: git-send-email 1.9.1 In-Reply-To: <20160815215037.GA1530047@phare.normalesup.org> References: <20160815215037.GA1530047@phare.normalesup.org> Subject: [FFmpeg-devel] [PATCH v9 01/11] avformat: Add fifo pseudo-muxer X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Jan Sebechlebsky MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" From: Jan Sebechlebsky Signed-off-by: Jan Sebechlebsky --- Changes since the last version of the patch: - fixed "s@item" in muxers.texi - fixed second -> seconds in FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC comment - removed AVFormat *oformat from FifoContext (it is local variable now) - if recovery uses stream time to wait, last_recovery_ts is set to AV_NOPTS_VALUE, and when recovery is attempted and last_recovery_ts recovery is performed immediately - fixed stray space in pthread_join in fifo_write_trailer - fixed superfluous tests and av_thread_message_flush() in fifo_deinit - changed upper bound for queue_size option to INT_MAX Changelog | 1 + configure | 1 + doc/muxers.texi | 92 +++++++ libavformat/Makefile | 1 + libavformat/allformats.c | 1 + libavformat/fifo.c | 661 +++++++++++++++++++++++++++++++++++++++++++++++ libavformat/version.h | 2 +- 7 files changed, 758 insertions(+), 1 deletion(-) create mode 100644 libavformat/fifo.c diff --git a/Changelog b/Changelog index b903e31..c693d34 100644 --- a/Changelog +++ b/Changelog @@ -15,6 +15,7 @@ version : - True Audio (TTA) muxer - crystalizer audio filter - acrusher audio filter +- fifo muxer version 3.1: diff --git a/configure b/configure index 9b92426..894e7a9 100755 --- a/configure +++ b/configure @@ -2834,6 +2834,7 @@ dv_muxer_select="dvprofile" dxa_demuxer_select="riffdec" eac3_demuxer_select="ac3_parser" f4v_muxer_select="mov_muxer" +fifo_muxer_deps="pthreads" flac_demuxer_select="flac_parser" hds_muxer_select="flv_muxer" hls_muxer_select="mpegts_muxer" diff --git a/doc/muxers.texi b/doc/muxers.texi index 5873269..617c9a3 100644 --- a/doc/muxers.texi +++ b/doc/muxers.texi @@ -1436,6 +1436,98 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove) @end table +@section fifo + +The fifo pseudo-muxer allows the separation of encoding and muxing by using +first-in-first-out queue and running the actual muxer in a separate thread. This +is especially useful in combination with the @ref{tee} muxer and can be used to +send data to several destinations with different reliability/writing speed/latency. + +The behavior of the fifo muxer if the queue fills up or if the output fails is +selectable, + +@itemize @bullet + +@item +output can be transparently restarted with configurable delay between retries +based on real time or time of the processed stream. + +@item +encoding can be blocked during temporary failure, or continue transparently +dropping packets in case fifo queue fills up. + +@end itemize + +@table @option + +@item fifo_format +Specify the format name. Useful if it cannot be guessed from the +output name suffix. + +@item queue_size +Specify size of the queue (number of packets). Default value is 60. + +@item format_opts +Specify format options for the underlying muxer. Muxer options can be specified +as a list of @var{key}=@var{value} pairs separated by ':'. + +@item drop_pkts_on_overflow @var{bool} +If set to 1 (true), in case the fifo queue fills up, packets will be dropped +rather than blocking the encoder. This allows to continue streaming without +delaying the input, at the cost of ommiting part of the stream. By default +this option is set to 0 (false), so in such cases the encoder will be blocked +until the muxer processes some of the packets and none of them is lost. + +@item attempt_recovery @var{bool} +If failure occurs, attempt to recover the output. This is especially useful +when used with network output, allows to restart streaming transparently. +By default this option set to 0 (false). + +@item max_recovery_attempts +Sets maximum number of successive unsucessful recovery attempts after which +the output fails permanently. By default this option is set to 0 (unlimited). + +@item recovery_wait_time @var{duration} +Waiting time before the next recovery attempt after previous unsuccessfull +recovery attempt. Default value is 5 seconds. + +@item recovery_wait_streamtime @var{bool} +If set to 0 (false), the real time is used when waiting for the recovery +attempt (i.e. the recovery will be attempted after at least +recovery_wait_time seconds). +If set to 1 (true), the time of the processed stream is taken into account +instead (i.e. the recovery will be attempted after at least @var{recovery_wait_time} +seconds of the stream is omitted). +By default, this option is set to 0 (false). + +@item recover_any_error @var{bool} +If set to 1 (true), recovery will be attempted regardless of type of the error +causing the failure. By default this option is set to 0 (false) and in case of +certain (usually permanent) errors the recovery is not attempted even when +@var{attempt_recovery} is set to 1. + +@item restart_with_keyframe @var{bool} +Specify whether to wait for the keyframe after recovering from +queue overflow or failure. This option is set to 0 (false) by default. + +@end table + +@subsection Examples + +@itemize + +@item +Stream something to rtmp server, continue processing the stream at real-time +rate even in case of temporary failure (network outage) and attempt to recover +streaming every second indefinitely. +@example +ffmpeg -re -i ... -c:v libx264 -c:a aac -f fifo -fifo_format flv -map 0:v -map 0:a + -drop_pkts_on_overflow 1 -attempt_recovery 1 -recovery_wait_time 1 rtmp://example.com/live/stream_name +@end example + +@end itemize + +@anchor{tee} @section tee The tee muxer can be used to write the same data to several files or any diff --git a/libavformat/Makefile b/libavformat/Makefile index fda1e17..2d2b78c 100644 --- a/libavformat/Makefile +++ b/libavformat/Makefile @@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER) += ffmdec.o OBJS-$(CONFIG_FFM_MUXER) += ffmenc.o OBJS-$(CONFIG_FFMETADATA_DEMUXER) += ffmetadec.o OBJS-$(CONFIG_FFMETADATA_MUXER) += ffmetaenc.o +OBJS-$(CONFIG_FIFO_MUXER) += fifo.o OBJS-$(CONFIG_FILMSTRIP_DEMUXER) += filmstripdec.o OBJS-$(CONFIG_FILMSTRIP_MUXER) += filmstripenc.o OBJS-$(CONFIG_FLAC_DEMUXER) += flacdec.o rawdec.o \ diff --git a/libavformat/allformats.c b/libavformat/allformats.c index a69195e..6a216ef 100644 --- a/libavformat/allformats.c +++ b/libavformat/allformats.c @@ -123,6 +123,7 @@ void av_register_all(void) REGISTER_MUXER (F4V, f4v); REGISTER_MUXDEMUX(FFM, ffm); REGISTER_MUXDEMUX(FFMETADATA, ffmetadata); + REGISTER_MUXER (FIFO, fifo); REGISTER_MUXDEMUX(FILMSTRIP, filmstrip); REGISTER_MUXDEMUX(FLAC, flac); REGISTER_DEMUXER (FLIC, flic); diff --git a/libavformat/fifo.c b/libavformat/fifo.c new file mode 100644 index 0000000..859cbb4 --- /dev/null +++ b/libavformat/fifo.c @@ -0,0 +1,661 @@ +/* + * FIFO pseudo-muxer + * Copyright (c) 2016 Jan Sebechlebsky + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with FFmpeg; if not, write to the Free Software * Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "libavutil/opt.h" +#include "libavutil/time.h" +#include "libavutil/thread.h" +#include "libavutil/threadmessage.h" +#include "avformat.h" +#include "internal.h" + +#define FIFO_DEFAULT_QUEUE_SIZE 60 +#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0 +#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds + +typedef struct FifoContext { + const AVClass *class; + AVFormatContext *avf; + + char *format; + char *format_options_str; + AVDictionary *format_options; + + int queue_size; + AVThreadMessageQueue *queue; + + pthread_t writer_thread; + + /* Return value of last write_trailer_call */ + int write_trailer_ret; + + /* Time to wait before next recovery attempt + * This can refer to the time in processed stream, + * or real time. */ + int64_t recovery_wait_time; + + /* Maximal number of unsuccessful successive recovery attempts */ + int max_recovery_attempts; + + /* Whether to attempt recovery from failure */ + int attempt_recovery; + + /* If >0 stream time will be used when waiting + * for the recovery attempt instead of real time */ + int recovery_wait_streamtime; + + /* If >0 recovery will be attempted regardless of error code + * (except AVERROR_EXIT, so exit request is never ignored) */ + int recover_any_error; + + /* Whether to drop packets in case the queue is full. */ + int drop_pkts_on_overflow; + + /* Whether to wait for keyframe when recovering + * from failure or queue overflow */ + int restart_with_keyframe; + + pthread_mutex_t overflow_flag_lock; + /* Value > 0 signals queue overflow */ + volatile uint8_t overflow_flag; + +} FifoContext; + +typedef struct FifoThreadContext { + AVFormatContext *avf; + + /* Timestamp of last failure. + * This is either pts in case stream time is used, + * or microseconds as returned by av_getttime_relative() */ + int64_t last_recovery_ts; + + /* Number of current recovery process + * Value > 0 means we are in recovery process */ + int recovery_nr; + + /* If > 0 all frames will be dropped until keyframe is received */ + uint8_t drop_until_keyframe; + + /* Value > 0 means that the previous write_header call was successful + * so finalization by calling write_trailer and ff_io_close must be done + * before exiting / reinitialization of underlying muxer */ + uint8_t header_written; +} FifoThreadContext; + +typedef enum FifoMessageType { + FIFO_WRITE_HEADER, + FIFO_WRITE_PACKET, + FIFO_FLUSH_OUTPUT +} FifoMessageType; + +typedef struct FifoMessage { + FifoMessageType type; + AVPacket pkt; +} FifoMessage; + +static int fifo_thread_write_header(FifoThreadContext *ctx) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + AVFormatContext *avf2 = fifo->avf; + AVDictionary *format_options = NULL; + int ret, i; + + ret = av_dict_copy(&format_options, fifo->format_options, 0); + if (ret < 0) + return ret; + + ret = ff_format_output_open(avf2, avf->filename, &format_options); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename, + av_err2str(ret)); + goto end; + } + + for (i = 0;i < avf2->nb_streams; i++) + avf2->streams[i]->cur_dts = 0; + + ret = avformat_write_header(avf2, &format_options); + if (!ret) + ctx->header_written = 1; + + // Check for options unrecognized by underlying muxer + if (format_options) { + AVDictionaryEntry *entry = NULL; + while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX))) + av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key); + ret = AVERROR(EINVAL); + } + +end: + av_dict_free(&format_options); + return ret; +} + +static int fifo_thread_flush_output(FifoThreadContext *ctx) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + AVFormatContext *avf2 = fifo->avf; + + return av_write_frame(avf2, NULL); +} + +static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + AVFormatContext *avf2 = fifo->avf; + AVRational src_tb, dst_tb; + int ret, s_idx; + + if (ctx->drop_until_keyframe) { + if (pkt->flags & AV_PKT_FLAG_KEY) { + ctx->drop_until_keyframe = 0; + av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n"); + } else { + av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n"); + av_packet_unref(pkt); + return 0; + } + } + + s_idx = pkt->stream_index; + src_tb = avf->streams[s_idx]->time_base; + dst_tb = avf2->streams[s_idx]->time_base; + av_packet_rescale_ts(pkt, src_tb, dst_tb); + + ret = av_write_frame(avf2, pkt); + if (ret >= 0) + av_packet_unref(pkt); + return ret; +} + +static int fifo_thread_write_trailer(FifoThreadContext *ctx) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + AVFormatContext *avf2 = fifo->avf; + int ret; + + if (!ctx->header_written) + return 0; + + ret = av_write_trailer(avf2); + ff_format_io_close(avf2, &avf2->pb); + + return ret; +} + +static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg) +{ + int ret; + + if (!ctx->header_written) { + ret = fifo_thread_write_header(ctx); + if (ret < 0) + return ret; + } + + switch(msg->type) { + case FIFO_WRITE_HEADER: + return ret; + case FIFO_WRITE_PACKET: + return fifo_thread_write_packet(ctx, &msg->pkt); + case FIFO_FLUSH_OUTPUT: + return fifo_thread_flush_output(ctx); + } + + return AVERROR(EINVAL); +} + +static int is_recoverable(const FifoContext *fifo, int err_no) { + if (!fifo->attempt_recovery) + return 0; + + if (fifo->recover_any_error) + return err_no != AVERROR_EXIT; + + switch (err_no) { + case AVERROR(EINVAL): + case AVERROR(ENOSYS): + case AVERROR_EOF: + case AVERROR_EXIT: + case AVERROR_PATCHWELCOME: + return 0; + default: + return 1; + } +} + +static void free_message(void *msg) +{ + FifoMessage *fifo_msg = msg; + + if (fifo_msg->type == FIFO_WRITE_PACKET) + av_packet_unref(&fifo_msg->pkt); +} + +static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, + int err_no) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + int ret; + + av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n", + av_err2str(err_no)); + + if (fifo->recovery_wait_streamtime) { + if (pkt->pts == AV_NOPTS_VALUE) + av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation" + " timestamp, recovery will be attempted immediately"); + ctx->last_recovery_ts = pkt->pts; + } else { + ctx->last_recovery_ts = av_gettime_relative(); + } + + if (fifo->max_recovery_attempts && + ctx->recovery_nr >= fifo->max_recovery_attempts) { + av_log(avf, AV_LOG_ERROR, + "Maximal number of %d recovery attempts reached.\n", + fifo->max_recovery_attempts); + ret = err_no; + } else { + ret = AVERROR(EAGAIN); + } + + return ret; +} + +static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + AVPacket *pkt = &msg->pkt; + int64_t time_since_recovery; + int ret; + + if (!is_recoverable(fifo, err_no)) { + ret = err_no; + goto fail; + } + + if (ctx->header_written) { + fifo->write_trailer_ret = fifo_thread_write_trailer(ctx); + ctx->header_written = 0; + } + + if (!ctx->recovery_nr) { + ctx->last_recovery_ts = fifo->recovery_wait_streamtime ? + AV_NOPTS_VALUE : 0; + } else { + if (fifo->recovery_wait_streamtime) { + if (ctx->last_recovery_ts == AV_NOPTS_VALUE) { + AVRational tb = avf->streams[pkt->stream_index]->time_base; + time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts, + tb, AV_TIME_BASE_Q); + } else { + /* Enforce recovery immediately */ + time_since_recovery = fifo->recovery_wait_time; + } + } else { + time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; + } + + if (time_since_recovery < fifo->recovery_wait_time) + return AVERROR(EAGAIN); + } + + ctx->recovery_nr++; + + if (fifo->max_recovery_attempts) { + av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n", + ctx->recovery_nr, fifo->max_recovery_attempts); + } else { + av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n", + ctx->recovery_nr); + } + + if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow) + ctx->drop_until_keyframe = 1; + + ret = fifo_thread_dispatch_message(ctx, msg); + if (ret < 0) { + if (is_recoverable(fifo, ret)) { + return fifo_thread_process_recovery_failure(ctx, pkt, ret); + } else { + goto fail; + } + } else { + av_log(avf, AV_LOG_INFO, "Recovery successful\n"); + ctx->recovery_nr = 0; + } + + return 0; + +fail: + free_message(msg); + return ret; +} + +static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no) +{ + AVFormatContext *avf = ctx->avf; + FifoContext *fifo = avf->priv_data; + int ret; + + do { + if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) { + int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; + int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery); + if (time_to_wait) + av_usleep(FFMIN(10000, time_to_wait)); + } + + ret = fifo_thread_attempt_recovery(ctx, msg, err_no); + } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow); + + if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) { + if (msg->type == FIFO_WRITE_PACKET) + av_packet_unref(&msg->pkt); + ret = 0; + } + + return ret; +} + +static void *fifo_consumer_thread(void *data) +{ + AVFormatContext *avf = data; + FifoContext *fifo = avf->priv_data; + AVThreadMessageQueue *queue = fifo->queue; + FifoMessage msg = {FIFO_WRITE_HEADER, {0}}; + int ret; + + FifoThreadContext fifo_thread_ctx; + memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); + fifo_thread_ctx.avf = avf; + + while (1) { + uint8_t just_flushed = 0; + + if (!fifo_thread_ctx.recovery_nr) + ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg); + + if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) { + int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret); + if (rec_ret < 0) { + av_thread_message_queue_set_err_send(queue, rec_ret); + break; + } + } + + /* If the queue is full at the moment when fifo_write_packet + * attempts to insert new message (packet) to the queue, + * it sets the fifo->overflow_flag to 1 and drops packet. + * Here in consumer thread, the flag is checked and if it is + * set, the queue is flushed and flag cleared. */ + pthread_mutex_lock(&fifo->overflow_flag_lock); + if (fifo->overflow_flag) { + av_thread_message_flush(queue); + if (fifo->restart_with_keyframe) + fifo_thread_ctx.drop_until_keyframe = 1; + fifo->overflow_flag = 0; + just_flushed = 1; + } + pthread_mutex_unlock(&fifo->overflow_flag_lock); + + if (just_flushed) + av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); + + ret = av_thread_message_queue_recv(queue, &msg, 0); + if (ret < 0) { + av_thread_message_queue_set_err_send(queue, ret); + break; + } + } + + fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx); + + return NULL; +} + +static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat) +{ + FifoContext *fifo = avf->priv_data; + AVFormatContext *avf2; + int ret = 0, i; + + ret = avformat_alloc_output_context2(&avf2, oformat, NULL, NULL); + if (ret < 0) { + return ret; + } + + fifo->avf = avf2; + + avf2->interrupt_callback = avf->interrupt_callback; + avf2->max_delay = avf->max_delay; + ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); + if (ret < 0) + return ret; + avf2->opaque = avf->opaque; + avf2->io_close = avf->io_close; + avf2->io_open = avf->io_open; + avf2->flags = avf->flags; + + for (i = 0; i < avf->nb_streams; ++i) { + AVStream *st = avformat_new_stream(avf2, NULL); + if (!st) + return AVERROR(ENOMEM); + + ret = ff_stream_encode_params_copy(st, avf->streams[i]); + if (ret < 0) + return ret; + } + + return 0; +} + +static int fifo_init(AVFormatContext *avf) +{ + FifoContext *fifo = avf->priv_data; + AVOutputFormat *oformat; + int ret = 0; + + if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) { + av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on" + " only when drop_pkts_on_overflow is also turned on\n"); + return AVERROR(EINVAL); + } + + if (fifo->format_options_str) { + ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str, + "=", ":", 0); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n", + fifo->format_options_str); + return ret; + } + } + + oformat = av_guess_format(fifo->format, avf->filename, NULL); + if (!oformat) { + ret = AVERROR_MUXER_NOT_FOUND; + return ret; + } + + ret = fifo_mux_init(avf, oformat); + if (ret < 0) + return ret; + + ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size, + sizeof(FifoMessage)); + if (ret < 0) + return ret; + + av_thread_message_queue_set_free_func(fifo->queue, free_message); + + ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL); + if (ret < 0) + return AVERROR(ret); + + return 0; +} + +static int fifo_write_header(AVFormatContext *avf) +{ + FifoContext * fifo = avf->priv_data; + int ret; + + ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf); + if (ret) { + av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n", + av_err2str(AVERROR(ret))); + ret = AVERROR(ret); + } + + return 0; +} + +static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) +{ + FifoContext *fifo = avf->priv_data; + FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; + int ret; + + if (pkt) { + av_init_packet(&msg.pkt); + ret = av_packet_ref(&msg.pkt,pkt); + if (ret < 0) + return ret; + } + + ret = av_thread_message_queue_send(fifo->queue, &msg, + fifo->drop_pkts_on_overflow ? + AV_THREAD_MESSAGE_NONBLOCK : 0); + if (ret == AVERROR(EAGAIN)) { + uint8_t overflow_set = 0; + + /* Queue is full, set fifo->overflow_flag to 1 + * to let consumer thread know the queue should + * be flushed. */ + pthread_mutex_lock(&fifo->overflow_flag_lock); + if (!fifo->overflow_flag) + fifo->overflow_flag = overflow_set = 1; + pthread_mutex_unlock(&fifo->overflow_flag_lock); + + if (overflow_set) + av_log(avf, AV_LOG_WARNING, "FIFO queue full\n"); + ret = 0; + goto fail; + } else if (ret < 0) { + goto fail; + } + + return ret; +fail: + if (pkt) + av_packet_unref(&msg.pkt); + return ret; +} + +static int fifo_write_trailer(AVFormatContext *avf) +{ + FifoContext *fifo= avf->priv_data; + int ret; + + av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); + + ret = pthread_join(fifo->writer_thread, NULL); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", + av_err2str(AVERROR(ret))); + return AVERROR(ret); + } + + ret = fifo->write_trailer_ret; + return ret; +} + +static void fifo_deinit(AVFormatContext *avf) +{ + FifoContext *fifo = avf->priv_data; + + av_dict_free(&fifo->format_options); + avformat_free_context(fifo->avf); + av_thread_message_queue_free(&fifo->queue); + pthread_mutex_destroy(&fifo->overflow_flag_lock); +} + +#define OFFSET(x) offsetof(FifoContext, x) +static const AVOption options[] = { + {"fifo_format", "Target muxer", OFFSET(format), + AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, + + {"queue_size", "Size of fifo queue", OFFSET(queue_size), + AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, + + {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str), + AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, + + {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow), + AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + + {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe), + AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + + {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery), + AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + + {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts), + AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, + + {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time), + AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, + + {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery", + OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + + {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), + AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, + + {NULL}, +}; + +static const AVClass fifo_muxer_class = { + .class_name = "Fifo muxer", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; + +AVOutputFormat ff_fifo_muxer = { + .name = "fifo", + .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"), + .priv_data_size = sizeof(FifoContext), + .init = fifo_init, + .write_header = fifo_write_header, + .write_packet = fifo_write_packet, + .write_trailer = fifo_write_trailer, + .deinit = fifo_deinit, + .priv_class = &fifo_muxer_class, + .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE, +}; diff --git a/libavformat/version.h b/libavformat/version.h index 6f47a2f..647c39c 100644 --- a/libavformat/version.h +++ b/libavformat/version.h @@ -32,7 +32,7 @@ // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium) // Also please add any ticket numbers that you belive might be affected here #define LIBAVFORMAT_VERSION_MAJOR 57 -#define LIBAVFORMAT_VERSION_MINOR 46 +#define LIBAVFORMAT_VERSION_MINOR 47 #define LIBAVFORMAT_VERSION_MICRO 101 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \