diff mbox

[FFmpeg-devel,14/17] Add fileserver and add it to Makefile

Message ID 20180628005117.18902-15-klaxa1337@googlemail.com
State New
Headers show

Commit Message

Stephan Holljes June 28, 2018, 12:51 a.m. UTC
Signed-off-by: Stephan Holljes <klaxa1337@googlemail.com>
---
 Makefile     |   7 ++-
 ffserver.c   | 150 +++++++++++++++++++++++++++++++++++++++------------
 fileserver.c |  97 +++++++++++++++++++++++++++++++++
 fileserver.h |  63 ++++++++++++++++++++++
 4 files changed, 282 insertions(+), 35 deletions(-)
 create mode 100644 fileserver.c
 create mode 100644 fileserver.h
diff mbox

Patch

diff --git a/Makefile b/Makefile
index 83bc4e0..18f3ac3 100644
--- a/Makefile
+++ b/Makefile
@@ -4,8 +4,8 @@  LUA_FLAGS = $(shell pkg-config --libs --cflags lua5.3)
 CFLAGS=-fsanitize=address -fsanitize=undefined
 # LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil
 
-ffserver: segment.o publisher.o lavfhttpd.o configreader.o ffserver.c
-	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o configreader.o ffserver.c
+ffserver: segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c
+	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c
 
 segment.o: segment.c segment.h
 	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c segment.c
@@ -13,6 +13,9 @@  segment.o: segment.c segment.h
 publisher.o: publisher.c publisher.h
 	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c publisher.c
 
+fileserver.o: fileserver.c fileserver.h
+	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c fileserver.c
+
 lavfhttpd.o: lavfhttpd.c httpd.h
 	cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c lavfhttpd.c
 
diff --git a/ffserver.c b/ffserver.c
index 4f42f74..91ad29a 100644
--- a/ffserver.c
+++ b/ffserver.c
@@ -37,9 +37,11 @@ 
 #include <libavutil/opt.h>
 #include <libavformat/avformat.h>
 #include <libavcodec/avcodec.h>
+#include <libavutil/file.h>
 
 #include "segment.h"
 #include "publisher.h"
+#include "fileserver.h"
 #include "httpd.h"
 #include "configreader.h"
 
@@ -64,6 +66,7 @@  struct AcceptInfo {
     struct HTTPDInterface *httpd;
     AVFormatContext **ifmt_ctxs;
     struct HTTPDConfig *config;
+    struct FileserverContext *fs;
     int nb_pub; /** number of publishers (streams) equal to number of ifmt_ctx */
 };
 
@@ -448,8 +451,8 @@  void *accept_thread(void *arg)
     struct AcceptInfo *info = (struct AcceptInfo*) arg;
     struct FFServerInfo *ffinfo = NULL;
     struct PublisherContext *pub;
-    char status[4096];
-    char *stream_name;
+    char status[4096], requested_file[1024], sanitized_file[1024];
+    char *stream_name, *resource;
     struct HTTPClient *client = NULL;
     void *server = NULL;
     AVIOContext *client_ctx = NULL;
@@ -493,6 +496,8 @@  void *accept_thread(void *arg)
 
         pub = NULL;
         ifmt_ctx = NULL;
+        resource = client->resource;
+        snprintf(requested_file, 1024, "%s", resource);
         for (i = 0; i < config->nb_streams; i++) {
             stream_name = info->pubs[i]->stream_name;
             //  skip leading '/'  ---v
@@ -515,7 +520,8 @@  void *accept_thread(void *arg)
             }
         }
 
-        if (!pub || !ifmt_ctx) {
+
+        if ((!pub || !ifmt_ctx) && !requested_file[0]) {
             av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n",
                                                         client->resource ? client->resource : "(null)");
             reply_code = 404;
@@ -534,13 +540,6 @@  void *accept_thread(void *arg)
             continue;
         }
 
-        avio_buffer = av_malloc(AV_BUFSIZE);
-        if (!avio_buffer) {
-            av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n");
-            publisher_cancel_reserve(pub);
-            info->httpd->close(server, client);
-            continue;
-        }
         ffinfo = av_malloc(sizeof(*ffinfo));
         if (!ffinfo) {
             av_log(client_ctx, AV_LOG_ERROR, "Could not allocate FFServerInfo struct.\n");
@@ -551,6 +550,30 @@  void *accept_thread(void *arg)
         ffinfo->httpd = info->httpd;
         ffinfo->client = client;
         ffinfo->server = server;
+
+
+        // try to serve file
+        if (requested_file[0]) {
+            snprintf(sanitized_file, 1024, "%s", requested_file);
+            resource = requested_file;
+            while(resource && *resource == '/') {
+                resource++;
+            }
+
+            snprintf(sanitized_file, 1024, "%s", resource);
+            fileserver_schedule(info->fs, ffinfo, sanitized_file);
+            continue;
+        }
+
+        avio_buffer = av_malloc(AV_BUFSIZE);
+        if (!avio_buffer) {
+            av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n");
+            publisher_cancel_reserve(pub);
+            info->httpd->close(server, client);
+            continue;
+        }
+
+
         client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL);
         if (!client_ctx) {
             av_log(NULL, AV_LOG_ERROR, "Could not allocate output format context.\n");
@@ -676,16 +699,52 @@  void *write_thread(void *arg)
     return NULL;
 }
 
