[FFmpeg-devel,2/4] ffserver: Implement publisher

Submitted by Stephan Holljes on April 12, 2018, 1:35 p.m.

Details

Message ID 20180412133549.19939-3-klaxa1337@googlemail.com
State New
Headers show

Commit Message

Stephan Holljes April 12, 2018, 1:35 p.m.
---
 publisher.c | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 publisher.h | 134 +++++++++++++++++++++++++++++
 2 files changed, 412 insertions(+)
 create mode 100644 publisher.c
 create mode 100644 publisher.h

Comments

Michael Niedermayer April 13, 2018, 3:17 p.m.
On Thu, Apr 12, 2018 at 03:35:47PM +0200, Stephan Holljes wrote:
> ---
>  publisher.c | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  publisher.h | 134 +++++++++++++++++++++++++++++
>  2 files changed, 412 insertions(+)
>  create mode 100644 publisher.c
>  create mode 100644 publisher.h
> 
> diff --git a/publisher.c b/publisher.c
> new file mode 100644
> index 0000000..d1ccb95
> --- /dev/null
> +++ b/publisher.c
> @@ -0,0 +1,278 @@
> +#include "publisher.h"
> +#include "segment.h"
> +#include <libavutil/log.h>
> +
> +void client_log(struct Client *c)
> +{
> +    char state[64];
> +    sprintf("State: ", state);
> +    switch(c->state) {
> +    case FREE:
> +        sprintf(state, "FREE");
> +        break;
> +    case RESERVED:
> +        sprintf(state, "RESERVED");
> +        break;
> +    case WAIT:
> +        sprintf(state, "WAIT");
> +        break;
> +    case WRITABLE:
> +        sprintf(state, "WRITABLE");
> +        break;
> +    case BUSY:
> +        sprintf(state, "BUSY");
> +        break;
> +    case BUFFER_FULL:
> +        sprintf(state, "BUFFER_FULL");
> +        break;
> +    default:
> +        sprintf(state, "UNDEFINED");
> +        break;
> +    }
> +    av_log(NULL, AV_LOG_INFO, "%s\n", state);

const char *state = "UNDEFINED";
...
case WAIT:
    state = "WAIT"
...

av_log(..., "State: %s", state);

simpler, no buffer, no copy (which lacks buffer size checks)


[...]
> +void publisher_gen_status_json(struct PublisherContext *pub, char *status)
> +{
> +    int nb_free = 0, nb_reserved = 0, nb_wait = 0, nb_writable = 0, nb_busy = 0, nb_buffer_full = 0, current_read = 0, newest_write = 0, oldest_write = 0;
> +    int i;
> +    struct Client *c;
> +
> +    current_read = pub->current_segment_id;
> +    oldest_write = current_read;
> +
> +    for (i = 0; i < MAX_CLIENTS; i++) {
> +        c = &pub->clients[i];
> +        if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) {
> +            oldest_write = c->current_segment_id;
> +        }
> +        if (c->current_segment_id > newest_write) {
> +            newest_write = c->current_segment_id;
> +        }
> +
> +        switch(c->state) {
> +        case FREE:
> +            nb_free++;
> +            continue;
> +        case RESERVED:
> +            nb_reserved++;
> +            continue;
> +        case WAIT:
> +            nb_wait++;
> +            continue;
> +        case WRITABLE:
> +            nb_writable++;
> +            continue;
> +        case BUSY:
> +            nb_busy++;
> +            continue;
> +        case BUFFER_FULL:
> +            nb_buffer_full++;
> +            continue;
> +        default:
> +            continue;
> +        }
> +    }

you can simplify this substantially:

int nb_table[STATE_NB] = {0};

...
for()
    nb_table[c->state]++;

[..]

Patch hide | download patch | download mbox

diff --git a/publisher.c b/publisher.c
new file mode 100644
index 0000000..d1ccb95
--- /dev/null
+++ b/publisher.c
@@ -0,0 +1,278 @@ 
+#include "publisher.h"
+#include "segment.h"
+#include <libavutil/log.h>
+
+void client_log(struct Client *c)
+{
+    char state[64];
+    sprintf("State: ", state);
+    switch(c->state) {
+    case FREE:
+        sprintf(state, "FREE");
+        break;
+    case RESERVED:
+        sprintf(state, "RESERVED");
+        break;
+    case WAIT:
+        sprintf(state, "WAIT");
+        break;
+    case WRITABLE:
+        sprintf(state, "WRITABLE");
+        break;
+    case BUSY:
+        sprintf(state, "BUSY");
+        break;
+    case BUFFER_FULL:
+        sprintf(state, "BUFFER_FULL");
+        break;
+    default:
+        sprintf(state, "UNDEFINED");
+        break;
+    }
+    av_log(NULL, AV_LOG_INFO, "%s\n", state);
+}
+
+void client_disconnect(struct Client *c)
+{
+    struct Segment *seg;
+    av_write_trailer(c->ofmt_ctx);
+    avio_close(c->ofmt_ctx->pb);
+    avformat_free_context(c->ofmt_ctx);
+    while(av_fifo_size(c->buffer)) {
+        av_fifo_generic_read(c->buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    client_set_state(c, FREE);
+    c->current_segment_id = -1;
+}
+
+void client_set_state(struct Client *c, enum State state)
+{
+    pthread_mutex_lock(&c->state_lock);
+    c->state = state;
+    pthread_mutex_unlock(&c->state_lock);
+}
+
+void client_push_segment(struct Client *c, struct Segment *seg)
+{
+    if (av_fifo_space(c->buffer) == 0) {
+        av_log(NULL, AV_LOG_WARNING, "Client buffer full, dropping Segment.\n");
+        client_set_state(c, BUFFER_FULL);
+        return;
+    }
+    segment_ref(seg);
+    av_fifo_generic_write(c->buffer, &seg, sizeof(struct Segment*), NULL);
+    client_set_state(c, WRITABLE);
+}
+
+void publisher_init(struct PublisherContext **pub)
+{
+    int i;
+    struct PublisherContext *pc = (struct PublisherContext*) malloc(sizeof(struct PublisherContext));
+    pc->nb_threads = 4;
+    pc->current_segment_id = -1;
+    pc->shutdown = 0;
+    pc->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    pc->fs_buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pc->clients[i];
+        c->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+        c->id = i;
+        c->current_segment_id = -1;
+        pthread_mutex_init(&c->state_lock, NULL);
+        client_set_state(c, FREE);
+    }
+    *pub = pc;
+}
+
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg)
+{
+    struct Segment *drop;
+    av_fifo_generic_write(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+    segment_ref(seg);
+    if (av_fifo_size(pub->fs_buffer) >= BUFFER_SEGMENTS * sizeof(struct Segment*)) {
+        av_fifo_generic_read(pub->fs_buffer, &drop, sizeof(struct Segment*), NULL);
+        segment_unref(drop);
+    }
+    av_fifo_generic_write(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL);
+    segment_ref(seg);
+}
+
+int publisher_reserve_client(struct PublisherContext *pub)
+{
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case FREE:
+                client_set_state(&pub->clients[i], RESERVED);
+                return 0;
+            default:
+                continue;
+        }
+    }
+    return 1;
+}
+
+void publisher_cancel_reserve(struct PublisherContext *pub)
+{
+    int i;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case RESERVED:
+                client_set_state(&pub->clients[i], FREE);
+                return;
+            default:
+                continue;
+        }
+    }
+    return;
+}
+
+void client_push_prebuffer(struct PublisherContext *pub, struct Client *c)
+{
+    int off;
+    int size;
+    struct Segment *seg;
+    size = av_fifo_size(pub->fs_buffer);
+    for (off = 0; off < size; off += sizeof(struct Segment*)) {
+        av_fifo_generic_peek_at(pub->fs_buffer, &seg, off, sizeof(struct Segment*), NULL);
+        client_push_segment(c, seg);
+    }
+}
+
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx)
+{
+    int i;
+    struct Segment *prebuffer_seg;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        switch(pub->clients[i].state) {
+            case RESERVED:
+                pub->clients[i].ofmt_ctx = ofmt_ctx;
+                client_set_state(&pub->clients[i], WRITABLE);
+                client_push_prebuffer(pub, &pub->clients[i]);
+                return;
+            default:
+                continue;
+        }
+    }
+}
+
+void publisher_free(struct PublisherContext *pub)
+{
+    int i;
+    struct Segment *seg;
+    while(av_fifo_size(pub->buffer)) {
+        av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    av_fifo_freep(&pub->buffer);
+    while(av_fifo_size(pub->fs_buffer)) {
+        av_fifo_generic_read(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    av_fifo_freep(&pub->fs_buffer);
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        av_fifo_freep(&pub->clients[i].buffer);
+    }
+    return;
+}
+
+void publisher_freep(struct PublisherContext **pub)
+{
+    publisher_free(*pub);
+    *pub = NULL;
+    return;
+}
+
+void publish(struct PublisherContext *pub)
+{
+    int i;
+    struct Segment *seg;
+    av_log(NULL, AV_LOG_DEBUG, "pub->buffer size: %d\n", av_fifo_size(pub->buffer));
+    if (av_fifo_size(pub->buffer) == 0)
+        return;
+    av_log(NULL, AV_LOG_DEBUG, "Peeking buffer\n");
+    av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL);
+    av_log(NULL, AV_LOG_DEBUG, "Peeked buffer\n");
+    if (seg) {
+        pub->current_segment_id = seg->id;
+        for (i = 0; i < MAX_CLIENTS; i++) {
+            switch(pub->clients[i].state) {
+                case BUFFER_FULL:
+                    av_log(NULL, AV_LOG_WARNING, "Dropping segment for client %d, buffer full.\n", i);
+                    continue;
+                case WAIT:
+                case WRITABLE:
+                    client_push_segment(&pub->clients[i], seg);
+                default:
+                    continue;
+            }
+        }
+        segment_unref(seg);
+    }
+}
+
+void publisher_gen_status_json(struct PublisherContext *pub, char *status)
+{
+    int nb_free = 0, nb_reserved = 0, nb_wait = 0, nb_writable = 0, nb_busy = 0, nb_buffer_full = 0, current_read = 0, newest_write = 0, oldest_write = 0;
+    int i;
+    struct Client *c;
+
+    current_read = pub->current_segment_id;
+    oldest_write = current_read;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        c = &pub->clients[i];
+        if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) {
+            oldest_write = c->current_segment_id;
+        }
+        if (c->current_segment_id > newest_write) {
+            newest_write = c->current_segment_id;
+        }
+
+        switch(c->state) {
+        case FREE:
+            nb_free++;
+            continue;
+        case RESERVED:
+            nb_reserved++;
+            continue;
+        case WAIT:
+            nb_wait++;
+            continue;
+        case WRITABLE:
+            nb_writable++;
+            continue;
+        case BUSY:
+            nb_busy++;
+            continue;
+        case BUFFER_FULL:
+            nb_buffer_full++;
+            continue;
+        default:
+            continue;
+        }
+    }
+
+
+    snprintf(status, 4095,
+    "{\n\t\"free\": %d,\n"
+    "\t\"reserved\": %d,\n"
+    "\t\"wait\": %d,\n"
+    "\t\"writable\": %d,\n"
+    "\t\"busy\": %d\n"
+    "\t\"buffer_full\": %d\n"
+    "\t\"current_read\": %d\n"
+    "\t\"newest_write\": %d\n"
+    "\t\"oldest_write\": %d\n"
+    "}\n",
+    nb_free,
+    nb_reserved,
+    nb_wait,
+    nb_writable,
+    nb_busy,
+    nb_buffer_full,
+    current_read,
+    newest_write,
+    oldest_write);
+}
diff --git a/publisher.h b/publisher.h
new file mode 100644
index 0000000..7646fda
--- /dev/null
+++ b/publisher.h
@@ -0,0 +1,134 @@ 
+#ifndef PUBLISHER_H
+#define PUBLISHER_H
+
+#include <libavformat/avformat.h>
+#include <libavutil/fifo.h>
+#include <pthread.h>
+#include "segment.h"
+
+#define MAX_CLIENTS 16
+#define MAX_SEGMENTS 16
+#define BUFFER_SEGMENTS 10
+
+/* Client State enum */
+
+enum State {
+    FREE,       // no client connected
+    RESERVED,   // reserved for a client that just connected
+    WAIT,       // up to date, no new Segments to write
+    WRITABLE,   // buffer is not full, new Segments can be pushed
+    BUSY,       // currently writing to this client
+    BUFFER_FULL // client buffer is full, new Segments will be dropped
+};
+
+
+struct Client {
+    AVFormatContext *ofmt_ctx; // writable AVFormatContext, basically our tcp connection to the client
+    AVFifoBuffer *buffer; // Client buffer of Segment references
+    enum State state;
+    pthread_mutex_t state_lock;
+    int id;
+    int current_segment_id; // The stream-based id of the segment that has last been worked on.
+};
+
+struct PublisherContext {
+    struct Client clients[MAX_CLIENTS]; // currently compile-time configuration, easly made dynamic with malloc?
+    AVFifoBuffer *buffer; // publisher buffer for new Segments
+    AVFifoBuffer *fs_buffer; // fast start buffer
+    int nb_threads;
+    int current_segment_id;
+    int shutdown; // indicate shutdown, gracefully close client connections and files and exit
+};
+
+/**
+ * Log a client's stats to the console.
+ *
+ * @param c pointer to the client to print
+ */
+void client_log(struct Client *c);
+
+/**
+ * Disconnect a client
+ *
+ * @param c pointer to the client to disconnect.
+ */
+void client_disconnect(struct Client *c);
+
+/**
+ * Set a client's state. Note: This is protected by mutex locks.
+ *
+ * @param c pointer to the client to set the state of
+ * @param state the state to set the client to
+ */
+void client_set_state(struct Client *c, enum State state);
+
+/**
+ * Allocate and initialize a PublisherContext
+ *
+ * @param pub pointer to a pointer to a PublisherContext. It will be allocated and initialized.
+ */
+void publisher_init(struct PublisherContext **pub);
+
+/**
+ * Push a Segment to a PublisherContext.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param seg pointer to the Segment to add
+ */
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg);
+
+/**
+ * Reserve a slot in the client struct of a PublisherContext. May fail if the number 
+ * of maximum clients has been reached.
+ *
+ * @param pub pointer to a PublisherContext
+ * @return 0 in case of success, 1 in case of failure
+ */
+int publisher_reserve_client(struct PublisherContext *pub);
+
+/**
+ * Cancel a single reservation. This can be used if a client spot was reserved, but the client
+ * unexpectedly disconnects or sends an invalid request.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publisher_cancel_reserve(struct PublisherContext *pub);
+
+/**
+ * Add a client by its ofmt_ctx. This initializes an element in the client struct of the PublisherContext
+ * that has been reserved prior to calling this function.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param ofmt_ctx AVFormatContext of a client
+ */
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx);
+
+/**
+ * Free buffers and associated client buffers.
+ *
+ * @param pub pointer to the PublisherContext to free
+ */
+void publisher_free(struct PublisherContext *pub);
+
+/**
+ * Free buffers and associated client buffers and set *pub to NULL.
+ *
+ * @param pub pointer to the PublisherContext pointer to free
+ */
+void publisher_freep(struct PublisherContext **pub);
+
+/**
+ * Signal to the PublisherContext to check its buffer and publish pending Segments.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publish(struct PublisherContext *pub);
+
+/**
+ * Print the current client and file reading status to a json string.
+ * @param pub pointer to a PublisherContext
+ * @param status string of at least 4096 bytes size.
+ */
+void publisher_gen_status_json(struct PublisherContext *pub, char *status);
+
+#endif // PUBLISHER_H