diff mbox

[FFmpeg-devel,2/4] ffserver: Implement publisher

Message ID 20180417015233.31983-3-klaxa1337@googlemail.com
State Superseded
Headers show

Commit Message

Stephan Holljes April 17, 2018, 1:52 a.m. UTC
---
 publisher.c | 301 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 publisher.h | 156 +++++++++++++++++++++++++++++++
 2 files changed, 457 insertions(+)
 create mode 100644 publisher.c
 create mode 100644 publisher.h
diff mbox

Patch

diff --git a/publisher.c b/publisher.c
new file mode 100644
index 0000000..65dca70
--- /dev/null
+++ b/publisher.c
@@ -0,0 +1,301 @@ 
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "publisher.h"
+#include "segment.h"
+#include <libavutil/log.h>
+
+void client_log(struct Client *c)
+{
+    av_log(NULL, AV_LOG_INFO, "State: ");
+    switch(c->state) {
+    case FREE:
+        av_log(NULL, AV_LOG_INFO, "FREE\n");
+        break;
+    case RESERVED:
+        av_log(NULL, AV_LOG_INFO, "RESERVED\n");
+        break;
+    case WAIT:
+        av_log(NULL, AV_LOG_INFO, "WAIT\n");
+        break;
+    case WRITABLE:
+        av_log(NULL, AV_LOG_INFO, "WRITABLE\n");
+        break;
+    case BUSY:
+        av_log(NULL, AV_LOG_INFO, "BUSY\n");
+        break;
+    case BUFFER_FULL:
+        av_log(NULL, AV_LOG_INFO, "BUFFER_FULL\n");
+        break;
+    default:
+        av_log(NULL, AV_LOG_INFO, "UNKOWN\n");
+        break;
+    }
+}
+
+void client_disconnect(struct Client *c, int write_trailer)
+{
+    struct Segment *seg;
+    if (write_trailer)
+        av_write_trailer(c->ofmt_ctx);
+    avio_close(c->ofmt_ctx->pb);
+    avformat_free_context(c->ofmt_ctx);
+    pthread_mutex_lock(&c->buffer_lock);
+    while(av_fifo_size(c->buffer)) {
+        av_fifo_generic_read(c->buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    pthread_mutex_unlock(&c->buffer_lock);
+    c->ofmt_ctx = NULL;
+    client_set_state(c, FREE);
+    c->current_segment_id = -1;
+}
+
+void client_set_state(struct Client *c, enum State state)
+{
+    pthread_mutex_lock(&c->state_lock);
+    c->state = state;
+    pthread_mutex_unlock(&c->state_lock);
+}
+
+void client_push_segment(struct Client *c, struct Segment *seg)
+{
+    pthread_mutex_lock(&c->buffer_lock);
+    if (av_fifo_space(c->buffer) == 0) {
+        av_log(NULL, AV_LOG_WARNING, "Client buffer full, dropping Segment.\n");
+        client_set_state(c, BUFFER_FULL);
+        pthread_mutex_unlock(&c->buffer_lock);
+        return;
+    }
+    segment_ref(seg);
+    av_fifo_generic_write(c->buffer, &seg, sizeof(struct Segment*), NULL);
+    pthread_mutex_unlock(&c->buffer_lock);
+    client_set_state(c, WRITABLE);
+}
+
+void publisher_init(struct PublisherContext **pub)
+{
+    int i;
+    struct PublisherContext *pc = (struct PublisherContext*) av_malloc(sizeof(struct PublisherContext));
+    pc->nb_threads = 4;
+    pc->current_segment_id = -1;
+    pc->shutdown = 0;
+    pc->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    pc->fs_buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    pthread_mutex_init(&pc->buffer_lock, NULL);
+    pthread_mutex_init(&pc->fs_buffer_lock, NULL);
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pc->clients[i];
+        c->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+        c->id = i;
+        c->current_segment_id = -1;
+        pthread_mutex_init(&c->state_lock, NULL);
+        pthread_mutex_init(&c->buffer_lock, NULL);
+        client_set_state(c, FREE);
+    }
+    *pub = pc;
+}
+
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg)
+{
+    struct Segment *drop;
+    pthread_mutex_lock(&pub->buffer_lock);
+    pthread_mutex_lock(&pub->fs_buffer_lock);
+    av_fifo_generic_write(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+    segment_ref(seg);
+    if (av_fifo_size(pub->fs_buffer) >= BUFFER_SEGMENTS * sizeof(struct Segment*)) {
+        av_fifo_generic_read(pub->fs_buffer, &drop, sizeof(struct Segment*), NULL);
+        segment_unref(drop);
+    }
+    av_fifo_generic_write(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL);
+    pthread_mutex_unlock(&pub->buffer_lock);
+    pthread_mutex_unlock(&pub->fs_buffer_lock);
+    segment_ref(seg);
+}
+
+int publisher_reserve_client(struct PublisherContext *pub)
+{
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case FREE:
+                client_set_state(&pub->clients[i], RESERVED);
+                return 0;
+            default:
+                continue;
+        }
+    }
+    return 1;
+}
+
+void publisher_cancel_reserve(struct PublisherContext *pub)
+{
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case RESERVED:
+                client_set_state(&pub->clients[i], FREE);
+                return;
+            default:
+                continue;
+        }
+    }
+    return;
+}
+
+void client_push_prebuffer(struct PublisherContext *pub, struct Client *c)
+{
+    int off;
+    int size;
+    struct Segment *seg;
+    pthread_mutex_lock(&pub->fs_buffer_lock);
+    size = av_fifo_size(pub->fs_buffer);
+    for (off = 0; off < size; off += sizeof(struct Segment*)) {
+        av_fifo_generic_peek_at(pub->fs_buffer, &seg, off, sizeof(struct Segment*), NULL);
+        client_push_segment(c, seg);
+    }
+    pthread_mutex_unlock(&pub->fs_buffer_lock);
+}
+
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx)
+{
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case RESERVED:
+                pub->clients[i].ofmt_ctx = ofmt_ctx;
+                client_set_state(&pub->clients[i], WRITABLE);
+                client_push_prebuffer(pub, &pub->clients[i]);
+                return;
+            default:
+                continue;
+        }
+    }
+}
+
+void publisher_free(struct PublisherContext *pub)
+{
+    int i;
+    struct Segment *seg;
+    pthread_mutex_lock(&pub->buffer_lock);
+    while(av_fifo_size(pub->buffer)) {
+        av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    av_fifo_freep(&pub->buffer);
+    pthread_mutex_unlock(&pub->buffer_lock);
+    
+    pthread_mutex_lock(&pub->fs_buffer_lock);
+    while(av_fifo_size(pub->fs_buffer)) {
+        av_fifo_generic_read(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    av_fifo_freep(&pub->fs_buffer);
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        av_fifo_freep(&pub->clients[i].buffer);
+    }
+    pthread_mutex_unlock(&pub->fs_buffer_lock);
+    av_free(pub);
+    return;
+}
+
+void publisher_freep(struct PublisherContext **pub)
+{
+    publisher_free(*pub);
+    *pub = NULL;
+    return;
+}
+
+void publish(struct PublisherContext *pub)
+{
+    int i;
+    struct Segment *seg;
+    char filename[128] = {0};
+    pthread_mutex_lock(&pub->buffer_lock);
+    av_log(NULL, AV_LOG_DEBUG, "pub->buffer size: %d\n", av_fifo_size(pub->buffer));
+    if (av_fifo_size(pub->buffer) == 0) {
+        pthread_mutex_unlock(&pub->buffer_lock);
+        return;
+    }
+    av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+    pthread_mutex_unlock(&pub->buffer_lock);
+    if (seg) {
+        pub->current_segment_id = seg->id;
+        snprintf(filename, 127, "segment-%04d.mkv", seg->id);
+        segment_save(seg, filename);
+        client_log(pub->clients);
+        
+        for (i = 0; i < MAX_CLIENTS; i++) {
+            switch(pub->clients[i].state) {
+                case BUFFER_FULL:
+                    av_log(pub, AV_LOG_WARNING, "Dropping segment for client %d, buffer full.\n", i);
+                    continue;
+                case WAIT:
+                case WRITABLE:
+                    client_push_segment(&pub->clients[i], seg);
+                default:
+                    continue;
+            }
+        }
+        segment_unref(seg);
+    }
+}
+
+void publisher_gen_status_json(struct PublisherContext *pub, char *status)
+{
+    int states[STATE_NB] = {0};
+    int current_read = 0, newest_write = 0, oldest_write = 0;
+    int i;
+    struct Client *c;
+
+    current_read = pub->current_segment_id;
+    oldest_write = current_read;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        c = &pub->clients[i];
+        if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) {
+            oldest_write = c->current_segment_id;
+        }
+        if (c->current_segment_id > newest_write) {
+            newest_write = c->current_segment_id;
+        }
+        states[c->state]++;
+    }
+    
+
+    snprintf(status, 4095,
+    "{\n\t\"free\": %d,\n"
+    "\t\"reserved\": %d,\n"
+    "\t\"wait\": %d,\n"
+    "\t\"writable\": %d,\n"
+    "\t\"busy\": %d,\n"
+    "\t\"buffer_full\": %d,\n"
+    "\t\"current_read\": %d,\n"
+    "\t\"newest_write\": %d,\n"
+    "\t\"oldest_write\": %d\n"
+    "}\n",
+    states[FREE],
+    states[RESERVED],
+    states[WAIT],
+    states[WRITABLE],
+    states[BUSY],
+    states[BUFFER_FULL],
+    current_read,
+    newest_write,
+    oldest_write);
+}
diff --git a/publisher.h b/publisher.h
new file mode 100644
index 0000000..2508837
--- /dev/null
+++ b/publisher.h
@@ -0,0 +1,156 @@ 
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef PUBLISHER_H
+#define PUBLISHER_H
+
+#include <libavformat/avformat.h>
+#include <libavutil/fifo.h>
+#include <pthread.h>
+#include "segment.h"
+
+#define MAX_CLIENTS 16
+#define MAX_SEGMENTS 16
+#define BUFFER_SEGMENTS 10
+
+/* Client State enum */
+
+enum State {
+    FREE,         // no client connected
+    RESERVED,     // reserved for a client that just connected
+    WAIT,         // up to date, no new Segments to write
+    WRITABLE,     // buffer is not full, new Segments can be pushed
+    BUSY,         // currently writing to this client
+    BUFFER_FULL,  // client buffer is full, new Segments will be dropped
+    STATE_NB
+};
+
+
+struct Client {
+    AVFormatContext *ofmt_ctx; // writable AVFormatContext, basically our tcp connection to the client
+    AVFifoBuffer *buffer; // Client buffer of Segment references
+    enum State state;
+    pthread_mutex_t buffer_lock;
+    pthread_mutex_t state_lock;
+    int id;
+    int current_segment_id; // The stream-based id of the segment that has last been worked on.
+};
+
+struct PublisherContext {
+    struct Client clients[MAX_CLIENTS]; // currently compile-time configuration, easly made dynamic with malloc?
+    AVFifoBuffer *buffer; // publisher buffer for new Segments
+    AVFifoBuffer *fs_buffer; // fast start buffer
+    pthread_mutex_t buffer_lock;
+    pthread_mutex_t fs_buffer_lock;
+    int nb_threads;
+    int current_segment_id;
+    int shutdown; // indicate shutdown, gracefully close client connections and files and exit
+};
+
+/**
+ * Log a client's stats to the console.
+ *
+ * @param c pointer to the client to print
+ */
+void client_log(struct Client *c);
+
+/**
+ * Disconnect a client.
+ *
+ * @param c pointer to the client to disconnect.
+ */
+void client_disconnect(struct Client *c, int write_trailer);
+
+/**
+ * Set a client's state. Note: This is protected by mutex locks.
+ *
+ * @param c pointer to the client to set the state of
+ * @param state the state to set the client to
+ */
+void client_set_state(struct Client *c, enum State state);
+
+/**
+ * Allocate and initialize a PublisherContext
+ *
+ * @param pub pointer to a pointer to a PublisherContext. It will be allocated and initialized.
+ */
+void publisher_init(struct PublisherContext **pub);
+
+/**
+ * Push a Segment to a PublisherContext.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param seg pointer to the Segment to add
+ */
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg);
+
+/**
+ * Reserve a slot in the client struct of a PublisherContext. May fail if the number 
+ * of maximum clients has been reached.
+ *
+ * @param pub pointer to a PublisherContext
+ * @return 0 in case of success, 1 in case of failure
+ */
+int publisher_reserve_client(struct PublisherContext *pub);
+
+/**
+ * Cancel a single reservation. This can be used if a client spot was reserved, but the client
+ * unexpectedly disconnects or sends an invalid request.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publisher_cancel_reserve(struct PublisherContext *pub);
+
+/**
+ * Add a client by its ofmt_ctx. This initializes an element in the client struct of the PublisherContext
+ * that has been reserved prior to calling this function.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param ofmt_ctx AVFormatContext of a client
+ */
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx);
+
+/**
+ * Free buffers and associated client buffers.
+ *
+ * @param pub pointer to the PublisherContext to free
+ */
+void publisher_free(struct PublisherContext *pub);
+
+/**
+ * Free buffers and associated client buffers and set *pub to NULL.
+ *
+ * @param pub pointer to the PublisherContext pointer to free
+ */
+void publisher_freep(struct PublisherContext **pub);
+
+/**
+ * Signal to the PublisherContext to check its buffer and publish pending Segments.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publish(struct PublisherContext *pub);
+
+/**
+ * Print the current client and file reading status to a json string.
+ * @param pub pointer to a PublisherContext
+ * @param status string of at least 4096 bytes size.
+ */
+void publisher_gen_status_json(struct PublisherContext *pub, char *status);
+
+#endif // PUBLISHER_H