From patchwork Sun May 20 18:54:02 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stephan Holljes X-Patchwork-Id: 9022 Delivered-To: ffmpegpatchwork@gmail.com Received: by 2002:a02:155:0:0:0:0:0 with SMTP id c82-v6csp7565470jad; Sun, 20 May 2018 11:54:43 -0700 (PDT) X-Google-Smtp-Source: AB8JxZpu72M2n+SNPPpZOsJA3I2Np3zr9b3kFlRJqe/AjWueP0gXPgVUVxeIJT6FEnPdls6A72/n X-Received: by 2002:adf:8f25:: with SMTP id p34-v6mr12912872wrb.193.1526842483899; Sun, 20 May 2018 11:54:43 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1526842483; cv=none; d=google.com; s=arc-20160816; b=vwmss6vrGIkvhwPEDPzz+ZrUIbC4ZrIISt9YiFyv/doUxiwiVhsxgfRqvTtWbRKOLQ 2frr1kXcENlB9Sjx6PhLA2g0g1XVPi8G4vN5gt5HrDtymgI9JEzdrjDccI46iL3d2RH7 4SjlFEVqywCJ7vLe2RFqABJF/SOOZGawKLGZgQKEJ3rXQKf6n0DzKqteeVi17McaNxVJ 7jrGWRlwkO7aLK2+/Z2/cxJHtPMdTnh0irVlYZNTQwa3wJPAdx5B8XfbanpGhoFRatkL KwChq9UigNeXS+I3UBjAQGF6IYYJB9xSlqjJrNVTzRhHi7gLXyXE9euEFsSgLPjbE5r4 iOsw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:mime-version:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:references:in-reply-to:message-id:date :to:from:dkim-signature:delivered-to:arc-authentication-results; bh=54I/Gl/K1MNzY0HDh380Qb9lYbbIX4adyfpUjLgSI7U=; b=vGVkQl85aYe21ims/puZdGYRbfPKhlsaHp2kpTY3FpVBoWMOqIE9fgY2eDLTXGfahX IsPGBHSLazsCkk/mZfIGpX7mufi8gUUpjne0Ff3g8NEP/oM1vZQRYEVCaeVZ3aLMZ7/g wM++4DRT2OZDSZou0InWotzou3dP53y0d1vNtQ3RAY5sd2gtZ+3RTCFKu5Tko6v6DJWl tYv8NE7pD8W1fMr/RsJJc/+dwtKY/Ic8khzxeq8cRSDDoD6Y+yuS4HbFw3DcgXmppYXW o+TFaBl1S2GhmbRP0b3ZmVIij3YGwLIjLuA1+NIWn20y+ee/o7/R0zjyt2lMhDpPjT+S Grkg== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=cAJ1SUeX; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id f35-v6si11007071wrf.83.2018.05.20.11.54.43; Sun, 20 May 2018 11:54:43 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=cAJ1SUeX; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 6A6C168A54C; Sun, 20 May 2018 21:53:45 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wr0-f173.google.com (mail-wr0-f173.google.com [209.85.128.173]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 0107868A545 for ; Sun, 20 May 2018 21:53:38 +0300 (EEST) Received: by mail-wr0-f173.google.com with SMTP id j1-v6so2924537wrm.1 for ; Sun, 20 May 2018 11:54:19 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlemail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=mnoTgC4xUi+CxHGqJv/0jJGf4rjsDtvgy1eZnBcQNCU=; b=cAJ1SUeXqP2kB0Z/s6JTx38Y3YprLoBVOfdIlmtgFJo7g9+O+SgSuBTUTlXIypUp9I UXebS6IFaicXphJfZ9jBcwsbOAvn4mlGVoshi/VzB+pCLJmaW/MzZUUIVzD/Ks4+Lb0w aG22HRzfyMszeio6GNKpbqWkyqMbK24nbRFLTksrl2AHI53XajXXwGqmDYyWm0QWjvtx EDP7FrrZah9hPBklD7/wUjsUibrIZbfHxwJDZdxT7ghH1RBbnEMzvLC3EfblJ/AtWIdg 7A7pnCiQ7JhN5U/wTaq9ulzfzxVpGvmqVUk/j+WHdlGEHnmxHeEBvLI1pmZO+ZmLO6LP dERg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=mnoTgC4xUi+CxHGqJv/0jJGf4rjsDtvgy1eZnBcQNCU=; b=ZKe9RGXsU+IorcKJi/VJmxIaghi3LmPGEl2o+sd8QBzd8Lghh+mfW6YzfRzbRFl5gx cmeWKx/I8afQK9hIFHBqCmsU8vvwfPg+cz0hH/jjRMI00jKDRWbobBd5iU04wyiOakJR PF8GQavhlAumGUzrquefdd1ULkADYzwkNtk8qj9NCTOaBoqyHhgq6OEeh6G3o+HDi4bI LsB0tGU40ti+rTE/vUt8qKXvr+dQgmuM323Me8fLzpTDIWba02TihsyQ8PSssBDaPFpB uX8l1Zz9VqXX95svjYkaY9//zYRNgDu6B6N6I+8Sea8Z6k8ikWl7Rf2bVvA7HooqGZXu JSeQ== X-Gm-Message-State: ALKqPwcSE4Z+6QrUoZEh9eWfvKDL0/DWXiMhDB7/6j5dGJ/hfGHT0NN1 G2mI93b9cMZ+odTx8rq613fIzw== X-Received: by 2002:adf:bb4c:: with SMTP id x12-v6mr10337339wrg.244.1526842458570; Sun, 20 May 2018 11:54:18 -0700 (PDT) Received: from localhost.localdomain ([46.5.2.0]) by smtp.gmail.com with ESMTPSA id u8-v6sm3755918wmc.40.2018.05.20.11.54.17 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Sun, 20 May 2018 11:54:17 -0700 (PDT) From: Stephan Holljes To: ffmpeg-devel@ffmpeg.org Date: Sun, 20 May 2018 20:54:02 +0200 Message-Id: <20180520185404.29836-7-klaxa1337@googlemail.com> X-Mailer: git-send-email 2.16.2 In-Reply-To: <20180520185404.29836-1-klaxa1337@googlemail.com> References: <20180520185404.29836-1-klaxa1337@googlemail.com> Subject: [FFmpeg-devel] [PATCH 6/8] ffserver.c: Add config file reading X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Stephan Holljes MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Signed-off-by: Stephan Holljes --- ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 172 insertions(+), 76 deletions(-) diff --git a/ffserver.c b/ffserver.c index 44fc263..3d842d8 100644 --- a/ffserver.c +++ b/ffserver.c @@ -38,6 +38,7 @@ #include "segment.h" #include "publisher.h" #include "httpd.h" +#include "configreader.h" #define BUFFER_SECS 30 #define LISTEN_TIMEOUT_MSEC 1000 @@ -54,9 +55,11 @@ struct WriteInfo { }; struct AcceptInfo { - struct PublisherContext *pub; + struct PublisherContext **pubs; struct HTTPDInterface *httpd; - AVFormatContext *ifmt_ctx; + AVFormatContext **ifmt_ctxs; + struct HTTPDConfig *config; + int nb_pub; /* number of publishers (streams) equal to number of ifmt_ctx */ }; @@ -287,52 +290,77 @@ void *accept_thread(void *arg) { struct AcceptInfo *info = (struct AcceptInfo*) arg; struct FFServerInfo *ffinfo = NULL; + struct PublisherContext *pub; char status[4096]; + char *stream_name; struct HTTPClient *client = NULL; void *server = NULL; AVIOContext *client_ctx = NULL; AVFormatContext *ofmt_ctx = NULL; + AVFormatContext *ifmt_ctx; unsigned char *avio_buffer; AVOutputFormat *ofmt; AVDictionary *mkvopts = NULL; AVStream *in_stream, *out_stream; int ret, i, reply_code; - struct HTTPDConfig config = { - .bind_address = "0", - .port = 8080, - .accept_timeout = LISTEN_TIMEOUT_MSEC, - }; - - info->httpd->init(&server, config); - - + int shutdown; + struct HTTPDConfig *config = info->config; + + info->httpd->init(&server, *config); + for (;;) { - if (info->pub->shutdown) + shutdown = 1; + for (i = 0; i < config->nb_streams; i++) { + if (info->pubs[i] && !info->pubs[i]->shutdown) + shutdown = 0; + } + if (shutdown) break; - publisher_gen_status_json(info->pub, status); - av_log(server, AV_LOG_INFO, status); + for (i = 0; i < config->nb_streams; i++) { + publisher_gen_status_json(info->pubs[i], status); + av_log(server, AV_LOG_INFO, status); + } client = NULL; av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n"); reply_code = 200; - if (publisher_reserve_client(info->pub)) { - av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); - reply_code = 503; - } - + if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) { if (ret == HTTPD_LISTEN_TIMEOUT) { - publisher_cancel_reserve(info->pub); continue; } else if (ret == HTTPD_CLIENT_ERROR) { info->httpd->close(server, client); } av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n"); - publisher_cancel_reserve(info->pub); continue; } - + + pub = NULL; + ifmt_ctx = NULL; + for (i = 0; i < config->nb_streams; i++) { + stream_name = info->pubs[i]->stream_name; + // skip leading '/' ---v + if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) { + pub = info->pubs[i]; + ifmt_ctx = info->ifmt_ctxs[i]; + break; + } + } + + if (!pub || !ifmt_ctx) { + av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n", + client->resource ? client->resource : "(null)"); + reply_code = 404; + } + + + if (pub && ifmt_ctx && publisher_reserve_client(pub)) { + av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); + reply_code = 503; + } + if (reply_code != 200) { - publisher_cancel_reserve(info->pub); + if (pub && ifmt_ctx) + publisher_cancel_reserve(pub); info->httpd->close(server, client); continue; } @@ -345,7 +373,7 @@ void *accept_thread(void *arg) client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); if (!client_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); av_free(client_ctx->buffer); avio_context_free(&client_ctx); @@ -355,7 +383,7 @@ void *accept_thread(void *arg) avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL); if (!ofmt_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -365,7 +393,7 @@ void *accept_thread(void *arg) } if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) { av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -377,13 +405,13 @@ void *accept_thread(void *arg) ofmt = ofmt_ctx->oformat; ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF; - for (i = 0; i < info->ifmt_ctx->nb_streams; i++) { - in_stream = info->ifmt_ctx->streams[i]; + for (i = 0; i < ifmt_ctx->nb_streams; i++) { + in_stream = ifmt_ctx->streams[i]; out_stream = avformat_new_stream(ofmt_ctx, NULL); if (!out_stream) { av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -395,7 +423,7 @@ void *accept_thread(void *arg) ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -412,12 +440,12 @@ void *accept_thread(void *arg) } av_dict_copy(&out_stream->metadata, in_stream->metadata, 0); } - av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0); + av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0); ofmt_ctx->pb = client_ctx; ret = avformat_write_header(ofmt_ctx, &mkvopts); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -425,7 +453,7 @@ void *accept_thread(void *arg) av_free(ffinfo); continue; } - publisher_add_client(info->pub, ofmt_ctx, ffinfo); + publisher_add_client(pub, ofmt_ctx, ffinfo); ofmt_ctx = NULL; } @@ -467,59 +495,127 @@ void *write_thread(void *arg) return NULL; } - -int main(int argc, char *argv[]) -{ - struct ReadInfo rinfo; +void *run_server(void *arg) { struct AcceptInfo ainfo; - struct WriteInfo *winfos; - struct PublisherContext *pub; - int ret, i; - pthread_t r_thread, a_thread; - pthread_t *w_threads; + struct ReadInfo *rinfos; + struct WriteInfo **winfos_p; + struct HTTPDConfig *config = (struct HTTPDConfig*) arg; + struct PublisherContext **pubs; + AVFormatContext **ifmt_ctxs; + int ret, i, stream_index; + pthread_t *r_threads; + pthread_t **w_threads_p; - AVFormatContext *ifmt_ctx = NULL; - - rinfo.in_filename = "pipe:0"; - if (argc > 1) - rinfo.in_filename = argv[1]; + pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*)); + ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*)); av_log_set_level(AV_LOG_INFO); - if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) { - av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n"); - return 1; - } - - publisher_init(&pub); - - rinfo.ifmt_ctx = ifmt_ctx; - rinfo.pub = pub; - ainfo.ifmt_ctx = ifmt_ctx; - ainfo.pub = pub; + ainfo.pubs = pubs; + ainfo.ifmt_ctxs = ifmt_ctxs; + ainfo.nb_pub = config->nb_streams; ainfo.httpd = &lavfhttpd; + ainfo.config = config; - w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads); - winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo)); + winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*)); + r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t)); + w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*)); - for (i = 0; i < pub->nb_threads; i++) { - winfos[i].pub = pub; - winfos[i].thread_id = i; - pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + struct PublisherContext *pub = NULL; + struct AVFormatContext *ifmt_ctx = NULL; + struct ReadInfo rinfo; + struct WriteInfo *winfos = NULL; + pthread_t *w_threads = NULL; + pthread_t r_thread; + rinfo.input_uri = config->streams[stream_index].input_uri; + + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) { + av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n"); + continue; + } + + ifmt_ctxs[stream_index] = ifmt_ctx; + + publisher_init(&pub, config->streams[stream_index].stream_name); + pubs[stream_index] = pub; + + rinfo.ifmt_ctx = ifmt_ctx; + rinfo.pub = pub; + + rinfos[stream_index] = rinfo; + + w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads); + winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + + w_threads_p[stream_index] = w_threads; + winfos_p[stream_index] = winfos; + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + } + w_threads_p[stream_index] = w_threads; + pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]); + r_threads[stream_index] = r_thread; } - - pthread_create(&r_thread, NULL, read_thread, &rinfo); - + + + //pthread_create(&a_thread, NULL, accept_thread, &ainfo); accept_thread(&ainfo); - - pthread_join(r_thread, NULL); - - for (i = 0; i < pub->nb_threads; i++) { - pthread_join(w_threads[i], NULL); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + pthread_join(r_threads[stream_index], NULL); + if (pubs[stream_index]) { + for (i = 0; i < pubs[stream_index]->nb_threads; i++) { + pthread_join(w_threads_p[stream_index][i], NULL); + } + } + av_free(winfos_p[stream_index]); + av_free(w_threads_p[stream_index]); + // pubs[stream_index] could be null if the file could not be opened + if (pubs[stream_index]) + publisher_free(pubs[stream_index]); } - av_free(w_threads); - av_free(winfos); - - publisher_freep(&pub); + av_free(rinfos); + av_free(winfos_p); + av_free(r_threads); + av_free(w_threads_p); + av_free(pubs); + av_free(ifmt_ctxs); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct HTTPDConfig *configs; + int nb_configs; + pthread_t *server_threads; + int i; + + if (argc < 2) { + printf("Usage: %s config.lua\n", argv[0]); + return 1; + } + + nb_configs = configs_read(&configs, argv[1]); + if (nb_configs <= 0) { + printf("No valid configurations parsed.\n"); + return 1; + } + server_threads = av_malloc(nb_configs * sizeof(pthread_t)); + for (i = 0; i < nb_configs; i++) { + config_dump(configs + i); + pthread_create(&server_threads[i], NULL, run_server, configs + i); + } + + for (i = 0; i < nb_configs; i++) { + pthread_join(server_threads[i], NULL); + config_free(configs + i); + } + av_free(configs); + av_free(server_threads); return 0; }