From patchwork Thu May 31 22:20:30 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stephan Holljes X-Patchwork-Id: 9213 Delivered-To: ffmpegpatchwork@gmail.com Received: by 2002:a02:11c:0:0:0:0:0 with SMTP id c28-v6csp164035jad; Thu, 31 May 2018 15:28:35 -0700 (PDT) X-Google-Smtp-Source: ADUXVKLN+eU1P+msmsjNjR09cyok+oY75Ed3HqEXQ79QU+JVH8PPcJYNaF8zwqRIPn/Wskq2h47Q X-Received: by 2002:adf:9561:: with SMTP id 88-v6mr2528069wrs.223.1527805715218; Thu, 31 May 2018 15:28:35 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1527805715; cv=none; d=google.com; s=arc-20160816; b=Nxtj88fb9DDYJBktPXLh75mVe56JDr/rdnffTuVC3IY9rS5MF1tJoA+sd7p1mzTuN0 lvVuLiy4XF4U7jo6nflEkbFFaY50O3FWVy2ysz0RiCNRIORQSMpOt/IDeZ1rHQE1nb/P EkBFSsVm1W3SIGu+k8mHGxi9XVRYKumnootQS0AsCyLbY2e7TmmJTbQ+MQBzoNablm2x 8dFsS/bm51hz7fid4A6V6OQbVqBTgRabS6/TquS5phwbx0a61nJZnKeF/nlkFohbAdPt Na3AR6e6KvM9De1SqbggNFVovzX7sW7LMVlFOWD2H9SDe27Vu8kQhFC/14SBUPGPBwc0 ujTg== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:mime-version:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:references:in-reply-to:message-id:date :to:from:dkim-signature:delivered-to:arc-authentication-results; bh=4G0lTWyDSzmEIHhaAfZqS+ijCGp8wcaXX0llJYWilwY=; b=T3eao7NVEmssctHI1pTbOO6FJm6ls8QnWr1Voho7aH8YNrTeW4EubOIts6syMJzSCE stpjzlTldASH10pUpimNdGzyOgMyX4qvXUT4b5kvDtUyrWEzM05m5j1onLgh46iBUOps UXHveztuR4JYaE5xNqJfCQocb1EnIJJ6sHgs1PY+wGvQQfXb57osHWcJdRQnKkEFFnRR RTl3FuN0vq2FKt/M1geo40cfP5/uVe6tFgGmS0ypK6BDbKQGsl8VrcIOXz0/EbWCqjMl LcB257n9S/mMMVG+v5xrLoz/BNXg8QDir8Kp+gSoJ6OP5JhEoXNeUrqcBPQhZ5TtMpqF H2Gg== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=AjKKfiSK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id m1-v6si16135323wrp.99.2018.05.31.15.28.34; Thu, 31 May 2018 15:28:35 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=AjKKfiSK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id AA4E768A571; Fri, 1 Jun 2018 01:27:48 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wr0-f171.google.com (mail-wr0-f171.google.com [209.85.128.171]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 5CE2E68A3F5 for ; Fri, 1 Jun 2018 01:27:47 +0300 (EEST) Received: by mail-wr0-f171.google.com with SMTP id w7-v6so22517336wrn.6 for ; Thu, 31 May 2018 15:28:32 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlemail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=xCBztddj+qA3Z9kuE2PPAm9pLbDg1ko/UOt6q87dUAs=; b=AjKKfiSKnv2IuKg+Tf9dStTLMJmpCVxXHkKAV4WbrHDzBBf02suUHx62kmL/MhgRp6 9EhDKA/vaHQ0mYAHdfNUxCHQEZHxDN363ASqo7a+CxEwhHS8a+yZTzrWN24v8NeiVSDo sSnrRytwF3kj6szr3MNBat3bRBbFU7byo7c6ypcwXmkJfNtbMR01QXIQWpRyoaNw57D5 AcuWbPy0OgM53xV5y95lVNrzRNdIebHtJFfl/Ll3PZNAXHHp8wDxgMCD6VGtDqWgZ+iT 241zKogxh3xckL1WbHiKfG69UdntzWGCe7T174IvnJpddvrZUMTrzFrvHXGyAYW+i6rM bRFg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=xCBztddj+qA3Z9kuE2PPAm9pLbDg1ko/UOt6q87dUAs=; b=s5SR0wU14eZKg7ybM5G8z2rznPSrCU7L4qjOKf7PLufqs3AgHKXOz2xhzSGY0rJKGo zZNO5RdfFF7mM7HuBohh6Fx7WsmfhMLND1AXr20x+bdkEhsHWGczvqwBY1qJx1aFFMbW 7IMAo+MzMcvQXbNuEiwB+rZytzaWoAtBU40ykAAHBuaDgolOCp+2bZIYaiPWEOStjqju 6wpQWLhguCCcL6jutRHVZIx2s0bNdrADTzq53CG5VsfEGZUlpNO2gXSjlMbbpbH8vqQd w2O5KMFdrGvjrtmSKFf9EKj6ISK7mRf4tQa8QAPI4lsus4D+NDPXvPNV5CKKEcVOvw+O /9BA== X-Gm-Message-State: ALKqPwcbF1kUGIjhD7DRFs+Ed7cRSsCHYklDGcZyhyglTj4D+t5RXCmx fDPaif7e3ZVOJYB55ADHEZdrJw== X-Received: by 2002:adf:d192:: with SMTP id h18-v6mr6488374wri.198.1527805249872; Thu, 31 May 2018 15:20:49 -0700 (PDT) Received: from localhost.localdomain ([46.5.2.0]) by smtp.gmail.com with ESMTPSA id 72-v6sm38288429wrb.22.2018.05.31.15.20.48 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 31 May 2018 15:20:49 -0700 (PDT) From: Stephan Holljes To: ffmpeg-devel@ffmpeg.org Date: Fri, 1 Jun 2018 00:20:30 +0200 Message-Id: <20180531222032.30111-7-klaxa1337@googlemail.com> X-Mailer: git-send-email 2.16.2 In-Reply-To: <20180531222032.30111-1-klaxa1337@googlemail.com> References: <20180528181859.12182-1-klaxa1337@googlemail.com> <20180531222032.30111-1-klaxa1337@googlemail.com> Subject: [FFmpeg-devel] [PATCH 6/8] ffserver.c: Add config file reading X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Stephan Holljes MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Signed-off-by: Stephan Holljes --- ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 172 insertions(+), 76 deletions(-) diff --git a/ffserver.c b/ffserver.c index b80a7f8..1363cdc 100644 --- a/ffserver.c +++ b/ffserver.c @@ -38,6 +38,7 @@ #include "segment.h" #include "publisher.h" #include "httpd.h" +#include "configreader.h" #define BUFFER_SECS 30 #define LISTEN_TIMEOUT_MSEC 1000 @@ -54,9 +55,11 @@ struct WriteInfo { }; struct AcceptInfo { - struct PublisherContext *pub; + struct PublisherContext **pubs; struct HTTPDInterface *httpd; - AVFormatContext *ifmt_ctx; + AVFormatContext **ifmt_ctxs; + struct HTTPDConfig *config; + int nb_pub; /* number of publishers (streams) equal to number of ifmt_ctx */ }; @@ -286,52 +289,77 @@ void *accept_thread(void *arg) { struct AcceptInfo *info = (struct AcceptInfo*) arg; struct FFServerInfo *ffinfo = NULL; + struct PublisherContext *pub; char status[4096]; + char *stream_name; struct HTTPClient *client = NULL; void *server = NULL; AVIOContext *client_ctx = NULL; AVFormatContext *ofmt_ctx = NULL; + AVFormatContext *ifmt_ctx; unsigned char *avio_buffer; AVOutputFormat *ofmt; AVDictionary *mkvopts = NULL; AVStream *in_stream, *out_stream; int ret, i, reply_code; - struct HTTPDConfig config = { - .bind_address = "0", - .port = 8080, - .accept_timeout = LISTEN_TIMEOUT_MSEC, - }; - - info->httpd->init(&server, config); - - + int shutdown; + struct HTTPDConfig *config = info->config; + + info->httpd->init(&server, *config); + for (;;) { - if (info->pub->shutdown) + shutdown = 1; + for (i = 0; i < config->nb_streams; i++) { + if (info->pubs[i] && !info->pubs[i]->shutdown) + shutdown = 0; + } + if (shutdown) break; - publisher_gen_status_json(info->pub, status); - av_log(server, AV_LOG_INFO, status); + for (i = 0; i < config->nb_streams; i++) { + publisher_gen_status_json(info->pubs[i], status); + av_log(server, AV_LOG_INFO, status); + } client = NULL; av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n"); reply_code = 200; - if (publisher_reserve_client(info->pub)) { - av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); - reply_code = 503; - } - + if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) { if (ret == HTTPD_LISTEN_TIMEOUT) { - publisher_cancel_reserve(info->pub); continue; } else if (ret == HTTPD_CLIENT_ERROR) { info->httpd->close(server, client); } av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n"); - publisher_cancel_reserve(info->pub); continue; } - + + pub = NULL; + ifmt_ctx = NULL; + for (i = 0; i < config->nb_streams; i++) { + stream_name = info->pubs[i]->stream_name; + // skip leading '/' ---v + if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) { + pub = info->pubs[i]; + ifmt_ctx = info->ifmt_ctxs[i]; + break; + } + } + + if (!pub || !ifmt_ctx) { + av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n", + client->resource ? client->resource : "(null)"); + reply_code = 404; + } + + + if (pub && ifmt_ctx && publisher_reserve_client(pub)) { + av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); + reply_code = 503; + } + if (reply_code != 200) { - publisher_cancel_reserve(info->pub); + if (pub && ifmt_ctx) + publisher_cancel_reserve(pub); info->httpd->close(server, client); continue; } @@ -344,7 +372,7 @@ void *accept_thread(void *arg) client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); if (!client_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); av_free(client_ctx->buffer); avio_context_free(&client_ctx); @@ -354,7 +382,7 @@ void *accept_thread(void *arg) avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL); if (!ofmt_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -364,7 +392,7 @@ void *accept_thread(void *arg) } if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) { av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -376,13 +404,13 @@ void *accept_thread(void *arg) ofmt = ofmt_ctx->oformat; ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF; - for (i = 0; i < info->ifmt_ctx->nb_streams; i++) { - in_stream = info->ifmt_ctx->streams[i]; + for (i = 0; i < ifmt_ctx->nb_streams; i++) { + in_stream = ifmt_ctx->streams[i]; out_stream = avformat_new_stream(ofmt_ctx, NULL); if (!out_stream) { av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -394,7 +422,7 @@ void *accept_thread(void *arg) ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -411,12 +439,12 @@ void *accept_thread(void *arg) } av_dict_copy(&out_stream->metadata, in_stream->metadata, 0); } - av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0); + av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0); ofmt_ctx->pb = client_ctx; ret = avformat_write_header(ofmt_ctx, &mkvopts); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -424,7 +452,7 @@ void *accept_thread(void *arg) av_free(ffinfo); continue; } - publisher_add_client(info->pub, ofmt_ctx, ffinfo); + publisher_add_client(pub, ofmt_ctx, ffinfo); ofmt_ctx = NULL; } @@ -466,59 +494,127 @@ void *write_thread(void *arg) return NULL; } - -int main(int argc, char *argv[]) -{ - struct ReadInfo rinfo; +void *run_server(void *arg) { struct AcceptInfo ainfo; - struct WriteInfo *winfos; - struct PublisherContext *pub; - int ret, i; - pthread_t r_thread, a_thread; - pthread_t *w_threads; + struct ReadInfo *rinfos; + struct WriteInfo **winfos_p; + struct HTTPDConfig *config = (struct HTTPDConfig*) arg; + struct PublisherContext **pubs; + AVFormatContext **ifmt_ctxs; + int ret, i, stream_index; + pthread_t *r_threads; + pthread_t **w_threads_p; - AVFormatContext *ifmt_ctx = NULL; - - rinfo.in_filename = "pipe:0"; - if (argc > 1) - rinfo.in_filename = argv[1]; + pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*)); + ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*)); av_log_set_level(AV_LOG_INFO); - if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) { - av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n"); - return 1; - } - - publisher_init(&pub); - - rinfo.ifmt_ctx = ifmt_ctx; - rinfo.pub = pub; - ainfo.ifmt_ctx = ifmt_ctx; - ainfo.pub = pub; + ainfo.pubs = pubs; + ainfo.ifmt_ctxs = ifmt_ctxs; + ainfo.nb_pub = config->nb_streams; ainfo.httpd = &lavfhttpd; + ainfo.config = config; - w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads); - winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo)); + winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*)); + r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t)); + w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*)); - for (i = 0; i < pub->nb_threads; i++) { - winfos[i].pub = pub; - winfos[i].thread_id = i; - pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + struct PublisherContext *pub = NULL; + struct AVFormatContext *ifmt_ctx = NULL; + struct ReadInfo rinfo; + struct WriteInfo *winfos = NULL; + pthread_t *w_threads = NULL; + pthread_t r_thread; + rinfo.input_uri = config->streams[stream_index].input_uri; + + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) { + av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n"); + continue; + } + + ifmt_ctxs[stream_index] = ifmt_ctx; + + publisher_init(&pub, config->streams[stream_index].stream_name); + pubs[stream_index] = pub; + + rinfo.ifmt_ctx = ifmt_ctx; + rinfo.pub = pub; + + rinfos[stream_index] = rinfo; + + w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads); + winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + + w_threads_p[stream_index] = w_threads; + winfos_p[stream_index] = winfos; + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + } + w_threads_p[stream_index] = w_threads; + pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]); + r_threads[stream_index] = r_thread; } - - pthread_create(&r_thread, NULL, read_thread, &rinfo); - + + + //pthread_create(&a_thread, NULL, accept_thread, &ainfo); accept_thread(&ainfo); - - pthread_join(r_thread, NULL); - - for (i = 0; i < pub->nb_threads; i++) { - pthread_join(w_threads[i], NULL); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + pthread_join(r_threads[stream_index], NULL); + if (pubs[stream_index]) { + for (i = 0; i < pubs[stream_index]->nb_threads; i++) { + pthread_join(w_threads_p[stream_index][i], NULL); + } + } + av_free(winfos_p[stream_index]); + av_free(w_threads_p[stream_index]); + // pubs[stream_index] could be null if the file could not be opened + if (pubs[stream_index]) + publisher_free(pubs[stream_index]); } - av_free(w_threads); - av_free(winfos); - - publisher_freep(&pub); + av_free(rinfos); + av_free(winfos_p); + av_free(r_threads); + av_free(w_threads_p); + av_free(pubs); + av_free(ifmt_ctxs); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct HTTPDConfig *configs; + int nb_configs; + pthread_t *server_threads; + int i; + + if (argc < 2) { + printf("Usage: %s config.lua\n", argv[0]); + return 1; + } + + nb_configs = configs_read(&configs, argv[1]); + if (nb_configs <= 0) { + printf("No valid configurations parsed.\n"); + return 1; + } + server_threads = av_malloc(nb_configs * sizeof(pthread_t)); + for (i = 0; i < nb_configs; i++) { + config_dump(configs + i); + pthread_create(&server_threads[i], NULL, run_server, configs + i); + } + + for (i = 0; i < nb_configs; i++) { + pthread_join(server_threads[i], NULL); + config_free(configs + i); + } + av_free(configs); + av_free(server_threads); return 0; }