@@ -4,8 +4,8 @@ LUA_FLAGS = $(shell pkg-config --libs --cflags lua5.3)
CFLAGS=-fsanitize=address -fsanitize=undefined
# LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil
-ffserver: segment.o publisher.o lavfhttpd.o configreader.o ffserver.c
- cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o configreader.o ffserver.c
+ffserver: segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c
+ cc -g -Wall $(CFLAGS) $(LAV_FLAGS) $(LUA_FLAGS) -lpthread -o ffserver segment.o publisher.o fileserver.o lavfhttpd.o configreader.o ffserver.c
segment.o: segment.c segment.h
cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c segment.c
@@ -13,6 +13,9 @@ segment.o: segment.c segment.h
publisher.o: publisher.c publisher.h
cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c publisher.c
+fileserver.o: fileserver.c fileserver.h
+ cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c fileserver.c
+
lavfhttpd.o: lavfhttpd.c httpd.h
cc -g -Wall $(CFLAGS) $(LAV_FLAGS) -lpthread -c lavfhttpd.c
@@ -37,9 +37,11 @@
#include <libavutil/opt.h>
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
+#include <libavutil/file.h>
#include "segment.h"
#include "publisher.h"
+#include "fileserver.h"
#include "httpd.h"
#include "configreader.h"
@@ -64,6 +66,7 @@ struct AcceptInfo {
struct HTTPDInterface *httpd;
AVFormatContext **ifmt_ctxs;
struct HTTPDConfig *config;
+ struct FileserverContext *fs;
int nb_pub; /** number of publishers (streams) equal to number of ifmt_ctx */
};
@@ -448,8 +451,8 @@ void *accept_thread(void *arg)
struct AcceptInfo *info = (struct AcceptInfo*) arg;
struct FFServerInfo *ffinfo = NULL;
struct PublisherContext *pub;
- char status[4096];
- char *stream_name;
+ char status[4096], requested_file[1024], sanitized_file[1024];
+ char *stream_name, *resource;
struct HTTPClient *client = NULL;
void *server = NULL;
AVIOContext *client_ctx = NULL;
@@ -493,6 +496,8 @@ void *accept_thread(void *arg)
pub = NULL;
ifmt_ctx = NULL;
+ resource = client->resource;
+ snprintf(requested_file, 1024, "%s", resource);
for (i = 0; i < config->nb_streams; i++) {
stream_name = info->pubs[i]->stream_name;
// skip leading '/' ---v
@@ -515,7 +520,8 @@ void *accept_thread(void *arg)
}
}
- if (!pub || !ifmt_ctx) {
+
+ if ((!pub || !ifmt_ctx) && !requested_file[0]) {
av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n",
client->resource ? client->resource : "(null)");
reply_code = 404;
@@ -534,13 +540,6 @@ void *accept_thread(void *arg)
continue;
}
- avio_buffer = av_malloc(AV_BUFSIZE);
- if (!avio_buffer) {
- av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n");
- publisher_cancel_reserve(pub);
- info->httpd->close(server, client);
- continue;
- }
ffinfo = av_malloc(sizeof(*ffinfo));
if (!ffinfo) {
av_log(client_ctx, AV_LOG_ERROR, "Could not allocate FFServerInfo struct.\n");
@@ -551,6 +550,30 @@ void *accept_thread(void *arg)
ffinfo->httpd = info->httpd;
ffinfo->client = client;
ffinfo->server = server;
+
+
+ // try to serve file
+ if (requested_file[0]) {
+ snprintf(sanitized_file, 1024, "%s", requested_file);
+ resource = requested_file;
+ while(resource && *resource == '/') {
+ resource++;
+ }
+
+ snprintf(sanitized_file, 1024, "%s", resource);
+ fileserver_schedule(info->fs, ffinfo, sanitized_file);
+ continue;
+ }
+
+ avio_buffer = av_malloc(AV_BUFSIZE);
+ if (!avio_buffer) {
+ av_log(client_ctx, AV_LOG_ERROR, "Could not allocate output format context.\n");
+ publisher_cancel_reserve(pub);
+ info->httpd->close(server, client);
+ continue;
+ }
+
+
client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL);
if (!client_ctx) {
av_log(NULL, AV_LOG_ERROR, "Could not allocate output format context.\n");
@@ -676,16 +699,52 @@ void *write_thread(void *arg)
return NULL;
}
+void *fileserver_thread(void *arg)
+{
+ struct FileserverContext *fs = (struct FileserverContext*) arg;
+ int i, clients_served;
+ struct FileserverClient *c;
+ for (;;) {
+ usleep(500000);
+ av_log(NULL, AV_LOG_WARNING, "Checking clients, fileserver-thread %s: ", fs->server_name);
+ clients_served = 0;
+ for (i = 0; i < MAX_CLIENTS; i++) {
+ c = &fs->clients[i];
+ pthread_mutex_lock(&c->client_lock);
+ if (c->buf) {
+ c->ffinfo->httpd->write(c->ffinfo->server, c->ffinfo->client, c->buf, c->size);
+ av_file_unmap(c->buf, c->size);
+ c->ffinfo->httpd->close(c->ffinfo->server, c->ffinfo->client);
+ c->buf = NULL;
+ c->size = 0;
+ av_freep(&c->ffinfo);
+ clients_served++;
+ }
+ pthread_mutex_unlock(&c->client_lock);
+ }
+ av_log(NULL, AV_LOG_WARNING, "%d/%d served\n", clients_served, MAX_CLIENTS);
+ if (fs->shutdown) {
+ printf("shutting down\n");
+ break;
+ }
+ }
+
+ return NULL;
+}
+
void *run_server(void *arg) {
struct AcceptInfo ainfo;
struct ReadInfo *rinfos;
struct WriteInfo **winfos_p;
struct HTTPDConfig *config = (struct HTTPDConfig*) arg;
struct PublisherContext **pubs;
+ struct FileserverContext *fs = NULL;
AVFormatContext **ifmt_ctxs;
int ret, i, stream_index;
+ int stream_formats[FMT_NB] = { 0 };
pthread_t *r_threads;
pthread_t **w_threads_p;
+ pthread_t fs_thread = 0;
pubs = av_mallocz_array(config->nb_streams, sizeof(struct PublisherContext*));
if (!pubs) {
@@ -718,6 +777,7 @@ void *run_server(void *arg) {
av_log_set_level(AV_LOG_INFO);
ainfo.pubs = pubs;
+ ainfo.fs = fs;
ainfo.ifmt_ctxs = ifmt_ctxs;
ainfo.nb_pub = config->nb_streams;
ainfo.httpd = &lavfhttpd;
@@ -751,56 +811,75 @@ void *run_server(void *arg) {
struct WriteInfo *winfos = NULL;
pthread_t *w_threads = NULL;
pthread_t r_thread;
+ int stream_formats[FMT_NB] = { 0 };
rinfo.input_uri = config->streams[stream_index].input_uri;
rinfo.server_name = config->server_name;
+ for (i = 0; i < config->streams[stream_index].nb_formats; i++)
+ stream_formats[config->streams[stream_index].formats[i]] = 1;
+
if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) {
av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n");
continue;
}
+
ifmt_ctxs[stream_index] = ifmt_ctx;
+ if (stream_formats[FMT_MATROSKA])
+ publisher_init(&pub, config->streams[stream_index].stream_name);
- publisher_init(&pub, config->streams[stream_index].stream_name);
pubs[stream_index] = pub;
rinfo.ifmt_ctx = ifmt_ctx;
rinfo.pub = pub;
rinfos[stream_index] = rinfo;
-
- w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t));
- if (!w_threads) {
- av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n");
- continue;
- }
- winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo));
- if (!winfos) {
- av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n");
- continue;
- }
- w_threads_p[stream_index] = w_threads;
- winfos_p[stream_index] = winfos;
-
- for (i = 0; i < pub->nb_threads; i++) {
- winfos[i].pub = pub;
- winfos[i].thread_id = i;
- ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]);
+ if (stream_formats[FMT_MATROSKA]) {
+ w_threads = av_mallocz_array(pub->nb_threads, sizeof(pthread_t));
+ if (!w_threads) {
+ av_log(NULL, AV_LOG_ERROR, "Could not allocate write thread handles.\n");
+ continue;
+ }
+ winfos = av_mallocz_array(pub->nb_threads, sizeof(struct WriteInfo));
+ if (!winfos) {
+ av_log(NULL, AV_LOG_ERROR, "Could not allocate write infos.\n");
+ continue;
+ }
+ w_threads_p[stream_index] = w_threads;
+ winfos_p[stream_index] = winfos;
+
+ for (i = 0; i < pub->nb_threads; i++) {
+ winfos[i].pub = pub;
+ winfos[i].thread_id = i;
+ ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]);
+ if (ret != 0) {
+ pub->shutdown = 1;
+ w_threads[i] = 0;
+ goto end;
+ }
+ }
+ w_threads_p[stream_index] = w_threads;
+ ret = pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]);
if (ret != 0) {
pub->shutdown = 1;
+ r_thread = 0;
goto end;
}
+ r_threads[stream_index] = r_thread;
}
- w_threads_p[stream_index] = w_threads;
- ret = pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]);
+
+ }
+ if (stream_formats[FMT_HLS] || stream_formats[FMT_DASH]) {
+ ret = pthread_create(&fs_thread, NULL, fileserver_thread, fs);
if (ret != 0) {
- pub->shutdown = 1;
+ fs->shutdown = 1;
+ fs_thread = 0;
goto end;
}
- r_threads[stream_index] = r_thread;
}
+
//pthread_create(&a_thread, NULL, accept_thread, &ainfo);
accept_thread(&ainfo);
@@ -823,6 +902,11 @@ end:
publisher_free(pubs[stream_index]);
}
+ if (fs_thread) {
+ fs->shutdown = 1;
+ pthread_join(fs_thread, NULL);
+ fileserver_free(fs);
+ }
error_cleanup:
av_free(rinfos);
av_free(winfos_p);
new file mode 100644
@@ -0,0 +1,97 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "fileserver.h"
+#include "httpd.h"
+
+#include <pthread.h>
+
+#include <libavutil/log.h>
+#include <libavutil/file.h>
+
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, const char *filename)
+{
+ int i, ret;
+ char final_filename[1024];
+ struct FileserverClient *fsc = NULL;
+ for (i = 0; i < MAX_CLIENTS; i++) {
+ pthread_mutex_lock(&fs->clients[i].client_lock);
+ if (!fs->clients[i].ffinfo) {
+ fsc = &fs->clients[i];
+ break;
+ }
+ pthread_mutex_unlock(&fs->clients[i].client_lock);
+ }
+ // fsc still locked
+ if (fsc) {
+ fsc->ffinfo = ffinfo;
+ snprintf(final_filename, 1024, "%s/%s", fs->server_name, filename);
+ ret = av_file_map(final_filename, &fsc->buf, &fsc->size, 0, NULL);
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR, "Could not read file: %s: %s.\n", final_filename, av_err2str(ret));
+ fsc->ffinfo->httpd->close(fsc->ffinfo->server, fsc->ffinfo->client);
+ av_freep(&fsc->ffinfo);
+ fsc->buf = NULL;
+ fsc->size = 0;
+ }
+ fsc->ofmt_ctx = ofmt_ctx;
+ pthread_mutex_unlock(&fsc->client_lock);
+ } else {
+ av_log(NULL, AV_LOG_WARNING, "Could not find free client slot.\n");
+ ffinfo->httpd->close(ffinfo->server, ffinfo->client);
+ av_free(ffinfo);
+ }
+ av_log(NULL, AV_LOG_INFO, "Scheduled serving file: %s\n", final_filename);
+ return;
+}
+
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name)
+{
+ struct FileserverContext *fs = av_mallocz(sizeof(struct FileserverContext));
+ int i;
+ *fs_p = NULL;
+
+ if (!fs) {
+ av_log(NULL, AV_LOG_ERROR, "Could not allocate fileserver context.\n");
+ return;
+ }
+
+ for (i = 0; i < MAX_CLIENTS; i++) {
+ fs->clients[i].ffinfo = NULL;
+ fs->clients[i].buf = NULL;
+ fs->clients[i].size = 0;
+ pthread_mutex_init(&fs->clients[i].client_lock, NULL);
+ }
+
+ fs->nb_threads = 1;
+ fs->server_name = server_name;
+ fs->shutdown = 0;
+ *fs_p = fs;
+ return;
+}
+
+void fileserver_free(struct FileserverContext *fs) {
+ int i;
+ for (i = 0; i < MAX_CLIENTS; i++) {
+ pthread_mutex_lock(&fs->clients[i].client_lock);
+ av_free(fs->clients[i].ffinfo);
+ av_free(fs->clients[i].buf);
+ pthread_mutex_unlock(&fs->clients[i].client_lock);
+ }
+ av_free(fs);
+}
new file mode 100644
@@ -0,0 +1,63 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FILESERVER_H
+#define FILESERVER_H
+
+#define MAX_CLIENTS 16
+
+#include "httpd.h"
+#include <sys/types.h>
+
+struct FileserverClient {
+ struct FFServerInfo *ffinfo;
+ pthread_mutex_t client_lock;
+ unsigned char *buf;
+ size_t size;
+};
+
+
+struct FileserverContext {
+ struct FileserverClient clients[MAX_CLIENTS];
+ int nb_threads;
+ int shutdown;
+ const char *server_name;
+};
+
+/**
+ * Schedule file for sending to a client. It is added to a list of clients.
+ * @param fs pointer to the FileserverContext to schedule to
+ * @param ffinfo pointer to an FFServerInfo struct containing information for serving the file
+ * @param filename filename of the file to serve
+ */
+void fileserver_schedule(struct FileserverContext *fs, struct FFServerInfo *ffinfo, AVIOContext *ofmt_ctx, const char *filename);
+
+/**
+ * Initialize a Fileservercontext
+ * @param fs_p pointer to a pointer where the FileserverContext will be allocated
+ * @param server_name the name of the server
+ */
+void fileserver_init(struct FileserverContext **fs_p, const char *server_name);
+
+/**
+ * Free a Fileservercontext
+ */
+void fileserver_free(struct FileserverContext *fs);
+
+
+#endif
Signed-off-by: Stephan Holljes <klaxa1337@googlemail.com> --- Makefile | 7 ++- ffserver.c | 150 +++++++++++++++++++++++++++++++++++++++------------ fileserver.c | 97 +++++++++++++++++++++++++++++++++ fileserver.h | 63 ++++++++++++++++++++++ 4 files changed, 282 insertions(+), 35 deletions(-) create mode 100644 fileserver.c create mode 100644 fileserver.h