+void *fileserver_thread(void *arg)
+{
+    struct FileserverContext *fs = (struct FileserverContext*) arg;
+    int i, clients_served;
+    struct FileserverClient *c;
+    for (;;) {
+        usleep(500000);
+        av_log(NULL, AV_LOG_WARNING, "Checking clients, fileserver-thread %s: ", fs->server_name);
+        clients_served = 0;
+        for (i = 0; i < MAX_CLIENTS; i++) {
+            c = &fs->clients[i];
+            pthread_mutex_lock(&c->client_lock);
+            if (c->buf) {
+                c->ffinfo->httpd->write(c->ffinfo->server, c->ffinfo->client, c->buf, c->size);
+                av_file_unmap(c->buf, c->size);
+                c->ffinfo->httpd->close(c->ffinfo->server, c->ffinfo->client);
+                c->buf = NULL;
+                c->size = 0;
+                av_freep(&c->ffinfo);
+                clients_served++;
+            }
+            pthread_mutex_unlock(&c->client_lock);
+        }
+        av_log(NULL, AV_LOG_WARNING, "%d/%d served\n", clients_served, MAX_CLIENTS);
+        if (fs->shutdown) {
+            printf("shutting down\n");
+            break;
+        }
+    }
+
+    return NULL;
+}
+
 void *run_server(void *arg) {
     struct AcceptInfo ainfo;
     struct ReadInfo *rinfos;
     struct WriteInfo **winfos_p;
     struct HTTPDConfig *config = (struct HTTPDConfig*) arg;
     struct PublisherContext **pubs;
+    struct FileserverContext *fs = NULL;
     AVFormatContext **ifmt_ctxs;
     int ret, i, stream_index;
+    int stream_formats[FMT_NB] = { 0 };
     pthread_t *r_threads;
     pthread_t **w_threads_p;
+    pthread_t fs_thread = 0;
 
     pubs = av_mallocz_array(config->nb_streams, sizeof(struct PublisherContext*));
     if (!pubs) {
@@ -718,6 +777,7 @@  void *run_server(void *arg) {
     av_log_set_level(AV_LOG_INFO);
 
     ainfo.pubs = pubs;
+    ainfo.fs = fs;
     ainfo.ifmt_ctxs = ifmt_ctxs;
     ainfo.nb_pub = config->nb_streams;
     ainfo.httpd = &lavfhttpd;
@@ -751,56 +811,75 @@  void *run_server(void *arg) {
         struct WriteInfo *winfos = NULL;
         pthread_t *w_threads = NULL;
         pthread_t r_thread;
+        int stream_formats[FMT_NB] = { 0 };
         rinfo.input_uri = config->streams[stream_index].input_uri;
         rinfo.server_name = config->server_name;
 
+        for (i = 0; i < config->streams[stream_index].nb_formats; i++)
+            stream_formats[config->streams[stream_index].formats[i]] = 1;
+
         if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) {
             av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n");
             continue;
         }
 
+
         ifmt_ctxs[stream_index] = ifmt_ctx;
+        if (stream_formats[FMT_MATROSKA])
+            publisher_init(&pub, config->streams[stream_index].stream_name);
 
-        publisher_init(&pub, config->streams[stream_index].stream_name);
         pubs[stream_index] = pub;
 
         rinfo.ifmt_ctx = ifmt_ctx;
         rinfo.pub = pub;
 
         rinfos[stream_index] = rinfo;
-
-        w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t));
-        if (!w_threads) {
-            av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n");
-            continue;
-        }
-        winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo));
-        if (!winfos) {
-            av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n");
-            continue;
-        }
-        w_threads_p[stream_index] = w_threads;
-        winfos_p[stream_index] = winfos;
-
-        for (i = 0; i < pub->nb_threads; i++) {
-            winfos[i].pub = pub;
-            winfos[i].thread_id = i;
-            ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]);
+        if (stream_formats[FMT_MATROSKA]) {
+            w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t));
+            if (!w_threads) {
+                av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n");
+                continue;
+            }
+            winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo));
+            if (!winfos) {
+                av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n");
+                continue;
+            }
+            w_threads_p[stream_index] = w_threads;
+            winfos_p[stream_index] = winfos;
+
+            for (i = 0; i < pub->nb_threads; i++) {
+                winfos[i].pub = pub;
+                winfos[i].thread_id = i;
+                ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]);
+                if (ret != 0) {
+                    pub->shutdown = 1;
+                    w_threads[i] = 0;
+                    goto end;
+                }
+            }
+            w_threads_p[stream_index] = w_threads;
+            ret = pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]);
             if (ret != 0) {
                 pub->shutdown = 1;
+                r_thread = 0;
                 goto end;
             }
+            r_threads[stream_index] = r_thread;
         }
