From patchwork Mon May 28 18:18:57 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stephan Holljes X-Patchwork-Id: 9123 Delivered-To: ffmpegpatchwork@gmail.com Received: by 2002:a02:11c:0:0:0:0:0 with SMTP id c28-v6csp2230610jad; Mon, 28 May 2018 11:27:07 -0700 (PDT) X-Google-Smtp-Source: AB8JxZrvV62gcHz5ZEHU8ErJ6WSF0vO9aymRS2MqBbOUdCXmJvqyfDECX23Ixy+lFenDcYk2FZ40 X-Received: by 2002:a1c:5894:: with SMTP id m142-v6mr9145219wmb.10.1527532027055; Mon, 28 May 2018 11:27:07 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1527532027; cv=none; d=google.com; s=arc-20160816; b=VHu0GFeUPjf9Z8zxf4ZaEi6dKGrogDGVM16CVuoZabwQyEqduNGKKdx9WhoM4xqRva 77AhLYTv9s6JWa2OWY1e3oB77pC97mkCIcxfIhAk1L3y8gZ/O9J+J7u6xM3JY7yaWJVZ K0BBwhO3QXRyns2dvneJyzRuIO7q+1jKhfHMaH9+RnX9nHHZoarYsMraqLfbcYIE0iwr OmjtPFqgZLzMNuuotcjkQfCZSOAj56Gr4ADY4bG2yMm3VCKHTV5L94elFAzoXLFb5Rud 5vXhtJ/CRNfFbe9PZYhkprOGC/eTcGqwa9Xmna+xpMiksbqOoJ4dxKYUjoNRI9xHkj8W PEVA== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:mime-version:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:references:in-reply-to:message-id:date :to:from:dkim-signature:delivered-to:arc-authentication-results; bh=RdxADqRPsJcEPNrokTjomM8morwbKcBlZIDu7Bok4s8=; b=MoKMV2Yp6bQVSJChT8CMfr0ki9x0OM8AK1cvu53YX0FnAqhzijcYLykQ+2Mg8Y93hV /38rU1WBLWE1K8SS6RiuaKbLfhEDju4UdwbWzuKZ63W9BfLSJYzX0XWPSnFvYw5jMTvM HQVYBgPOwnlW8lkPEGgiq9j3B7YFhOWV77YeiwM5R4GEgzHaa40S8HSyGDWaO2zoON6r TEg+1k9Rl6amsMsOvuYbh/ygHLk2kTUN04Q9Hz3eXVSoN/SQWZSWSlhJMHJNl+2tw6CG Z6Yi2JQ8c207+1rSAOC/tvNw+yv4PGHPECY370+9mmVyJKD7k9rG2N2KUGs4iPbYnr+T 2ehg== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=TQoLJBxc; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id i18-v6si10467057wmh.82.2018.05.28.11.27.06; Mon, 28 May 2018 11:27:07 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=TQoLJBxc; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id EF9B868A3B1; Mon, 28 May 2018 21:26:20 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wm0-f66.google.com (mail-wm0-f66.google.com [74.125.82.66]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id A2AA9689EFF for ; Mon, 28 May 2018 21:26:19 +0300 (EEST) Received: by mail-wm0-f66.google.com with SMTP id m129-v6so34111714wmb.3 for ; Mon, 28 May 2018 11:27:03 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlemail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=CZpo1DiZxOZSJMYhqJrFvzEHn8ODQtWTETvqJvmkWZ8=; b=TQoLJBxcKlInFR1r6qvsH3yrgbD/mzvCPZT/0kunIXsKxUYS+wCgsqrcPwfaNRZAIe Gl/eUm51qcxA8GjrpuXz+saY/RyNWqTN3Ss+dQzi+V5fHccffCiaXHtemgSD4scIJg5S /jOF5XPoI2mfuabc9cuHb1p/zANdsnr9nOqWrQ8i0gMoRrsEqwt47h7yu1f2/XJNe3Za A0eciQ6mqR+Uv/nsBT3GQt0qUypZd4Dm9prQIamEODW9rkIlov7WNWf9b7X3FgFzn3SA gFqo71Kgc31F6NSd3GL96eHXve4oav4Eg9y1jpusmB2KEEwQHYzQ5fK6DVUsDWBTb0cu gQbg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=CZpo1DiZxOZSJMYhqJrFvzEHn8ODQtWTETvqJvmkWZ8=; b=aqRwGQVZXvc3XTnK0ni8vgSe5hzl2/tqKTy1vSRXRqSlqygPrP3KuSdX6qYn5QDaow CRtYj6Ln33Peev/iRvWwMTDPkQCshtBZBS7dAvMEKzsUN5EPogPYiUMUnF9WHcKu+P8w +m/OwGaM9mL9S7F5hexCNjpETKNY0hvhes8I7N/DfV1ke7AxktRktQu7j0fx44G5UY5T EL39deJsv+Yhq0L+V7Zf8WBeUAiw/m72D9nT1fkGILMPepRNhrW1d/7xt/7YL+O+w0yX AzuKlerozh+OiqPSGvDsLTxfZ3y79KAXFuDuAAJU8osYfBY9m9uWezrwtFpdFV786wWu yK1Q== X-Gm-Message-State: ALKqPwdiRLxFRfxJMI3A0ShWWdbuXGavkZ67cNaHuc7DZhOlWGrJ+l+D y0i8apXI/i4//mmGtxMNwxMqMg== X-Received: by 2002:a1c:a84d:: with SMTP id r74-v6mr10449132wme.114.1527531572712; Mon, 28 May 2018 11:19:32 -0700 (PDT) Received: from localhost.localdomain ([46.5.2.0]) by smtp.gmail.com with ESMTPSA id p5-v6sm14649465wre.83.2018.05.28.11.19.31 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Mon, 28 May 2018 11:19:32 -0700 (PDT) From: Stephan Holljes To: ffmpeg-devel@ffmpeg.org Date: Mon, 28 May 2018 20:18:57 +0200 Message-Id: <20180528181859.12182-7-klaxa1337@googlemail.com> X-Mailer: git-send-email 2.16.2 In-Reply-To: <20180528181859.12182-1-klaxa1337@googlemail.com> References: <20180520185404.29836-1-klaxa1337@googlemail.com> <20180528181859.12182-1-klaxa1337@googlemail.com> Subject: [FFmpeg-devel] [PATCH 6/8] ffserver.c: Add config file reading X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Stephan Holljes MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Signed-off-by: Stephan Holljes --- ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 172 insertions(+), 76 deletions(-) diff --git a/ffserver.c b/ffserver.c index 402e710..12c257f 100644 --- a/ffserver.c +++ b/ffserver.c @@ -38,6 +38,7 @@ #include "segment.h" #include "publisher.h" #include "httpd.h" +#include "configreader.h" #define BUFFER_SECS 30 #define LISTEN_TIMEOUT_MSEC 1000 @@ -54,9 +55,11 @@ struct WriteInfo { }; struct AcceptInfo { - struct PublisherContext *pub; + struct PublisherContext **pubs; struct HTTPDInterface *httpd; - AVFormatContext *ifmt_ctx; + AVFormatContext **ifmt_ctxs; + struct HTTPDConfig *config; + int nb_pub; /* number of publishers (streams) equal to number of ifmt_ctx */ }; @@ -290,52 +293,77 @@ void *accept_thread(void *arg) { struct AcceptInfo *info = (struct AcceptInfo*) arg; struct FFServerInfo *ffinfo = NULL; + struct PublisherContext *pub; char status[4096]; + char *stream_name; struct HTTPClient *client = NULL; void *server = NULL; AVIOContext *client_ctx = NULL; AVFormatContext *ofmt_ctx = NULL; + AVFormatContext *ifmt_ctx; unsigned char *avio_buffer; AVOutputFormat *ofmt; AVDictionary *mkvopts = NULL; AVStream *in_stream, *out_stream; int ret, i, reply_code; - struct HTTPDConfig config = { - .bind_address = "0", - .port = 8080, - .accept_timeout = LISTEN_TIMEOUT_MSEC, - }; - - info->httpd->init(&server, config); - - + int shutdown; + struct HTTPDConfig *config = info->config; + + info->httpd->init(&server, *config); + for (;;) { - if (info->pub->shutdown) + shutdown = 1; + for (i = 0; i < config->nb_streams; i++) { + if (info->pubs[i] && !info->pubs[i]->shutdown) + shutdown = 0; + } + if (shutdown) break; - publisher_gen_status_json(info->pub, status); - av_log(server, AV_LOG_INFO, status); + for (i = 0; i < config->nb_streams; i++) { + publisher_gen_status_json(info->pubs[i], status); + av_log(server, AV_LOG_INFO, status); + } client = NULL; av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n"); reply_code = 200; - if (publisher_reserve_client(info->pub)) { - av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); - reply_code = 503; - } - + if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) { if (ret == HTTPD_LISTEN_TIMEOUT) { - publisher_cancel_reserve(info->pub); continue; } else if (ret == HTTPD_CLIENT_ERROR) { info->httpd->close(server, client); } av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n"); - publisher_cancel_reserve(info->pub); continue; } - + + pub = NULL; + ifmt_ctx = NULL; + for (i = 0; i < config->nb_streams; i++) { + stream_name = info->pubs[i]->stream_name; + // skip leading '/' ---v + if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) { + pub = info->pubs[i]; + ifmt_ctx = info->ifmt_ctxs[i]; + break; + } + } + + if (!pub || !ifmt_ctx) { + av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n", + client->resource ? client->resource : "(null)"); + reply_code = 404; + } + + + if (pub && ifmt_ctx && publisher_reserve_client(pub)) { + av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); + reply_code = 503; + } + if (reply_code != 200) { - publisher_cancel_reserve(info->pub); + if (pub && ifmt_ctx) + publisher_cancel_reserve(pub); info->httpd->close(server, client); continue; } @@ -348,7 +376,7 @@ void *accept_thread(void *arg) client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); if (!client_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); av_free(client_ctx->buffer); avio_context_free(&client_ctx); @@ -358,7 +386,7 @@ void *accept_thread(void *arg) avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL); if (!ofmt_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -368,7 +396,7 @@ void *accept_thread(void *arg) } if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) { av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -380,13 +408,13 @@ void *accept_thread(void *arg) ofmt = ofmt_ctx->oformat; ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF; - for (i = 0; i < info->ifmt_ctx->nb_streams; i++) { - in_stream = info->ifmt_ctx->streams[i]; + for (i = 0; i < ifmt_ctx->nb_streams; i++) { + in_stream = ifmt_ctx->streams[i]; out_stream = avformat_new_stream(ofmt_ctx, NULL); if (!out_stream) { av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -398,7 +426,7 @@ void *accept_thread(void *arg) ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -415,12 +443,12 @@ void *accept_thread(void *arg) } av_dict_copy(&out_stream->metadata, in_stream->metadata, 0); } - av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0); + av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0); ofmt_ctx->pb = client_ctx; ret = avformat_write_header(ofmt_ctx, &mkvopts); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -428,7 +456,7 @@ void *accept_thread(void *arg) av_free(ffinfo); continue; } - publisher_add_client(info->pub, ofmt_ctx, ffinfo); + publisher_add_client(pub, ofmt_ctx, ffinfo); ofmt_ctx = NULL; } @@ -470,59 +498,127 @@ void *write_thread(void *arg) return NULL; } - -int main(int argc, char *argv[]) -{ - struct ReadInfo rinfo; +void *run_server(void *arg) { struct AcceptInfo ainfo; - struct WriteInfo *winfos; - struct PublisherContext *pub; - int ret, i; - pthread_t r_thread, a_thread; - pthread_t *w_threads; + struct ReadInfo *rinfos; + struct WriteInfo **winfos_p; + struct HTTPDConfig *config = (struct HTTPDConfig*) arg; + struct PublisherContext **pubs; + AVFormatContext **ifmt_ctxs; + int ret, i, stream_index; + pthread_t *r_threads; + pthread_t **w_threads_p; - AVFormatContext *ifmt_ctx = NULL; - - rinfo.in_filename = "pipe:0"; - if (argc > 1) - rinfo.in_filename = argv[1]; + pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*)); + ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*)); av_log_set_level(AV_LOG_INFO); - if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) { - av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n"); - return 1; - } - - publisher_init(&pub); - - rinfo.ifmt_ctx = ifmt_ctx; - rinfo.pub = pub; - ainfo.ifmt_ctx = ifmt_ctx; - ainfo.pub = pub; + ainfo.pubs = pubs; + ainfo.ifmt_ctxs = ifmt_ctxs; + ainfo.nb_pub = config->nb_streams; ainfo.httpd = &lavfhttpd; + ainfo.config = config; - w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads); - winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo)); + winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*)); + r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t)); + w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*)); - for (i = 0; i < pub->nb_threads; i++) { - winfos[i].pub = pub; - winfos[i].thread_id = i; - pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + struct PublisherContext *pub = NULL; + struct AVFormatContext *ifmt_ctx = NULL; + struct ReadInfo rinfo; + struct WriteInfo *winfos = NULL; + pthread_t *w_threads = NULL; + pthread_t r_thread; + rinfo.input_uri = config->streams[stream_index].input_uri; + + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) { + av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n"); + continue; + } + + ifmt_ctxs[stream_index] = ifmt_ctx; + + publisher_init(&pub, config->streams[stream_index].stream_name); + pubs[stream_index] = pub; + + rinfo.ifmt_ctx = ifmt_ctx; + rinfo.pub = pub; + + rinfos[stream_index] = rinfo; + + w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads); + winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + + w_threads_p[stream_index] = w_threads; + winfos_p[stream_index] = winfos; + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + } + w_threads_p[stream_index] = w_threads; + pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]); + r_threads[stream_index] = r_thread; } - - pthread_create(&r_thread, NULL, read_thread, &rinfo); - + + + //pthread_create(&a_thread, NULL, accept_thread, &ainfo); accept_thread(&ainfo); - - pthread_join(r_thread, NULL); - - for (i = 0; i < pub->nb_threads; i++) { - pthread_join(w_threads[i], NULL); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + pthread_join(r_threads[stream_index], NULL); + if (pubs[stream_index]) { + for (i = 0; i < pubs[stream_index]->nb_threads; i++) { + pthread_join(w_threads_p[stream_index][i], NULL); + } + } + av_free(winfos_p[stream_index]); + av_free(w_threads_p[stream_index]); + // pubs[stream_index] could be null if the file could not be opened + if (pubs[stream_index]) + publisher_free(pubs[stream_index]); } - av_free(w_threads); - av_free(winfos); - - publisher_freep(&pub); + av_free(rinfos); + av_free(winfos_p); + av_free(r_threads); + av_free(w_threads_p); + av_free(pubs); + av_free(ifmt_ctxs); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct HTTPDConfig *configs; + int nb_configs; + pthread_t *server_threads; + int i; + + if (argc < 2) { + printf("Usage: %s config.lua\n", argv[0]); + return 1; + } + + nb_configs = configs_read(&configs, argv[1]); + if (nb_configs <= 0) { + printf("No valid configurations parsed.\n"); + return 1; + } + server_threads = av_malloc(nb_configs * sizeof(pthread_t)); + for (i = 0; i < nb_configs; i++) { + config_dump(configs + i); + pthread_create(&server_threads[i], NULL, run_server, configs + i); + } + + for (i = 0; i < nb_configs; i++) { + pthread_join(server_threads[i], NULL); + config_free(configs + i); + } + av_free(configs); + av_free(server_threads); return 0; }