From patchwork Mon Apr 9 10:39:01 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: Bodecs Bela X-Patchwork-Id: 8364 Delivered-To: ffmpegpatchwork@gmail.com Received: by 10.2.1.70 with SMTP id c67csp2293122jad; Mon, 9 Apr 2018 03:39:10 -0700 (PDT) X-Google-Smtp-Source: AIpwx488espdjJkRD9TqE8L4bHcE7ohNlibvKuRIwFMt1l96sGKVwp5c5PLP7OzRpBulYAFGDHWR X-Received: by 10.28.163.5 with SMTP id m5mr21179010wme.33.1523270350891; Mon, 09 Apr 2018 03:39:10 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1523270350; cv=none; d=google.com; s=arc-20160816; b=X/Lf29OpkUrRy8ThJ0WWhCUl0+EQ8UiMBiN6CK+thFuBPxgGmqtXovMpZ5Bkyx6Sa1 m3drjRIljzepGI9g9/IaT+jZlfAr5PoXwUBhvYp/kc482o2QABfs2JcoEZywBgOlCYSU BmpO8/EHFb+q0kwTzKw+lGVGMhcnHda7M72tPAFVQtvLwV8Wef2oGPpI9OFJBO9C6VG7 Kn95dfc6OCVcclZAqY8uqYUenpMRe5JreT0vQjM2Djwl/IBGFAs6zDa/z4LVyFstIN9u fozzZk/X+YSvwQ1IDaTvm3WEWClWNDCFr7OEVra7u4ivfI7XwrhwuJsx9xg35gqKakHC +asg== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:reply-to:list-subscribe:list-help:list-post :list-archive:list-unsubscribe:list-id:precedence:subject :content-language:in-reply-to:mime-version:user-agent:date :message-id:from:references:to:delivered-to :arc-authentication-results; bh=nRbIN4iubhDvyre2lcC0pEDaElxWRZ9KFYj06vp02U0=; b=M9UocoJno4GLmo0IcdcDgI4V+peNoPpf+F4kUfy6r5K/OuVxyVl/ADiGj9dvT1Z9Kt tltm7ezxTFLkXKmQzOBN1vXJnRTKk+gMJWD16W4pOOHzQhkedr3kkhSPf0MGTKtZnzOJ IumFflrVSjg5Wt/A4CZMi2LeGJDbIDAtYYJ0sLa110ZDXWg9mzG4KqwRDrEy1RaiqedQ oSsWRjBW4A4JstFjlDrwVlSa3ZBMGWlWAQ06mjN7YMi8l1rd6EAYlRCNC18s5gOfztrM 4+RrwrtzerVGAM2qPNICG6ixzYRmBN/zobVLobS8WJMMBDOi9mQGl9x81P9XzfqhOy06 l+WA== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id b46si42184wra.81.2018.04.09.03.39.09; Mon, 09 Apr 2018 03:39:10 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id B3FC3689E29; Mon, 9 Apr 2018 13:38:44 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-xsmtp2.externet.hu (mail-xsmtp2.externet.hu [212.40.96.153]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id C9A12689C72 for ; Mon, 9 Apr 2018 13:38:38 +0300 (EEST) Received: (qmail 2152 invoked from network); 9 Apr 2018 10:39:00 -0000 Received: from mail.officeline.hu (HELO ?10.0.0.10?) (bodecsb@vivanet.hu@217.173.32.91) by 0 with ESMTPA; 9 Apr 2018 10:39:00 -0000 To: ffmpeg-devel@ffmpeg.org References: <20180405223918.GD20131@michaelspb> <65a230ef-92bb-ae6c-ba3e-e4f16ad1bbd9@vivanet.hu> <20180408215758.GK20131@michaelspb> From: Bodecs Bela Message-ID: <48cd4265-c626-fc43-196c-844447f16884@vivanet.hu> Date: Mon, 9 Apr 2018 12:39:01 +0200 User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Thunderbird/52.7.0 MIME-Version: 1.0 In-Reply-To: <20180408215758.GK20131@michaelspb> Content-Language: hu X-Content-Filtered-By: Mailman/MimeDel 2.1.20 Subject: Re: [FFmpeg-devel] [PATCH] [RFC]doc/examples: alternative input handler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" 2018.04.08. 23:57 keltezéssel, Michael Niedermayer írta: > On Sun, Apr 08, 2018 at 05:27:56PM +0200, Bodecs Bela wrote: >> >> 2018.04.06. 0:39 keltezéssel, Michael Niedermayer írta: >>> On Fri, Mar 30, 2018 at 02:47:25PM +0200, Bodecs Bela wrote: >>>> Hi All, >>>> >>>> regularly, on different forums and mailing lists a requirement popups for a >>>> feature to automatically failover switching between main input and a >>>> secondary input in case of main input unavailability. >>>> >>>> The base motivation: let's say you have a unreliable live stream source and >>>> you >>>> want to transcode its video and audio streams in realtime but you >>>> want to survive the ocasions when the source is unavailable. So use a >>>> secondary live source but the transition should occur seamlessly without >>>> breaking/re-starting the transcoding processs. >>>> >>>> Some days ago there was a discussion on devel-irc about this topic and we >>>> concluded that this feature is not feasible inside ffmpeg without "hacking", >>>> but a separate client app could do this. >>>> >>>> So I created this example app to handle two separate input sources and >>>> switching realtime between them. I am not sure wheter it should be inside >>>> the tools subdir. >>>> >>>> The detailed description is available in the header section of the source >>>> file. >>>> >>>> I will appretiate your suggestions about it. >>>> >>>> Thank you in advance. >>>> >>>> best, >>>> >>>> Bela Bodecs >>>> >>>> >>>> configure | 2 >>>> doc/examples/Makefile | 1 >>>> doc/examples/Makefile.example | 1 >>>> doc/examples/alternative_input.c | 1233 +++++++++++++++++++++++++++++++++++++++ >>> You may want to add yourself to MAINTAINERS, so it is not unmaintained >> ok I have checked the MAINAINERS file but curretly there is no Examples section, I have created it. >>> I think this is complex enough that it needs a maintainer >>> >> May I take your response as you agree to inlcude this as an example app? > iam fine with either > > >> >> >>> [...] >>> >>>> +static int open_single_input(int input_index) >>>> +{ >>>> + int ret, i; >>>> + AVInputFormat *input_format = NULL; >>>> + AVDictionary * input_options = NULL; >>>> + AVFormatContext * input_fmt_ctx = NULL; >>>> + >>>> + if (app_ctx.input_format_names[input_index]) { >>>> + if (!(input_format = av_find_input_format(app_ctx.input_format_names[input_index]))) { >>>> + timed_log(AV_LOG_ERROR, "Input #%d Unknown input format: '%s'\n", input_index, >>>> + app_ctx.input_format_names[input_index]); >>>> + return EINVAL; >>>> + } >>>> + } >>>> + >>>> + av_dict_set(&input_options, "rw_timeout", "2000000", 0); >>>> + av_dict_set(&input_options, "timeout", "2000", 0); >>>> + if ((app_ctx.input_fmt_ctx[input_index] = avformat_alloc_context()) < 0) >>>> + return AVERROR(ENOMEM); >>> i guess this was intended to be "!= NULL" >> yes, I will fix it >>> also you are mixing EINVAL with AVERROR(ENOMEM) these arent compatible. >>> Either all should be AVERROR or none >> ok, should I will modify EIVAL all of them to AVERROR(EINVAL)? > yes > > thx > > [...] > > Here is an updated version. bb > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > http://ffmpeg.org/mailman/listinfo/ffmpeg-devel From 7f23ad48dac2901b4eb1e7b40a90d716d5d54733 Mon Sep 17 00:00:00 2001 From: Bela Bodecs Date: Fri, 30 Mar 2018 14:30:25 +0200 Subject: [PATCH] [RFC]doc/examples: alternative input handler API utility for automatic failover switching between main input and secondary input in case of input unavailability. Motivation: let's say you have a unreliable live stream source and you want totranscode its first video and audio stream in realtime but you want to survive the ocasions when the source is unavailable. So use a secondary live source but the transition should occur seamlessly without breaking/re-starting the transcoding processs. Signed-off-by: Bela Bodecs --- MAINTAINERS | 2 +- configure | 2 + doc/examples/Makefile | 1 + doc/examples/Makefile.example | 1 + doc/examples/alternative_input.c | 1340 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 1345 insertions(+), 1 deletion(-) create mode 100644 doc/examples/alternative_input.c diff --git a/MAINTAINERS b/MAINTAINERS index 3c54ad6..f411d56 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -45,7 +45,7 @@ presets Robert Swain metadata subsystem Aurelien Jacobs release management Michael Niedermayer API tests Ludmila Glinskih - +examples Béla Bödecs Communication ============= diff --git a/configure b/configure index 0c5ed07..5585c60 100755 --- a/configure +++ b/configure @@ -1529,6 +1529,7 @@ EXAMPLE_LIST=" transcoding_example vaapi_encode_example vaapi_transcode_example + alternative_input_example " EXTERNAL_AUTODETECT_LIBRARY_LIST=" @@ -3337,6 +3338,7 @@ transcode_aac_example_deps="avcodec avformat swresample" transcoding_example_deps="avfilter avcodec avformat avutil" vaapi_encode_example_deps="avcodec avutil h264_vaapi_encoder" vaapi_transcode_example_deps="avcodec avformat avutil h264_vaapi_encoder" +alternative_input_example_deps="avfilter avcodec avformat avutil" # EXTRALIBS_LIST cpu_init_extralibs="pthreads_extralibs" diff --git a/doc/examples/Makefile b/doc/examples/Makefile index 928ff30..3c50eca 100644 --- a/doc/examples/Makefile +++ b/doc/examples/Makefile @@ -21,6 +21,7 @@ EXAMPLES-$(CONFIG_TRANSCODE_AAC_EXAMPLE) += transcode_aac EXAMPLES-$(CONFIG_TRANSCODING_EXAMPLE) += transcoding EXAMPLES-$(CONFIG_VAAPI_ENCODE_EXAMPLE) += vaapi_encode EXAMPLES-$(CONFIG_VAAPI_TRANSCODE_EXAMPLE) += vaapi_transcode +EXAMPLES-$(CONFIG_ALTERNATIVE_INPUT_EXAMPLE) += alternative_input EXAMPLES := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)$(EXESUF)) EXAMPLES_G := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)_g$(EXESUF)) diff --git a/doc/examples/Makefile.example b/doc/examples/Makefile.example index 6428154..937d266 100644 --- a/doc/examples/Makefile.example +++ b/doc/examples/Makefile.example @@ -30,6 +30,7 @@ EXAMPLES= avio_dir_cmd \ scaling_video \ transcode_aac \ transcoding \ + alternative_input \ OBJS=$(addsuffix .o,$(EXAMPLES)) diff --git a/doc/examples/alternative_input.c b/doc/examples/alternative_input.c new file mode 100644 index 0000000..f280e35 --- /dev/null +++ b/doc/examples/alternative_input.c @@ -0,0 +1,1340 @@ +/* + * Copyright (c) 2018 Bodecs Bela + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +/** + * @file + * API utility for automatic failover switching between main input and secondary + * input in case of input unavailability + * @example alternative_input.c + */ + +/** + * Motivation: let's say you have a unreliable live stream source and you want to + * transcode its first video and audio stream in realtime + * but you want to survive the ocasions when + * the source is unavailable. So use a secondary live source but + * the transition should occur seamlessly without breaking/re-starting + * the transcoding processs + * + * You may have a main source as an flv format rtmp:/// or + * an mpegts format udp:/// or a + * hls format http:///stream.m3u8 or whatever similar. + * + * Your original ffmpeg command line may look like this: + * ffmpeg -f -i -map 0:v:0 -map 0:a:0 + * -c:v x264 -s 640x360 -r 25 -pix_fmt yuv420p + * -c:a aac -ac 2 -ar 44100 + * -f hls out.m3u8 + * + * Should the source is unavailable you may want to use a secondary source to show a + * color-bar screen with a silent audio. To achive this we virtually cut into + * two halves your original ffmpeg command and insert alternative_input handler + * between them. + * + * Here is the modified output handler command line: (command#1) + * ffmpeg -y -f nut -listen 1 -i unix:output.unix + * -c:v x264 -s 640x360 -r 25 -pix_fmt yuv420p + * -c:a aac -ac 2 -ar 44100 + * -f hls out.m3u8 + * + * here is the modified main input producer command line: (command#2) + * ffmpeg -y -f -i -map 0:v:0 -map 0:a:0 + * -c:v rawvideo -s 640x360 -r 25 -pix_fmt yuv420p + * -c:a pcm_s32le -ac 2 -ar 44100 + * -write_index 0 -f nut -listen 1 unix:input_main.unix + * + * here is the secondary input producer command line: (command#3) + * ffmpeg -y -re -f lavfi + * -i "aevalsrc=exprs=0:nb_samples=1024:sample_rate=44100:channel_layout=stereo, \ + * aformat=sample_fmts=s32" + * -re -f lavfi -i "smptehdbars=size=640x360:rate=25, format=pix_fmts=yuv420p" + * -c:v rawvideo -c:a pcm_s32le + * -map 1 -map 0 + * -write_index 0 -f nut -listen 1 unix:input_second.unix + * + * and finally the alternative input handler command line: (command#4) + * alternative_input -im unix:input_main.unix -ifm nut + * -is unix:input_second.unix -ifs nut + * -o unix:output.unix -of nut + * -timeout 150 + * + * How to test: + * start modified output handler (command#1), then in a separate window + * start alternative input handler (command#4), then in a separate window + * start main input producer (command#2) and then in a separate window + * start secondary input producer (command#3). You will get on the output + * of output handler the main input. Now stop main input producer + * eg. by pressing q in its window. Now you get the secondary source + * (smpt-colorbars on screen and silence as audio) on the output of output + * handler Now, start the main input producer again. After successfull start + * you will get on the output of output handler the main input again. + * + * some suggestions: + * - use long analyze duration (-analyzeduration 10000000) option + * on main input to reliably collect all input info + * - all corresponding elementary streams on inputs of alternative + * input handler must have matching properties regarding + * stream type, pix format, pix size, audio sample rate + * - expected input format of alternative input handler is always + * intra only video and audio format is pcm_s32le + * - elementary stream number is unlimited in inputs + * - on beginning first start output handler, then alternative input handler, + * then main input and then secondary input because alternative input handler + * will stop immediatly if output is not writeable but try to open + * inputs continously + * - at beginning no output will be produced as long as both of + * main and second input are not opened + * - alternative input handler output video codec is rawvideo and + * output audio codec is pcm_s32le + * - nut muxer/demuxer format was tested successfully for output/input, + * other format may work (e.g. avi with their limitations) + * nut has a new option to make lower format overhead: -syncpoints none + * - only unix protocol was tested successfully for input/output + * - unavailable input will be tested for re-opening in each 1000 ms, even + * the secondary input as well + * - should the main input is avalailable again the switching back occurs + * + * + * Description of command line parameters of alternative input handler: + * -im url of primary/main input + * -ifm (optional) format name of primary input + * -is url of secondary input + * -ifs (optional) format name of secondary input + * -o url of output + * -of (optional) output format name + * -timeout (optional) if main input is not available for this time period, + * switching to the second input will occur (default value 100ms), + * value expressed in milliseconds + * -loglevel (optional) info|debug|warning|error (default level is info) + * -dsc (optional) internally inputs are consumed in real time fashion, + * if data may arrive quicker than relatime according to incoming timestamps, + * reading will be slow down. If consecutive timestamps differ more + * than this threshold value, then input data will be treated as discontinued. + * Value expressed in microseconds, default value is 3000000 + * -sf (optional) path of state file to write + * + * + * State file structure + * o:?:z:y 0:x 1:x + * + * There are 3 groups of data, separated by space: output, main input, second input + * + * ?: index of current input switched to output: _ - none (at starting), 0 - main, 1 - secondary + * z: number of failover switches + * y: current output state period length since last input source switching in milliseconds + * x: status of input: 1 - ok, 0 - error + * + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// how often try to re-open input in case of failover +#define INPUT_TRYING_INTERVAL_USEC 1000000 +#define DEFAULT_INPUT_TIMEOUT_MSEC 100 +#define DEFAULT_LOG_LEVEL AV_LOG_INFO +#define MAIN_INPUT_INDEX 0 +#define SECOND_INPUT_INDEX 1 +#define NB_INPUTS 2 +#define DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US 3000000 +#define DEFAULT_OUTPUT_AUDIO_CODEC_NAME "pcm_s32le" +#define DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT AV_SAMPLE_FMT_S32 +#define DEFAULT_OUTPUT_VIDEO_CODEC_NAME "rawvideo" + +typedef struct InputStreamStatData { + // these field are written/read by input handler threads + int64_t first_pts; // pts of first encoded active input's frame since the last open in its own input stream timebase + int64_t nb_frames; // nb of forwarded/encoded frames of current active input +} InputStreamStatData; + + +typedef struct OutputStreamStatData { + int64_t last_pts; // last encoded output frame end pts (pts + dur) in output stream timebase + int64_t first_pts; + int64_t pts_delta; // to adjust by this value the encoded frames pts in output stream timebase + int64_t nb_frames; // total output frames +} OutputStreamStatData; + + +typedef struct AppContext { + char *input_filenames[NB_INPUTS]; // e.g. "unix:doc/examples/input_main.unix"; + char *input_format_names[NB_INPUTS]; // e.g "nut" + + AVCodecContext **dec_ctx[NB_INPUTS]; // infinitely many streams in each input + AVFormatContext *input_fmt_ctx[NB_INPUTS]; + + char *output_filename; + char *output_format_name; + AVCodecContext **enc_ctx; // infinitely many streams as in input + AVFormatContext *output_fmt_ctx; + + InputStreamStatData *input_stream_data; + OutputStreamStatData *output_stream_data; + + int input_failover_counter; // main->second switchings + + pthread_mutex_t encoder_mutex; + int thread_id[NB_INPUTS]; + int input_timeout_ms; + int input_stream_time_discnt_thrshd_us; + int64_t start; // start wallclock time of this program + int64_t current_source_index_state_time; + + volatile sig_atomic_t input_source_index; + volatile sig_atomic_t to_exit; + volatile sig_atomic_t input_has_new_frame[NB_INPUTS]; + + char * state_file; + pthread_t input_threads[NB_INPUTS]; // each input has its own reading thread + +} AppContext; + + + +static AppContext app_ctx = { {NULL, NULL}, {NULL, NULL}, {NULL, NULL}, {NULL, NULL}, + NULL, NULL, NULL, NULL, NULL, NULL, + 0, PTHREAD_MUTEX_INITIALIZER, + {MAIN_INPUT_INDEX, SECOND_INPUT_INDEX}, DEFAULT_INPUT_TIMEOUT_MSEC, + DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US, 0, + 0, 0, 0, {0, 0}, NULL }; + + +static const char *output_audio_codec_name = DEFAULT_OUTPUT_AUDIO_CODEC_NAME; +static const char *output_video_codec_name = DEFAULT_OUTPUT_VIDEO_CODEC_NAME; + +static void timed_log(int level, const char *fmt, ...) +{ + char timed_fmt[2048]; + int64_t now_us = av_gettime(); + va_list vl; + va_start(vl, fmt); + if (snprintf(timed_fmt, sizeof(timed_fmt), "[%"PRId64"--%"PRId64"] %s", now_us, now_us - app_ctx.start, fmt) > 0) + av_vlog(NULL, level, timed_fmt, vl); + va_end(vl); +} + + +static int open_single_input(int input_index) +{ + int ret, i; + AVInputFormat *input_format = NULL; + AVDictionary * input_options = NULL; + AVFormatContext * input_fmt_ctx = NULL; + + if (app_ctx.input_format_names[input_index]) { + if (!(input_format = av_find_input_format(app_ctx.input_format_names[input_index]))) { + timed_log(AV_LOG_ERROR, "Input #%d Unknown input format: '%s'\n", input_index, + app_ctx.input_format_names[input_index]); + return AVERROR(EINVAL); + } + } + + av_dict_set(&input_options, "rw_timeout", "2000000", 0); + av_dict_set(&input_options, "timeout", "2000", 0); + if (!(app_ctx.input_fmt_ctx[input_index] = avformat_alloc_context())) + return AVERROR(ENOMEM); + + + // try to open input several times + while (!app_ctx.to_exit) { + if ((ret = avformat_open_input(&app_ctx.input_fmt_ctx[input_index], + app_ctx.input_filenames[input_index], + input_format, &input_options)) >= 0) { + timed_log(AV_LOG_INFO, "Input #%d File successfully opened: %s\n", + input_index, app_ctx.input_filenames[input_index]); + break; + } + timed_log(AV_LOG_ERROR, "Input #%d Cannot open input file %s, %s\n", + input_index, app_ctx.input_filenames[input_index], av_err2str(ret)); + + av_usleep(INPUT_TRYING_INTERVAL_USEC); + } + + + input_fmt_ctx = app_ctx.input_fmt_ctx[input_index]; + + if ((ret = avformat_find_stream_info(input_fmt_ctx, NULL)) < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Cannot find stream information\n", input_index); + return ret; + } + + app_ctx.dec_ctx[input_index] = av_mallocz_array(input_fmt_ctx->nb_streams, + sizeof(*app_ctx.dec_ctx[input_index])); + if (!app_ctx.dec_ctx[input_index]) { + timed_log(AV_LOG_ERROR, "Could not allocate decoding context array for Input #%d\n", input_index); + return AVERROR(ENOMEM); + } + + + // creating decoding context for each input stream + for (i = 0; i < input_fmt_ctx->nb_streams; i++) { + + AVStream *stream = input_fmt_ctx->streams[i]; + AVCodec *dec = avcodec_find_decoder(stream->codecpar->codec_id); + AVCodecContext *codec_ctx; + if (!dec) { + timed_log(AV_LOG_ERROR, "Input #%d Failed to find decoder for elementary stream index #%u\n", + input_index, i); + return AVERROR_DECODER_NOT_FOUND; + } + codec_ctx = avcodec_alloc_context3(dec); + if (!codec_ctx) { + timed_log(AV_LOG_ERROR, "Input #%d Failed to allocate the decoder context for " + "elementary stream index #%u\n", input_index, i); + return AVERROR(ENOMEM); + } + ret = avcodec_parameters_to_context(codec_ctx, stream->codecpar); + if (ret < 0) { + timed_log(AV_LOG_ERROR, + "Input #%d Failed to copy decoder parameters to decoder context for stream #%u\n", + input_index, i); + return ret; + } + + av_opt_set_int(codec_ctx, "refcounted_frames", 1, 0); + + + /* Reencode video and audio streams and only remux subtitles, data streams etc. */ + if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || codec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) { + if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) + codec_ctx->framerate = av_guess_frame_rate(input_fmt_ctx, stream, NULL); + /* Open decoder */ + ret = avcodec_open2(codec_ctx, dec, NULL); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Failed to open decoder for elementary stream #%u\n", + input_index, i); + return ret; + } + + } else if (codec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) { + timed_log(AV_LOG_FATAL, "Input #%d Elementary stream #%d is of unknown type, cannot proceed\n", + input_index, i); + return AVERROR(EINVAL); + + } + + app_ctx.dec_ctx[input_index][i] = codec_ctx; + } + + av_dump_format(input_fmt_ctx, 0, app_ctx.input_filenames[input_index], 0); + + return 0; +} + + +static int try_to_reopen_input(int input_source_index) +{ + int ret; + while (!app_ctx.to_exit) { + if ((ret = open_single_input(input_source_index)) >= 0) { // + timed_log(AV_LOG_INFO, "Input #%d Successfull reopening\n", input_source_index); + + // intentionally do not dry the output pipeline here + // but remain in its current state to use other realtime stream as secondary input + return 0; + } + av_usleep(INPUT_TRYING_INTERVAL_USEC); + } + return AVERROR(EIO); +} + + +// input packet maybe null in case of drying +static int encode_frame(AVFrame *frame, int stream_index, int input_source_index) +{ + int ret; + AVCodecContext * enc_ctx = app_ctx.enc_ctx[stream_index]; + AVPacket *output_packet; + + output_packet = av_packet_alloc(); + if (!output_packet) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d could not allocate output packet\n", + input_source_index, stream_index); + return AVERROR(ENOMEM); + } + + /* send the frame to the encoder */ + if (frame) { // frame maybe null + OutputStreamStatData * st_data = &app_ctx.output_stream_data[stream_index]; + st_data->last_pts = frame->pts; + if (!st_data->nb_frames) + st_data->first_pts = frame->pts; + st_data->nb_frames++; + + // add calculated frame duration to input frame pts + if (enc_ctx->codec_type == AVMEDIA_TYPE_AUDIO && frame->sample_rate) + // calculate frame duration by number of audio samples + st_data->last_pts += av_rescale_q(frame->nb_samples, av_make_q(1, frame->sample_rate), enc_ctx->time_base); + + else if (enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO && st_data->nb_frames >= 2) + // use overall mean frame duration (curr_pts/nb_frames-1) * nb_frames + st_data->last_pts = av_rescale(frame->pts - st_data->first_pts, st_data->nb_frames, st_data->nb_frames - 1); + + + timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Send frame for encoding, pts: %3"PRId64"\n", + input_source_index, stream_index, frame->pts); + } + + ret = avcodec_send_frame(enc_ctx, frame); + if (ret == AVERROR(EAGAIN)) { + + } else if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Error sending a frame for encoding: %s\n", + input_source_index, av_err2str(ret)); + return ret; + } + + while (ret >= 0) { + ret = avcodec_receive_packet(enc_ctx, output_packet); + if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) + return ret; + else if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error during encoding: %s\n", + input_source_index, stream_index, av_err2str(ret)); + return ret; + } + + timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Write output packet, pts: %"PRId64" (size=%d)\n", + input_source_index, stream_index, output_packet->pts, output_packet->size); + + output_packet->stream_index = stream_index; + ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, output_packet); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error muxing packet, %s\n", + input_source_index, stream_index, av_err2str(ret)); + break; + } + av_packet_unref(output_packet); + } + + av_packet_free(&output_packet); + return ret; +} + + +// packet maybe null, so need stream_index +static int handle_received_packet(AVPacket *packet, int stream_index, int input_source_index) +{ + int ret = 0; + int64_t new_pts = 0; + + AVCodecContext * dec_ctx = app_ctx.dec_ctx[input_source_index][stream_index]; + AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[input_source_index]; + + AVFrame *frame = av_frame_alloc(); + if (!frame) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Could not allocate frame\n", + input_source_index, stream_index); + return AVERROR(ENOMEM); + } + + if (packet) { + timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d packet received, pts: %3"PRId64", size: %d\n", + input_source_index, stream_index, packet->pts, packet->size); + } + + ret = avcodec_send_packet(dec_ctx, packet); + if (ret == AVERROR(EAGAIN)) { + // nothing to do + } else if (ret == AVERROR_EOF) { + timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_send_packet returned: %s\n", + input_source_index, stream_index, av_err2str(ret)); + + } else if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while sending a packet to decoder: %s\n", + input_source_index, stream_index, av_err2str(ret)); + av_frame_free(&frame); + return ret; + } + + while (ret >= 0) { + ret = avcodec_receive_frame(dec_ctx, frame); + if (ret == AVERROR(EAGAIN)) + break; + else if (ret == AVERROR_EOF) { + timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_receive_frame returned: %s\n", + input_source_index, stream_index, av_err2str(ret)); + break; + } else if (ret < 0) { + timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while receiving a frame from decoder: %s\n", + input_source_index, stream_index, av_err2str(ret)); + av_frame_free(&frame); + return ret; + } + + app_ctx.input_has_new_frame[input_source_index] = 1; + timed_log(AV_LOG_DEBUG, "Input #%d Set input_has_new_frame flag\n", input_source_index); + if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) { + + InputStreamStatData * in_st_data = &app_ctx.input_stream_data[stream_index]; + + if (in_st_data->first_pts == AV_NOPTS_VALUE) { + in_st_data->first_pts = frame->pts; + in_st_data->nb_frames = 1; + + } else { + + int64_t avg_delta_frame_pts = (frame->pts - in_st_data->first_pts) / (double)in_st_data->nb_frames; + int64_t avg_delta_frame_pts_time = av_rescale_q(avg_delta_frame_pts, dec_ctx->time_base, AV_TIME_BASE_Q); + + if (in_st_data->nb_frames > 25 && dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) + timed_log(AV_LOG_DEBUG, "Input #%d stream #%d stream fps: %0.2f, nb_frames: %"PRId64"\n", + input_source_index, stream_index, + (double)1000000/avg_delta_frame_pts_time, in_st_data->nb_frames); + else + timed_log(AV_LOG_DEBUG, "Input #%d stream #%d nb_frames: %"PRId64"\n", + input_source_index, stream_index, in_st_data->nb_frames); + + in_st_data->nb_frames ++; + } + + new_pts = av_rescale_q_rnd(frame->pts - in_st_data->first_pts, + input_fmt_ctx->streams[stream_index]->time_base, + app_ctx.output_fmt_ctx->streams[stream_index]->time_base, + AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + new_pts += app_ctx.output_stream_data[stream_index].pts_delta; + + timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d frame received and sending for encoding, " + "pts: %"PRId64" => %"PRId64"\n", input_source_index, + stream_index, frame->pts, new_pts); + + frame->pts = new_pts; + + ret = encode_frame(frame, stream_index, input_source_index); + if (ret < 0 && ret != AVERROR(EAGAIN)) { + app_ctx.to_exit = 1; + timed_log(AV_LOG_INFO, "encoding terminating\n"); + } + pthread_mutex_unlock(&app_ctx.encoder_mutex); + + } else + ret = 0; + + av_frame_unref(frame); + } + + av_frame_free(&frame); + + return ret; +} + + +static void print_usage(const char * program_name) +{ + av_log(NULL, AV_LOG_ERROR, "usage: %s -im [-ifm ] " + "-is [-ifs ] " + "-o [-of ] [-sf ] " + "[-timeout ] [-loglevel info|debug|warning|error] " + "[-dsc ]\n", program_name); +} + + +static int read_parameters(int argc, char **argv) +{ + int i; + + for (i = 1; i < argc; i++) { + if (!strcmp(argv[i], "-im") && i+1 < argc) { + app_ctx.input_filenames[MAIN_INPUT_INDEX] = argv[++i]; + + } else if (!strcmp(argv[i], "-ifm") && i+1 < argc) { + app_ctx.input_format_names[MAIN_INPUT_INDEX] = argv[++i]; + + } else if (!strcmp(argv[i], "-is") && i+1 < argc) { + app_ctx.input_filenames[SECOND_INPUT_INDEX] = argv[++i]; + + } else if (!strcmp(argv[i], "-ifs") && i+1 < argc) { + app_ctx.input_format_names[SECOND_INPUT_INDEX] = argv[++i]; + + } else if (!strcmp(argv[i], "-o") && i+1 < argc) { + app_ctx.output_filename = argv[++i]; + + } else if (!strcmp(argv[i], "-of") && i+1 < argc) { + app_ctx.output_format_name = argv[++i]; + + } else if (!strcmp(argv[i], "-sf") && i+1 < argc) { + app_ctx.state_file = argv[++i]; + + } else if (!strcmp(argv[i], "-loglevel") && i+1 < argc) { + i++; + if (!strcmp(argv[i], "info")) { + av_log_set_level(AV_LOG_INFO); + } else if (!strcmp(argv[i], "error")) { + av_log_set_level(AV_LOG_ERROR); + } else if (!strcmp(argv[i], "warning")) { + av_log_set_level(AV_LOG_WARNING); + } else if (!strcmp(argv[i], "debug")) { + av_log_set_level(AV_LOG_DEBUG); + } else { + timed_log(AV_LOG_ERROR, + "Unexpected loglevel value: %s\n", argv[i]); + return AVERROR(EINVAL); + } + + + } else if (!strcmp(argv[i], "-timeout") && i+1 < argc) { + char * tail = NULL; + app_ctx.input_timeout_ms = strtoll(argv[++i], &tail, 10); + if (*tail || app_ctx.input_timeout_ms < 1) { + timed_log(AV_LOG_ERROR, + "Invalid or negative value '%s' for input timeout checking interval\n", argv[i]); + return AVERROR(EINVAL); + } + + } else if (!strcmp(argv[i], "-dsc") && i+1 < argc) { + char * tail = NULL; + app_ctx.input_stream_time_discnt_thrshd_us = strtoll(argv[++i], &tail, 10); + if (*tail || app_ctx.input_timeout_ms < 1) { + timed_log(AV_LOG_ERROR, + "Invalid or negative value '%s' for input time discontinuity interval\n", argv[i]); + return AVERROR(EINVAL); + } + + } else { + timed_log(AV_LOG_ERROR, "unknown option, or missing parameter: %s\n", argv[i]); + print_usage(argv[0]); + return AVERROR(EINVAL); + } + } + + if (!app_ctx.input_filenames[MAIN_INPUT_INDEX] || + !app_ctx.input_filenames[SECOND_INPUT_INDEX] || + !app_ctx.output_filename) { + print_usage(argv[0]); + return AVERROR(EINVAL); + } + + return 0; +} + + +static int check_input_streams_matching(void) +{ + int i; + + if (app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams != app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams) { + timed_log(AV_LOG_ERROR, "First input has #%d streams but secondary input has #%d streams, " + "but stream numbers should be matching, so aborting\n", + app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams, + app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams); + return AVERROR(EINVAL); + } + + for (i = 0; i < app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams; i++) { + AVCodecContext * main_dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i]; + AVCodecContext * second_dec_ctx = app_ctx.dec_ctx[SECOND_INPUT_INDEX][i]; + + if (main_dec_ctx->codec_type != second_dec_ctx->codec_type) { + timed_log(AV_LOG_ERROR, "Mismatching stream types at #%d elementary stream, aborting\n", i); + return AVERROR(EINVAL); + } + + if (main_dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) { + if (main_dec_ctx->width != second_dec_ctx->width) { + timed_log(AV_LOG_ERROR, "at stream #%d video width mismatch: %d != %d\n", i, + main_dec_ctx->width, second_dec_ctx->width); + return AVERROR(EINVAL); + } + if (main_dec_ctx->height != second_dec_ctx->height) { + timed_log(AV_LOG_ERROR, "at stream #%d video height mismatch: %d != %d\n", i, + main_dec_ctx->height, second_dec_ctx->height); + return AVERROR(EINVAL); + } + if (main_dec_ctx->pix_fmt != second_dec_ctx->pix_fmt) { + timed_log(AV_LOG_ERROR, "at stream #%d video pix_fmt mismatch: %d != %d\n", i, + main_dec_ctx->pix_fmt, second_dec_ctx->pix_fmt); + return AVERROR(EINVAL); + } + // TODO: check more video parameters + } + if (main_dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) { + if (main_dec_ctx->channels != second_dec_ctx->channels) { + timed_log(AV_LOG_ERROR, "at stream #%d audio channel number mismatch: %d != %d\n", i, + main_dec_ctx->channels, second_dec_ctx->channels); + return AVERROR(EINVAL); + } + if (main_dec_ctx->channel_layout != second_dec_ctx->channel_layout) { + timed_log(AV_LOG_ERROR, "at stream #%d audio channel layout mismatch: %"PRId64" != %"PRId64"\n", + i, main_dec_ctx->channel_layout, second_dec_ctx->channel_layout); + return AVERROR(EINVAL); + } + if (main_dec_ctx->sample_rate != second_dec_ctx->sample_rate) { + timed_log(AV_LOG_ERROR, "at stream #%d audio sample rate mismatch: %d != %d\n", i, + main_dec_ctx->sample_rate, second_dec_ctx->sample_rate); + return AVERROR(EINVAL); + } + + if (main_dec_ctx->sample_fmt != DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT) { + timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format is not as expected (%d)\n", + i, DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT); + return AVERROR(EINVAL); + } + + if (main_dec_ctx->sample_fmt != second_dec_ctx->sample_fmt) { + timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format mismatch: %d != %d\n", + i, main_dec_ctx->sample_fmt, second_dec_ctx->sample_fmt); + return AVERROR(EINVAL); + } + + // TODO: check more audio parameters + } + } + + return 0; +} + + + +static int allocate_arrays(void) +{ + + int nb_streams = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams; + + app_ctx.enc_ctx = av_mallocz_array(nb_streams, sizeof(*app_ctx.enc_ctx)); + if (!app_ctx.enc_ctx) { + timed_log(AV_LOG_ERROR,"Could not allocate encoder context list\n"); + return AVERROR(ENOMEM); + } + + app_ctx.input_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.input_stream_data)); + if (!app_ctx.input_stream_data) { + timed_log(AV_LOG_ERROR,"Could not allocate input_stream_data list\n"); + return AVERROR(ENOMEM); + } + + app_ctx.output_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.output_stream_data)); + if (!app_ctx.output_stream_data) { + timed_log(AV_LOG_ERROR,"Could not allocate input_stream_data list\n"); + return AVERROR(ENOMEM); + } + + return 0; +} + + +static int open_output (void) +{ + int i, ret; + AVDictionary * output_options = NULL; + AVOutputFormat * output_format = NULL; + + AVStream * out_stream; + AVStream * in_stream; + AVCodecContext * dec_ctx = NULL, * enc_ctx = NULL; + AVCodec * output_video_codec, * output_audio_codec; + AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]; + + + if (app_ctx.output_format_name) { + if (!(output_format = av_guess_format(app_ctx.output_format_name, NULL, NULL))) { + timed_log(AV_LOG_ERROR, "Unknown output format: '%s'\n", app_ctx.output_format_name); + return AVERROR(EINVAL); + } + } + + // allocate the output media context + ret = avformat_alloc_output_context2(&app_ctx.output_fmt_ctx, output_format, NULL, + app_ctx.output_filename); + if (ret < 0 || !app_ctx.output_fmt_ctx) { + timed_log(AV_LOG_ERROR,"Could not deduce output format for %s.\n", app_ctx.output_filename); + return AVERROR(EINVAL); + } + + if ((ret = allocate_arrays()) < 0) + return ret; + + // find the video encoder for output + output_video_codec = avcodec_find_encoder_by_name(output_video_codec_name); + if (!output_video_codec) { + timed_log(AV_LOG_ERROR, "Output video codec '%s' not found\n", output_video_codec_name); + return AVERROR_ENCODER_NOT_FOUND; + } + + // find the audio encoder for output + output_audio_codec = avcodec_find_encoder_by_name(output_audio_codec_name); + if (!output_audio_codec) { + timed_log(AV_LOG_ERROR, "Output audio codec '%s' not found\n", output_audio_codec_name); + return AVERROR_ENCODER_NOT_FOUND; + } + + + // creating encoding context for each input stream based on main input format + for (i = 0; i < input_fmt_ctx->nb_streams; i++) { + + app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE; + app_ctx.output_stream_data[i].first_pts = AV_NOPTS_VALUE; + app_ctx.output_stream_data[i].last_pts = AV_NOPTS_VALUE; + app_ctx.output_stream_data[i].pts_delta = 0; + app_ctx.output_stream_data[i].nb_frames = 0; + + in_stream = input_fmt_ctx->streams[i]; + dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i]; // based on main input + + out_stream = avformat_new_stream(app_ctx.output_fmt_ctx, NULL); + if (!out_stream) { + timed_log(AV_LOG_ERROR, "Failed allocating output stream\n"); + return AVERROR_UNKNOWN; + } + + enc_ctx = NULL; + + if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) { + + // create the context for video encoder + enc_ctx = avcodec_alloc_context3(output_video_codec); + if (!enc_ctx) { + timed_log(AV_LOG_ERROR, "Could not allocate output video codec context\n"); + return AVERROR(EINVAL); + } + + enc_ctx->height = dec_ctx->height; + enc_ctx->width = dec_ctx->width; + enc_ctx->sample_aspect_ratio = dec_ctx->sample_aspect_ratio; + enc_ctx->pix_fmt = dec_ctx->pix_fmt; + // TODO: check wheter pix_format included in output_video_codec->pix_fmts, + // supported format list of video codec + + enc_ctx->time_base = av_inv_q(dec_ctx->framerate); + enc_ctx->gop_size = 0; // intra only, but it is useless in case of rawvideo + + av_opt_set_int(enc_ctx, "refcounted_frames", 1, 0); + + ret = avcodec_open2(enc_ctx, output_video_codec, NULL); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not open output video codec: %s\n", av_err2str(ret)); + return ret; + } + + } else if (dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) { + + // create the context for audio encoder + enc_ctx = avcodec_alloc_context3(output_audio_codec); + if (!enc_ctx) { + timed_log(AV_LOG_ERROR, "Could not allocate output audio codec context\n"); + return AVERROR(EINVAL); + } + + enc_ctx->sample_rate = dec_ctx->sample_rate; + enc_ctx->channel_layout = dec_ctx->channel_layout; + enc_ctx->channels = dec_ctx->channels; + // TODO: check by av_get_channel_layout_nb_channels(enc_ctx->channel_layout); + + enc_ctx->sample_fmt = DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT; // encoder->sample_fmts[0]; + enc_ctx->time_base = (AVRational){1, enc_ctx->sample_rate}; + + ret = avcodec_open2(enc_ctx, output_audio_codec, NULL); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not open output audio codec: %s\n", av_err2str(ret)); + return ret; + } + + } + + if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) { + + ret = avcodec_parameters_from_context(out_stream->codecpar, enc_ctx); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Failed to copy encoder parameters to output stream #%u\n", i); + return ret; + } + if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER) + enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + + out_stream->time_base = enc_ctx->time_base; // hint for the muxer + app_ctx.enc_ctx[i] = enc_ctx; + + } else if (dec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) { + timed_log(AV_LOG_FATAL, "Elementary stream #%d is of unknown type, cannot proceed\n", i); + return AVERROR_INVALIDDATA; + + } else { + // this stream will be remuxed only + ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Copying codec parameters for elementary stream #%u failed\n", i); + return ret; + } + out_stream->time_base = in_stream->time_base; + } + + app_ctx.enc_ctx[i] = enc_ctx; + + } + + + av_dump_format(app_ctx.output_fmt_ctx, 0, app_ctx.output_filename, 1); + + + // open the output file, if needed by the format + if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + ret = avio_open2(&app_ctx.output_fmt_ctx->pb, app_ctx.output_filename, + AVIO_FLAG_WRITE, NULL, &output_options); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not open '%s': %s\n", + app_ctx.output_filename, av_err2str(ret)); + return ret; + } + } + + + // Write the stream header, if any + ret = avformat_write_header(app_ctx.output_fmt_ctx, &output_options); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Error occurred when opening output file: %s\n", av_err2str(ret)); + return ret; + } + + return 0; +} + + +static int calculate_new_ts_delta_values(void) +{ + int i; + int64_t max_last_pts = AV_NOPTS_VALUE; + int max_index = -1; + + // find the max last_pts, this will be the old output duration + for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) { + if (app_ctx.output_stream_data[i].last_pts == AV_NOPTS_VALUE) + continue; + + if (max_index == -1) { + max_index = i; + continue; + } + + if (av_compare_ts(app_ctx.output_stream_data[i].last_pts, + app_ctx.output_fmt_ctx->streams[i]->time_base, + app_ctx.output_stream_data[max_index].last_pts, + app_ctx.output_fmt_ctx->streams[max_index]->time_base) > 0) + max_index = i; + } + + if (max_index == -1) { + timed_log(AV_LOG_ERROR, "could not calculate new max pts\n"); + return AVERROR(EINVAL); + } + + // save here because we will clear somewhere in the next for loop + max_last_pts = app_ctx.output_stream_data[max_index].last_pts; + + // calculate new delta by adding the max and then rescaling to new input time base + for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) { + app_ctx.output_stream_data[i].pts_delta = av_rescale_q_rnd(max_last_pts, + app_ctx.output_fmt_ctx->streams[max_index]->time_base, + app_ctx.output_fmt_ctx->streams[i]->time_base, + AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX); + app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE; + } + + return 0; +} + + +static int dry_current_input_pipeline(int input_source_index) +{ + int i, ret; + + for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) + if ((ret = handle_received_packet(NULL, i, input_source_index)) < 0) + if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) + timed_log(AV_LOG_WARNING, "Input #%d stream # %d problem on drying the pipeline: %s/n", + input_source_index, i, av_err2str(ret)); + + return 0; +} + + +static int handle_input(int input_source_index) +{ + int i, ret, eof_input = 0, error_input = 0, input_reopen_counter = 0; + AVPacket input_packet; + int64_t dts_delta_time = 0; + + timed_log(AV_LOG_INFO, "Input #%d thread started\n", input_source_index); + while (!app_ctx.to_exit) { // almost for ever + int to_set_dts_delta_time = 1; + + // read packets continouosly from input + while (!app_ctx.to_exit) { + ret = av_read_frame(app_ctx.input_fmt_ctx[input_source_index], &input_packet); + + if (ret < 0) { + if (ret == AVERROR_EOF) { + eof_input = 1; + timed_log(AV_LOG_INFO, "input #%d eof detected by av_read_frame\n", + input_source_index); + } else { + error_input = 1; + timed_log(AV_LOG_ERROR, "input #%d av_read_frame returned: %s\n", + input_source_index, av_err2str(ret)); + } + break; + } + + if (input_packet.stream_index >= app_ctx.input_fmt_ctx[input_source_index]->nb_streams) + timed_log(AV_LOG_WARNING, "Input #%d unexpected stream index: %d\n", + input_source_index, input_packet.stream_index); + + else { + // ensuring realtime processing + if (input_packet.dts != AV_NOPTS_VALUE) { + + int64_t dts_time = av_rescale_q(input_packet.dts, + app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base, + AV_TIME_BASE_Q); + int64_t now_us = av_gettime_relative(); + int64_t sleep_us = dts_time - now_us + dts_delta_time; + + if (to_set_dts_delta_time) { + to_set_dts_delta_time = 0; + dts_delta_time = now_us - dts_time; + sleep_us = 0; + } + + if (abs(sleep_us) > app_ctx.input_stream_time_discnt_thrshd_us) { + timed_log(AV_LOG_INFO, + "Input #%d time discontinuity detected: %"PRIi64"us (limit: %dus), packet wallclock timestamp: %"PRIi64 + ", delta: %"PRIi64"us\n", + input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us, dts_time, dts_delta_time); + sleep_us = 0; + dts_delta_time = now_us - dts_time; + } + if (sleep_us > app_ctx.input_stream_time_discnt_thrshd_us) { + timed_log(AV_LOG_WARNING, "Input %d Too long sleeping time: %"PRIi64", truncate to %d\n", + input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us); + sleep_us = app_ctx.input_stream_time_discnt_thrshd_us; + } + if (sleep_us > 0) { + timed_log(AV_LOG_DEBUG, "Input #%d sleeping %"PRIi64"us to simulate realtime receiving\n", + input_source_index, sleep_us); + for(;sleep_us > app_ctx.input_timeout_ms * 500; sleep_us -= app_ctx.input_timeout_ms * 500) // 500 = 1000/2 + av_usleep(sleep_us); + + av_usleep(sleep_us); + } + } + + + if (app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_VIDEO || + app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_AUDIO) { + + if ((ret = handle_received_packet(&input_packet, input_packet.stream_index, input_source_index)) < 0) + if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) + break; + + + } else if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) { + app_ctx.input_has_new_frame[input_source_index] = 1; + + /* remux this frame without reencoding */ + av_packet_rescale_ts(&input_packet, + app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base, + app_ctx.output_fmt_ctx->streams[input_packet.stream_index]->time_base); + + ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, &input_packet); + + pthread_mutex_unlock(&app_ctx.encoder_mutex); + + if (ret < 0) { + app_ctx.to_exit = 1; + break; + } + } + } + av_packet_unref(&input_packet); + } + + + if (!app_ctx.to_exit && (eof_input || error_input)) { + + timed_log(AV_LOG_INFO, "Gonna reopen Input #%d, ocasion: #%d\n", + input_source_index, ++input_reopen_counter); + + // dry current pipeline + dry_current_input_pipeline(input_source_index); + + // close input + for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) + avcodec_free_context(&app_ctx.dec_ctx[input_source_index][i]); + + avformat_close_input(&app_ctx.input_fmt_ctx[input_source_index]); + + eof_input = 0; + error_input = 0; + + if (try_to_reopen_input(input_source_index) < 0) { + break; + } + + } + } + + if (!app_ctx.to_exit && eof_input) { + // dry current pipeline + dry_current_input_pipeline(input_source_index); + } + + return 0; +} + + + +static void *threaded_input_handler(void * arg) +{ + int input_source_index = *(int *)arg; + handle_input(input_source_index); + pthread_exit(NULL); +} + + +static int write_out_new_state_log(void) +{ + char tmp_state_file[1024]; + int ret, i; + FILE * outfile; + + if (!app_ctx.state_file || !app_ctx.state_file[0]) + return 0; // no state file was specified + + + if ((ret = snprintf(tmp_state_file, sizeof(tmp_state_file), "%s.tmp", app_ctx.state_file)) <= 0) { + timed_log(AV_LOG_ERROR, "could not create state file name: %s", av_err2str(ret)); + return ret; + } + + outfile = fopen(tmp_state_file, "wb"); + if (!outfile) { + timed_log(AV_LOG_ERROR, "could not open tmp state file: %s", av_err2str(errno)); + return errno; + } + + if (app_ctx.input_source_index == -1) + ret = fprintf(outfile, "o:_:"); + else + ret = fprintf(outfile, "o:%d:", app_ctx.input_source_index); + + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n", + tmp_state_file, av_err2str(ret)); + fclose(outfile); + return ret; + } + + ret = fprintf(outfile, "%d:%"PRId64, app_ctx.input_failover_counter, + (av_gettime() - app_ctx.current_source_index_state_time)/1000); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n", + tmp_state_file, av_err2str(ret)); + fclose(outfile); + return ret; + } + + for (i = 0; i < NB_INPUTS; i++) { + ret = fprintf(outfile, " %d:%d", i, !app_ctx.input_has_new_frame[i]?0:1); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n", + tmp_state_file, av_err2str(ret)); + fclose(outfile); + return ret; + } + } + + ret = fprintf(outfile, "\n"); + if (ret < 0) { + timed_log(AV_LOG_ERROR, "Could not write into tmp state file (%s): %s\n", + tmp_state_file, av_err2str(ret)); + fclose(outfile); + return ret; + } + fclose(outfile); + + if (rename(tmp_state_file, app_ctx.state_file) < 0) { + timed_log(AV_LOG_ERROR, "Could not rename state file (%s => %s): %s\n", + tmp_state_file, app_ctx.state_file, av_err2str(errno)); + return errno; + } + + return 0; +} + + +static void main_loop(void) +{ + int i; + int64_t last_input_check_time = av_gettime_relative(); + + write_out_new_state_log(); + + app_ctx.current_source_index_state_time = last_input_check_time; + while (!app_ctx.to_exit) { // almost for ever + int64_t now_us = av_gettime_relative(); + int64_t check_interval = now_us - last_input_check_time; + + if (check_interval > app_ctx.input_timeout_ms * 1000) { + last_input_check_time = now_us; + if (app_ctx.input_source_index == MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) { + // normal case + timed_log(AV_LOG_DEBUG, "Checking running main input: ok, in last %"PRIi64"us \n", check_interval); + + } else if (app_ctx.input_source_index != MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) { + if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) { + if (app_ctx.input_source_index >= 0) { + timed_log(AV_LOG_INFO, "#%d switching back to main input because new frame arrived\n", + app_ctx.input_failover_counter); + calculate_new_ts_delta_values(); + } else + timed_log(AV_LOG_INFO, "Switching to main input\n"); + app_ctx.input_source_index = MAIN_INPUT_INDEX; + app_ctx.current_source_index_state_time = av_gettime(); + + pthread_mutex_unlock(&app_ctx.encoder_mutex); + } else + timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n"); + + } else if (app_ctx.input_source_index != SECOND_INPUT_INDEX && app_ctx.input_has_new_frame[SECOND_INPUT_INDEX]) { + if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) { + if (app_ctx.input_source_index >= 0) { + app_ctx.input_failover_counter++; + timed_log(AV_LOG_INFO, "#%d switching to second input, now new frame on Input #%d in last %"PRIi64"us\n", + app_ctx.input_failover_counter, MAIN_INPUT_INDEX, check_interval); + calculate_new_ts_delta_values(); + } else + timed_log(AV_LOG_INFO, "Switching to second input\n"); + app_ctx.input_source_index = SECOND_INPUT_INDEX; + app_ctx.current_source_index_state_time = av_gettime(); + + pthread_mutex_unlock(&app_ctx.encoder_mutex); + } else + timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n"); + } + + + write_out_new_state_log(); + + for (i = 0; i < NB_INPUTS; i++) + app_ctx.input_has_new_frame[i] = 0; + + } + av_usleep(app_ctx.input_timeout_ms * 250); // 250 = 1000 / 4 + } +} + + + +int main(int argc, char **argv) +{ + int ret, i, k; + pthread_attr_t attr; + + app_ctx.start = av_gettime(); + av_log_set_level(DEFAULT_LOG_LEVEL); + + // read and check command line parameters + if (read_parameters(argc, argv) < 0) + exit(1); + + avformat_network_init(); + avfilter_register_all(); + + app_ctx.input_source_index = -1; // none + app_ctx.to_exit = 0; + + // For portability, explicitly create threads in a joinable state + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + + for (k = 0; k < NB_INPUTS && !app_ctx.to_exit; k++) { + if ((ret = open_single_input(k)) < 0) // open input + goto end; + + app_ctx.input_has_new_frame[k] = 0; + + if ((ret = pthread_create(&app_ctx.input_threads[k], &attr, threaded_input_handler, (void *) &app_ctx.thread_id[k]))) { + timed_log(AV_LOG_ERROR, "return code from #%d pthread_create() is %d\n", k, ret); + goto end; + } + } + + if ((ret = check_input_streams_matching()) < 0) + goto end; + + if (open_output() < 0) + goto end; + + main_loop(); + + + if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) { + app_ctx.input_source_index = -1; + pthread_mutex_unlock(&app_ctx.encoder_mutex); + } + + av_write_trailer(app_ctx.output_fmt_ctx); + + + if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE)) + avio_closep(&app_ctx.output_fmt_ctx->pb); + +end: + + app_ctx.to_exit = 1; + // wait all input thread to terminate + for (k = 0; k < NB_INPUTS; k++) + pthread_join(app_ctx.input_threads[k], NULL); + + + for (k = 0; k < NB_INPUTS; k++) { + for (i = 0; app_ctx.output_fmt_ctx && + i < app_ctx.output_fmt_ctx->nb_streams; i++) + avcodec_free_context(&app_ctx.dec_ctx[k][i]); + + avformat_close_input(&app_ctx.input_fmt_ctx[k]); + } + + for (i = 0; app_ctx.output_fmt_ctx && i < app_ctx.output_fmt_ctx->nb_streams; i++) + avcodec_free_context(&app_ctx.enc_ctx[i]); + + avformat_close_input(&app_ctx.output_fmt_ctx); + avformat_free_context(app_ctx.output_fmt_ctx); + + avformat_network_deinit(); + + + pthread_mutex_destroy(&app_ctx.encoder_mutex); + + exit(0); +}