From patchwork Sun May 13 00:07:39 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stephan Holljes X-Patchwork-Id: 8934 Delivered-To: ffmpegpatchwork@gmail.com Received: by 2002:a02:155:0:0:0:0:0 with SMTP id c82-v6csp2437271jad; Sat, 12 May 2018 17:08:21 -0700 (PDT) X-Google-Smtp-Source: AB8JxZryrBb8nv3TFUq+jwoslZSIpTLu6o6mmrj89xtPBkFXK3d2CeVJMGm2G/W7TJfjT5zFv3RH X-Received: by 2002:a1c:1a49:: with SMTP id a70-v6mr2168915wma.77.1526170100914; Sat, 12 May 2018 17:08:20 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1526170100; cv=none; d=google.com; s=arc-20160816; b=BbN4ITtAurSMn7ggWUFiwonqwe/HKZVLaetWv69H9RGqSazLWDMM8tPnZodsoQg7xU 5WxPmKyGownTpCG+o/zhQc8/oVx1A8/ZeBhchfU4cYrVSqdnxYsjLqvehMOtiNVjNV1S EKC9FvRLMqXNLd+1uZ1M72ILZFXx4U+UJmSAmpZ/iulaoV1F4fA2tjRwhkVsDTwnF3M6 YGm+OLcjAqU++pW0mzbcZZ+kzfleE1V0aZe6apAEUgmWDpfEPZ2h62485l8RgG3/9xT+ oWwRlR39i3zz5Lh9dKJco++g1wpVt+D+BE37VX+s76sJYf0mbP7ArGgG5T3FuOZF70PQ 2RKw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:mime-version:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:references:in-reply-to:message-id:date :to:from:dkim-signature:delivered-to:arc-authentication-results; bh=pKrQtYuOnVW8sZ7Fo9udiWN0svTw7iRolaH74ZOwx4g=; b=Jo+V1zUsi2kGDpAKHwccmflko1y3sg4vIF9HpsjvZO2nEwbloa2zxUiYa43PwjUBMU EQfS6TSQw10yEzSYImIEg3npG+2ej5dveZP+0TL0Ae2BMhAHTBwt/il2helszGIiQepo TzUycc2XcH2qSgoJabXrrDHi3qAIr47FR+jGuSCnYK8La/xEI5POEpwT5JYsHdJNB8DE 7Tfgus614fG7dh7oGlXM/gZtdq2vqEvu9wUN9sNNg4/uTE+JClMJ0JVI3Qbj0Hz8rLCd Luk8fSs3wLrNjvhxXYgN6eOLroxC+E3pD5DS/+Nn1WpUM/yGdPGfCRm9WFoLJfcoK8/y McNA== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=owl6hsDk; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id v8-v6si5332715wrd.20.2018.05.12.17.08.20; Sat, 12 May 2018 17:08:20 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=owl6hsDk; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 78CFC68A60F; Sun, 13 May 2018 03:07:37 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wm0-f46.google.com (mail-wm0-f46.google.com [74.125.82.46]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id AB62C68A5F5 for ; Sun, 13 May 2018 03:07:31 +0300 (EEST) Received: by mail-wm0-f46.google.com with SMTP id l1-v6so9052010wmb.2 for ; Sat, 12 May 2018 17:08:09 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlemail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=qxAjfEks9SE1N+J4Y3Xni2BxfslhUCkAqiEDd/2iWXc=; b=owl6hsDkG9fkdyvg49ow9/WYnM4Y4RW+cPq/DGJyBQmhsuJSPnBSmpviWHIOUqACaC axUYPG22LwMZqPB1gsJnaSpJ4fNEvi7V0cqs3LcEfeCfBtlg+aKado12tv58UxzvvKU8 mS06hlIXAo6bGZmJgEeVr16lOUqtnm01Ill2NGfmWe5xWeo687N10TCgFr0wyyHD+oA4 e6gxKww68lmMRAm4Ybss5mb/tve9ceSEUdxBlc4zBFs9263agJh17XZThaWrLk4mpBbM sWQ7s69T9TBMG0UMMMI3b8U3PTU5M0yILEDhRuDytV5nfOyJKqyDe2IPLrUxDo2ok7Fb ltMA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=qxAjfEks9SE1N+J4Y3Xni2BxfslhUCkAqiEDd/2iWXc=; b=fTSNwNAiuTNFdw2TifmDqnS8zvxvPiIc+7IFz/lElvZkGW+5ik1Fd/oV/E0pQk0RYu S0l65LJEI6NgXWyuhKLres1xIWvvo8pqxQUXYwGWrzmzJmMWfn4ysPsRZvzoHY/qn1kj /8wAKLIw4zv/CrSw3+2K8yhjBuCRydEyOW0tU90CWs1z2zfimWG5vjpLsMXTi4+p1TXp Qasc5ZyC9hi2fQ3PqBpGnZKJXY9Y5wKlSfvjdEXB2O0BLbHDhcOOmlnfnPi/aNPWGq2l D2T2sXl816kt8qDO34knqpzH2UayhVE589qEVmEur670dKTMo1HKh79ULhI7n2ZFwIY2 CfEA== X-Gm-Message-State: ALKqPwdino9NteoD5ln2rQb4XRDdK6VgMOQFU+e7fByN3s3vGVOnK493 6jhLSAh3mw58LoLnvlxb+5KJdw== X-Received: by 2002:a1c:8350:: with SMTP id f77-v6mr1973392wmd.1.1526170086847; Sat, 12 May 2018 17:08:06 -0700 (PDT) Received: from localhost.localdomain ([46.5.2.0]) by smtp.gmail.com with ESMTPSA id r14-v6sm8245469wra.41.2018.05.12.17.08.05 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Sat, 12 May 2018 17:08:06 -0700 (PDT) From: Stephan Holljes To: ffmpeg-devel@ffmpeg.org Date: Sun, 13 May 2018 02:07:39 +0200 Message-Id: <20180513000740.12548-5-klaxa1337@googlemail.com> X-Mailer: git-send-email 2.16.2 In-Reply-To: <20180513000740.12548-1-klaxa1337@googlemail.com> References: <20180510154126.30789-1-klaxa1337@googlemail.com> <20180513000740.12548-1-klaxa1337@googlemail.com> Subject: [FFmpeg-devel] [PATCH 4/5] ffserver: Implement ffserver and add Makefile X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Stephan Holljes MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Signed-off-by: Stephan Holljes --- Makefile | 19 +++ ffserver.c | 520 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 539 insertions(+) create mode 100644 Makefile create mode 100644 ffserver.c diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fbecdeb --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +all: ffserver +LAV_FLAGS = $(shell pkg-config --libs --cflags libavformat libavcodec libavutil) +CFLAGS=-fsanitize=address -fsanitize=undefined +# LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil + +ffserver: segment.o publisher.o lavfhttpd.o ffserver.c + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o ffserver.c + +segment.o: segment.c segment.h + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c segment.c + +publisher.o: publisher.c publisher.h + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c publisher.c + +lavfhttpd.o: lavfhttpd.c httpd.h + cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c lavfhttpd.c + +clean: + rm -f *.o ffserver diff --git a/ffserver.c b/ffserver.c new file mode 100644 index 0000000..39e1c32 --- /dev/null +++ b/ffserver.c @@ -0,0 +1,520 @@ +/* + * Copyright (c) 2018 Stephan Holljes + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file + * multimedia server based on the FFmpeg libraries + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "segment.h" +#include "publisher.h" +#include "httpd.h" + +#define BUFFER_SECS 30 +#define LISTEN_TIMEOUT_MSEC 1000 + +struct ReadInfo { + struct PublisherContext *pub; + AVFormatContext *ifmt_ctx; + char *in_filename; +}; + +struct WriteInfo { + struct PublisherContext *pub; + int thread_id; +}; + +struct AcceptInfo { + struct PublisherContext *pub; + struct HTTPDInterface *httpd; + AVFormatContext *ifmt_ctx; +}; + + +int ffserver_write(void *opaque, unsigned char *buf, int buf_size) +{ + struct FFServerInfo *info = (struct FFServerInfo*) opaque; + return info->httpd->write(info->server, info->client, buf, buf_size); +} + + +void *read_thread(void *arg) +{ + struct ReadInfo *info = (struct ReadInfo*) arg; + AVFormatContext *ifmt_ctx = info->ifmt_ctx; + int ret, i; + int video_idx = -1; + int id = 0; + int64_t pts, now, start; + int64_t *ts; + struct Segment *seg = NULL; + AVPacket pkt; + AVStream *in_stream; + AVRational tb; + tb.num = 1; + tb.den = AV_TIME_BASE; + AVStream *stream; + AVCodecParameters *params; + enum AVMediaType type; + + if ((ret = avformat_find_stream_info(ifmt_ctx, NULL)) < 0) { + av_log(ifmt_ctx, AV_LOG_ERROR, "Could not get input stream info.\n"); + goto end; + } + + av_log(ifmt_ctx, AV_LOG_INFO, "Finding video stream.\n"); + for (i = 0; i < ifmt_ctx->nb_streams; i++) { + av_log(ifmt_ctx, AV_LOG_DEBUG, "Checking stream %d\n", i); + stream = ifmt_ctx->streams[i]; + params = stream->codecpar; + type = params->codec_type; + if (type == AVMEDIA_TYPE_VIDEO) { + video_idx = i; + break; + } + } + if (video_idx == -1) { + av_log(ifmt_ctx, AV_LOG_ERROR, "No video stream found.\n"); + goto end; + } + + + // All information needed to start segmenting the file is gathered now. + // start BUFFER_SECS seconds "in the past" to "catch up" to real-time. Has no effect on streamed sources. + start = av_gettime_relative() - BUFFER_SECS * AV_TIME_BASE; + + // segmenting main-loop + + for (;;) { + ret = av_read_frame(ifmt_ctx, &pkt); + if (ret < 0) + break; + + in_stream = ifmt_ctx->streams[pkt.stream_index]; + if (pkt.pts == AV_NOPTS_VALUE) { + pkt.pts = 0; + } + if (pkt.dts == AV_NOPTS_VALUE) { + pkt.dts = 0; + } + + pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, tb, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, tb, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, tb); + pkt.pos = -1; + + // current pts + pts = pkt.pts; //av_rescale_q(pkt.pts, in_stream->time_base, tb); + + // current stream "uptime" + now = av_gettime_relative() - start; + + // simulate real-time reading + while (pts > now) { + usleep(1000); + now = av_gettime_relative() - start; + } + + // keyframe or first Segment + if ((pkt.flags & AV_PKT_FLAG_KEY && pkt.stream_index == video_idx) || !seg) { + if (seg) { + segment_close(seg); + publisher_push_segment(info->pub, seg); + av_log(NULL, AV_LOG_DEBUG, "New segment pushed.\n"); + publish(info->pub); + av_log(NULL, AV_LOG_DEBUG, "Published new segment.\n"); + } + segment_init(&seg, ifmt_ctx); + seg->id = id++; + av_log(NULL, AV_LOG_DEBUG, "Starting new segment, id: %d\n", seg->id); + } + + ts = av_dynarray2_add((void **)&seg->ts, &seg->ts_len, sizeof(int64_t), + (const void *)&pkt.dts); + if (!ts) { + av_log(seg->fmt_ctx, AV_LOG_ERROR, "could not write dts\n."); + goto end; + } + + ts = av_dynarray2_add((void **)&seg->ts, &seg->ts_len, sizeof(int64_t), + (const void *)&pkt.pts); + if (!ts) { + av_log(seg->fmt_ctx, AV_LOG_ERROR, "could not write pts\n."); + goto end; + } + ret = av_write_frame(seg->fmt_ctx, &pkt); + av_packet_unref(&pkt); + if (ret < 0) { + av_log(seg->fmt_ctx, AV_LOG_ERROR, "av_write_frame() failed.\n"); + goto end; + } + } + + if (ret < 0 && ret != AVERROR_EOF) { + av_log(seg->fmt_ctx, AV_LOG_ERROR, "Error occurred during read: %s\n", av_err2str(ret)); + goto end; + } + + segment_close(seg); + publisher_push_segment(info->pub, seg); + publish(info->pub); + + +end: + avformat_close_input(&ifmt_ctx); + info->pub->shutdown = 1; + return NULL; +} + +void write_segment(struct Client *c) +{ + struct Segment *seg; + int ret; + int pkt_count = 0; + pthread_mutex_lock(&c->buffer_lock); + if (av_fifo_size(c->buffer) > 0) { + AVFormatContext *fmt_ctx; + AVIOContext *avio_ctx; + AVPacket pkt; + struct SegmentReadInfo info; + unsigned char *avio_buffer; + + av_fifo_generic_peek(c->buffer, &seg, sizeof(struct Segment*), NULL); + pthread_mutex_unlock(&c->buffer_lock); + c->current_segment_id = seg->id; + info.buf = seg->buf; + info.left = seg->size; + + if (!(fmt_ctx = avformat_alloc_context())) { + av_log(NULL, AV_LOG_ERROR, "Could not allocate format context\n"); + client_disconnect(c, 0); + return; + } + + avio_buffer = (unsigned char*) av_malloc(AV_BUFSIZE); + avio_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 0, &info, &segment_read, NULL, NULL); + + fmt_ctx->pb = avio_ctx; + ret = avformat_open_input(&fmt_ctx, NULL, seg->ifmt, NULL); + if (ret < 0) { + av_log(avio_ctx, AV_LOG_ERROR, "Could not open input\n"); + av_free(avio_ctx->buffer); + avio_context_free(&avio_ctx); + client_disconnect(c, 0); + return; + } + + ret = avformat_find_stream_info(fmt_ctx, NULL); + if (ret < 0) { + av_log(fmt_ctx, AV_LOG_ERROR, "Could not find stream information\n"); + av_free(avio_ctx->buffer); + avio_context_free(&avio_ctx); + client_disconnect(c, 0); + return; + } + + av_log(fmt_ctx, AV_LOG_DEBUG, "Client: %d, Segment: %d\n", c->id, seg->id); + + for (;;) { + ret = av_read_frame(fmt_ctx, &pkt); + if (ret < 0) + break; + + pkt.dts = seg->ts[pkt_count]; + pkt.pts = seg->ts[pkt_count+1]; + pkt_count += 2; + ret = av_write_frame(c->ofmt_ctx, &pkt); + av_packet_unref(&pkt); + if (ret < 0) { + av_log(fmt_ctx, AV_LOG_ERROR, "write_frame failed, disconnecting client: %d\n", c->id); + avformat_close_input(&fmt_ctx); + av_free(avio_ctx->buffer); + avio_context_free(&avio_ctx); + client_disconnect(c, 0); + return; + } + } + avformat_close_input(&fmt_ctx); + av_free(avio_ctx->buffer); + avformat_free_context(fmt_ctx); + avio_context_free(&avio_ctx); + pthread_mutex_lock(&c->buffer_lock); + av_fifo_drain(c->buffer, sizeof(struct Segment*)); + pthread_mutex_unlock(&c->buffer_lock); + segment_unref(seg); + client_set_state(c, WRITABLE); + } else { + pthread_mutex_unlock(&c->buffer_lock); + client_set_state(c, WAIT); + } +} + +void *accept_thread(void *arg) +{ + struct AcceptInfo *info = (struct AcceptInfo*) arg; + struct FFServerInfo *ffinfo = NULL; + char status[4096]; + struct HTTPClient *client = NULL; + void *server = NULL; + AVIOContext *client_ctx = NULL; + AVFormatContext *ofmt_ctx = NULL; + unsigned char *avio_buffer; + AVOutputFormat *ofmt; + AVDictionary *mkvopts = NULL; + AVStream *in_stream, *out_stream; + int ret, i, reply_code; + struct HTTPDConfig config = { + .bind_address = "0", + .port = 8080, + .accept_timeout = LISTEN_TIMEOUT_MSEC, + }; + + info->httpd->init(&server, config); + + + for (;;) { + if (info->pub->shutdown) + break; + publisher_gen_status_json(info->pub, status); + av_log(server, AV_LOG_INFO, status); + client = NULL; + av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n"); + reply_code = 200; + if (publisher_reserve_client(info->pub)) { + av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); + reply_code = 503; + } + + if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) { + if (ret == HTTPD_LISTEN_TIMEOUT) { + publisher_cancel_reserve(info->pub); + continue; + } else if (ret == HTTPD_CLIENT_ERROR) { + info->httpd->close(server, client); + } + av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n"); + publisher_cancel_reserve(info->pub); + continue; + } + + if (reply_code != 200) { + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + continue; + } + + avio_buffer = av_malloc(AV_BUFSIZE); + ffinfo = av_malloc(sizeof(struct FFServerInfo)); + ffinfo->httpd = info->httpd; + ffinfo->client = client; + ffinfo->server = server; + client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); + if (!client_ctx) { + av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL); + if (!ofmt_ctx) { + av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + avformat_free_context(ofmt_ctx); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) { + av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret)); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + avformat_free_context(ofmt_ctx); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + ofmt_ctx->flags |= AVFMT_FLAG_GENPTS; + ofmt = ofmt_ctx->oformat; + ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF; + + for (i = 0; i < info->ifmt_ctx->nb_streams; i++) { + in_stream = info->ifmt_ctx->streams[i]; + out_stream = avformat_new_stream(ofmt_ctx, NULL); + + if (!out_stream) { + av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n"); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + avformat_free_context(ofmt_ctx); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + + ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); + if (ret < 0) { + av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret)); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + avformat_free_context(ofmt_ctx); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + out_stream->codecpar->codec_tag = 0; + if (out_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { + if (in_stream->sample_aspect_ratio.num) + out_stream->sample_aspect_ratio = in_stream->sample_aspect_ratio; + out_stream->avg_frame_rate = in_stream->avg_frame_rate; + out_stream->r_frame_rate = in_stream->r_frame_rate; + } + av_dict_copy(&out_stream->metadata, in_stream->metadata, 0); + } + av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0); + ofmt_ctx->pb = client_ctx; + ret = avformat_write_header(ofmt_ctx, &mkvopts); + if (ret < 0) { + av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret)); + publisher_cancel_reserve(info->pub); + info->httpd->close(server, client); + avformat_free_context(ofmt_ctx); + av_free(client_ctx->buffer); + avio_context_free(&client_ctx); + av_free(ffinfo); + continue; + } + publisher_add_client(info->pub, ofmt_ctx, ffinfo); + ofmt_ctx = NULL; + + } + av_log(server, AV_LOG_INFO, "Shutting down http server.\n"); + info->httpd->shutdown(server); + av_log(NULL, AV_LOG_INFO, "Shut down http server.\n"); + return NULL; +} + +void *write_thread(void *arg) +{ + struct WriteInfo *info = (struct WriteInfo*) arg; + int i, nb_free; + struct Client *c; + for(;;) { + nb_free = 0; + usleep(500000); + av_log(NULL, AV_LOG_DEBUG, "Checking clients, thread: %d\n", info->thread_id); + for (i = 0; i < MAX_CLIENTS; i++) { + c = &info->pub->clients[i]; + switch(c->state) { + case WRITABLE: + client_set_state(c, BUSY); + write_segment(c); + if (info->pub->shutdown && info->pub->current_segment_id == c->current_segment_id) { + client_disconnect(c, 1); + } + continue; + case FREE: + nb_free++; + default: + continue; + } + } + if (info->pub->shutdown && nb_free == MAX_CLIENTS) + break; + } + + return NULL; +} + + +int main(int argc, char *argv[]) +{ + struct ReadInfo rinfo; + struct AcceptInfo ainfo; + struct WriteInfo *winfos; + struct PublisherContext *pub; + int ret, i; + pthread_t r_thread, a_thread; + pthread_t *w_threads; + + AVFormatContext *ifmt_ctx = NULL; + + rinfo.in_filename = "pipe:0"; + if (argc > 1) + rinfo.in_filename = argv[1]; + + av_log_set_level(AV_LOG_INFO); + + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) { + av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n"); + return 1; + } + + publisher_init(&pub); + + rinfo.ifmt_ctx = ifmt_ctx; + rinfo.pub = pub; + ainfo.ifmt_ctx = ifmt_ctx; + ainfo.pub = pub; + ainfo.httpd = &lavfhttpd; + + w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads); + winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]); + } + + pthread_create(&r_thread, NULL, read_thread, &rinfo); + + accept_thread(&ainfo); + + pthread_join(r_thread, NULL); + + for (i = 0; i < pub->nb_threads; i++) { + pthread_join(w_threads[i], NULL); + } + av_free(w_threads); + av_free(winfos); + + publisher_freep(&pub); + return 0; +}