-        w_threads_p[stream_index] = w_threads;
-        ret = pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]);
+
+    }
+    if (stream_formats[FMT_HLS] || stream_formats[FMT_DASH]) {
+        ret = pthread_create(&fs_thread, NULL, fileserver_thread, fs);
         if (ret != 0) {
-            pub->shutdown = 1;
+            fs->shutdown = 1;
+            fs_thread = 0;
             goto end;
         }
-        r_threads[stream_index] = r_thread;
     }
 
 
+
     //pthread_create(&a_thread, NULL, accept_thread, &ainfo);
     accept_thread(&ainfo);
 
@@ -823,6 +902,11 @@  end:
             publisher_free(pubs[stream_index]);
     }
 
+    if (fs_thread) {
+        fs->shutdown = 1;
+        pthread_join(fs_thread, NULL);
+        fileserver_free(fs);
+    }
 error_cleanup:
     av_free(rinfos);
     av_free(winfos_p);
diff --git a/fileserver.c b/fileserver.c
new file mode 100644
index 0000000..a3460e3
--- /dev/null
+++ b/fileserver.c
@@ -0,0 +1,97 @@ 
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "fileserver.h"
+#include "httpd.h"
+
+#include <pthread.h>
+
+#include <libavutil/log.h>
+#include <libavutil/file.h>
+
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, const char *filename)
+{
+    int i, ret;
+    char final_filename[1024];
+    struct FileserverClient *fsc = NULL;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        pthread_mutex_lock(&fs->clients[i].client_lock);
+        if (!fs->clients[i].ffinfo) {
+            fsc = &fs->clients[i];
+            break;
+        }
+        pthread_mutex_unlock(&fs->clients[i].client_lock);
+    }
+    // fsc still locked
+    if (fsc) {
+        fsc->ffinfo = ffinfo;
+        snprintf(final_filename, 1024, "%s/%s", fs->server_name, filename);
+        ret = av_file_map(final_filename, &fsc->buf, &fsc->size, 0, NULL);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR, "Could not read file: %s: %s.\n", final_filename, av_err2str(ret));
+            fsc->ffinfo->httpd->close(fsc->ffinfo->server, fsc->ffinfo->client);
+            av_freep(&fsc->ffinfo);
+            fsc->buf = NULL;
+            fsc->size = 0;
+        }
+        fsc->ofmt_ctx = ofmt_ctx;
+        pthread_mutex_unlock(&fsc->client_lock);
+    } else {
+        av_log(NULL, AV_LOG_WARNING, "Could not find free client slot.\n");
+        ffinfo->httpd->close(ffinfo->server, ffinfo->client);
+        av_free(ffinfo);
+    }
+    av_log(NULL, AV_LOG_INFO, "Scheduled serving file: %s\n", final_filename);
+    return;
+}
+
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name)
+{
+    struct FileserverContext *fs = av_mallocz(sizeof(struct FileserverContext));
+    int i;
+    *fs_p = NULL;
+
+    if (!fs) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate fileserver context.\n");
+        return;
+    }
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        fs->clients[i].ffinfo = NULL;
+        fs->clients[i].buf = NULL;
+        fs->clients[i].size = 0;
+        pthread_mutex_init(&fs->clients[i].client_lock, NULL);
+    }
+
+    fs->nb_threads = 1;
+    fs->server_name = server_name;
+    fs->shutdown = 0;
+    *fs_p = fs;
+    return;
+}
+
+void fileserver_free(struct FileserverContext *fs) {
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        pthread_mutex_lock(&fs->clients[i].client_lock);
+        av_free(fs->clients[i].ffinfo);
+        av_free(fs->clients[i].buf);
+        pthread_mutex_unlock(&fs->clients[i].client_lock);
+    }
+    av_free(fs);
+}
diff --git a/fileserver.h b/fileserver.h
new file mode 100644
index 0000000..6e449c0
--- /dev/null
+++ b/fileserver.h
@@ -0,0 +1,63 @@ 
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FILESERVER_H
+#define FILESERVER_H
+
+#define MAX_CLIENTS 16
+
+#include "httpd.h"
+#include <sys/types.h>
+
+struct FileserverClient {
+    struct FFServerInfo *ffinfo;
+    pthread_mutex_t client_lock;
+    unsigned char *buf;
+    size_t size;
+};
+
+
+struct FileserverContext {
+    struct FileserverClient clients[MAX_CLIENTS];
+    int nb_threads;
+    int shutdown;
+    const char *server_name;
+};
+
+/**
+ * Schedule file for sending to a client. It is added to a list of clients.
+ * @param fs pointer to the FileserverContext to schedule to
+ * @param ffinfo pointer to an FFServerInfo struct containing information for serving the file
+ * @param filename filename of the file to serve
+ */
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, AVIOContext *ofmt_ctx, const char *filename);
+
+/**
+ * Initialize a Fileservercontext
+ * @param fs_p pointer to a pointer where the FileserverContext will be allocated
+ * @param server_name the name of the server
+ */
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name);
+
+/**
+ * Free a Fileservercontext
+ */
+void fileserver_free(struct FileserverContext *fs);
+
+
+#endif