From patchwork Sun May 13 00:07:37 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stephan Holljes X-Patchwork-Id: 8933 Delivered-To: ffmpegpatchwork@gmail.com Received: by 2002:a02:155:0:0:0:0:0 with SMTP id c82-v6csp2437190jad; Sat, 12 May 2018 17:08:13 -0700 (PDT) X-Google-Smtp-Source: AB8JxZpe76S1RitYEb82P9W6wJVro1oMd25YVX2gSpNRsmqMXpdJmWM5XLcNVSfjGSODySc1uFLJ X-Received: by 2002:a1c:5d4f:: with SMTP id r76-v6mr2148506wmb.93.1526170093644; Sat, 12 May 2018 17:08:13 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1526170093; cv=none; d=google.com; s=arc-20160816; b=vd6EVIv8ujfNp2jAnVHki9/EqsTh94STvfDfa8utL4Xf7Syggq6GDGMJMp0epypbsp B0AZXJ9dJPff5N7ebk9Y/VlA6/nSkFoUBjdWszTdnv1HTuQyWStXRVd1Z9izd5HvBYmd MD+3oFoeoOeemakHkzTRR0DuKf1phKBkk28R/lqKA3FF9YEz6hk+XBHLBk6smqqm2E6g 4Q0lH5kkZ+hkDthHoTiqdtIAAJuGzGZB240qUi5VeQGWGIgd1eET2LooPReY77GPe0Aj 0SeaXrIL/F4EftpAcHhOOILkosMIfprHDpuybVPItGDV9pCmyh/o+jUiMxCVHJahncSt 4guw== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:mime-version:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:references:in-reply-to:message-id:date :to:from:dkim-signature:delivered-to:arc-authentication-results; bh=XxukiBiDCqGfQ2HMJcmk4TXMOTJQa+WNoVy35BAqTtc=; b=ABMJnbo7yoqyxE2+3GZTbria/2tlXo0B0thQS3F/PJL4FIg52q+jlMGOq1nkODKQXE YFzH53xRqUAhkP9AWRHt15R1jE7Hwro3rWOzq4uUsmdGo3twxZD69WugcUxUbrD6vZvX vivHsrEM3lnYL9oDKHrxlLFJOleGxy8bT7ZCvbuMcNpyWCXllN3y3qXlzdzOErhm2HtM Y5bdaMgwuqXvGEYFtwNUVV3Toxk3NLFJkJ9IAdozaCLHHTFuMGsJ5ZHyPIHm58owOGu4 PIxS6aUcHOF8y7gHLcHyeVFavyyziIVt2KCfJ1MWwxCJPNWITJOLq67h1Vv2LVmOZYZi WkWQ== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=scxBdnXU; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id m88-v6si3131421wmi.48.2018.05.12.17.08.12; Sat, 12 May 2018 17:08:13 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@googlemail.com header.s=20161025 header.b=scxBdnXU; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=QUARANTINE sp=QUARANTINE dis=NONE) header.from=googlemail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id D270968A5EA; Sun, 13 May 2018 03:07:33 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wm0-f54.google.com (mail-wm0-f54.google.com [74.125.82.54]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 8FAC968A3B5 for ; Sun, 13 May 2018 03:07:27 +0300 (EEST) Received: by mail-wm0-f54.google.com with SMTP id l1-v6so9051898wmb.2 for ; Sat, 12 May 2018 17:08:04 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlemail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=nQF0CPALHt/djfuhgWlRwvLDesoTkstTXDRR4iXbXnY=; b=scxBdnXU3bQ7NFUCFwKEm0XVsFqobafCgKkPAiaqzKqVQsxVspygcmLh8/h6UD5uKc vy5oR3zNYxB/839Z9z5ZOrgvhqqcjAi0CpZYZc8hK9PvfCjazs0RLk6GJPNnNu5RQZCt O3fUntu8xmViq9hDDkoXlUt+DXgv4k5R+3W1auxPgAAllot40rkh4Q4RiWsisRHqAHB6 58NugzprecllZCBLNZ9KzSQqDyfgxoa7Y8yX8PD2tbcs8lD0uUCTKJYEsmztEi3lxj9n gv817bvUoi+M8mef0gKvLBXyvYaBTT8TdN7ZdDOwTqdgzMkqe/OGSIqii6oPxdRloeue kuCg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=nQF0CPALHt/djfuhgWlRwvLDesoTkstTXDRR4iXbXnY=; b=R+byEfBtYz8R5lH+WRkg7jMZ8SC+qudSGHeLKsigcFUq5oMd+ZKAJ03dXe+2AMBrMJ hV/WeiHCfRpDm1Q9pbdUppCisTezuWHoSf24FWzyUoWQdaIRbflx5rB3MFkJzacfYJFp ENQqH0f/XHlyvKkHYB/hV9CgDFvJQcTXNzbVJKse68s1jGMh5Plu9xUn5+CNrFsCvAsW //qAhBQTP5EE4X6oi3d/6KcibrZ7Gsy/LHXDO5MzC1RIfsAy/R/hBNVoLF4+LPv9FHmV fnDVSLKuMCt3j3nE/nRXimq7ifZy8TkcGi9eoKI8a+2GmPNaRZFvIGx54yaYykhkdCXT eoQA== X-Gm-Message-State: ALKqPwdDtUTcNMmFfgR3t8NtZkNxpgUzwpikuBX6QEv8OtYc85UGxs8O BuHirEfY6bbjEfbj4pfbMww3Jw== X-Received: by 2002:a1c:8895:: with SMTP id k143-v6mr2232634wmd.17.1526170083755; Sat, 12 May 2018 17:08:03 -0700 (PDT) Received: from localhost.localdomain ([46.5.2.0]) by smtp.gmail.com with ESMTPSA id r14-v6sm8245469wra.41.2018.05.12.17.08.02 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Sat, 12 May 2018 17:08:02 -0700 (PDT) From: Stephan Holljes To: ffmpeg-devel@ffmpeg.org Date: Sun, 13 May 2018 02:07:37 +0200 Message-Id: <20180513000740.12548-3-klaxa1337@googlemail.com> X-Mailer: git-send-email 2.16.2 In-Reply-To: <20180513000740.12548-1-klaxa1337@googlemail.com> References: <20180510154126.30789-1-klaxa1337@googlemail.com> <20180513000740.12548-1-klaxa1337@googlemail.com> Subject: [FFmpeg-devel] [PATCH 2/5] ffserver: Implement publisher X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Stephan Holljes MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Signed-off-by: Stephan Holljes --- publisher.c | 308 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ publisher.h | 170 +++++++++++++++++++++++++++++++++ 2 files changed, 478 insertions(+) create mode 100644 publisher.c create mode 100644 publisher.h diff --git a/publisher.c b/publisher.c new file mode 100644 index 0000000..1123056 --- /dev/null +++ b/publisher.c @@ -0,0 +1,308 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "publisher.h" +#include "segment.h" +#include + +void client_log(struct Client *c) +{ + av_log(NULL, AV_LOG_INFO, "State: "); + switch(c->state) { + case FREE: + av_log(NULL, AV_LOG_INFO, "FREE\n"); + break; + case RESERVED: + av_log(NULL, AV_LOG_INFO, "RESERVED\n"); + break; + case WAIT: + av_log(NULL, AV_LOG_INFO, "WAIT\n"); + break; + case WRITABLE: + av_log(NULL, AV_LOG_INFO, "WRITABLE\n"); + break; + case BUSY: + av_log(NULL, AV_LOG_INFO, "BUSY\n"); + break; + case BUFFER_FULL: + av_log(NULL, AV_LOG_INFO, "BUFFER_FULL\n"); + break; + default: + av_log(NULL, AV_LOG_INFO, "UNKOWN\n"); + break; + } +} + +void client_disconnect(struct Client *c, int write_trailer) +{ + struct Segment *seg; + client_set_state(c, BUSY); + if (write_trailer) + av_write_trailer(c->ofmt_ctx); + c->ffinfo->httpd->close(c->ffinfo->server, c->ffinfo->client); + av_free(c->ofmt_ctx->pb->buffer); + avio_context_free(&c->ofmt_ctx->pb); + avformat_free_context(c->ofmt_ctx); + av_free(c->ffinfo); + c->ofmt_ctx = NULL; + c->ffinfo = NULL; + pthread_mutex_lock(&c->buffer_lock); + while(av_fifo_size(c->buffer)) { + av_fifo_generic_read(c->buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + pthread_mutex_unlock(&c->buffer_lock); + c->current_segment_id = -1; + client_set_state(c, FREE); +} + +void client_set_state(struct Client *c, enum State state) +{ + pthread_mutex_lock(&c->state_lock); + c->state = state; + pthread_mutex_unlock(&c->state_lock); +} + +void client_push_segment(struct Client *c, struct Segment *seg) +{ + pthread_mutex_lock(&c->buffer_lock); + if (av_fifo_space(c->buffer) == 0) { + av_log(NULL, AV_LOG_WARNING, "Client buffer full, dropping Segment.\n"); + client_set_state(c, BUFFER_FULL); + pthread_mutex_unlock(&c->buffer_lock); + return; + } + segment_ref(seg); + av_fifo_generic_write(c->buffer, &seg, sizeof(struct Segment*), NULL); + pthread_mutex_unlock(&c->buffer_lock); + client_set_state(c, WRITABLE); +} + +void publisher_init(struct PublisherContext **pub) +{ + int i; + struct PublisherContext *pc = (struct PublisherContext*) av_malloc(sizeof(struct PublisherContext)); + pc->nb_threads = 8; + pc->current_segment_id = -1; + pc->shutdown = 0; + pc->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + pc->fs_buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + pthread_mutex_init(&pc->buffer_lock, NULL); + pthread_mutex_init(&pc->fs_buffer_lock, NULL); + for (i = 0; i < MAX_CLIENTS; i++) { + struct Client *c = &pc->clients[i]; + c->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + c->ofmt_ctx = NULL; + c->ffinfo = NULL; + c->id = i; + c->current_segment_id = -1; + pthread_mutex_init(&c->state_lock, NULL); + pthread_mutex_init(&c->buffer_lock, NULL); + client_set_state(c, FREE); + } + *pub = pc; +} + +void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg) +{ + struct Segment *drop; + pthread_mutex_lock(&pub->buffer_lock); + pthread_mutex_lock(&pub->fs_buffer_lock); + av_fifo_generic_write(pub->buffer, &seg, sizeof(struct Segment*), NULL); + segment_ref(seg); + if (av_fifo_size(pub->fs_buffer) >= BUFFER_SEGMENTS * sizeof(struct Segment*)) { + av_fifo_generic_read(pub->fs_buffer, &drop, sizeof(struct Segment*), NULL); + segment_unref(drop); + } + av_fifo_generic_write(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL); + pthread_mutex_unlock(&pub->buffer_lock); + pthread_mutex_unlock(&pub->fs_buffer_lock); + segment_ref(seg); +} + +int publisher_reserve_client(struct PublisherContext *pub) +{ + int i; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case FREE: + client_set_state(&pub->clients[i], RESERVED); + return 0; + default: + continue; + } + } + return 1; +} + +void publisher_cancel_reserve(struct PublisherContext *pub) +{ + int i; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case RESERVED: + client_set_state(&pub->clients[i], FREE); + return; + default: + continue; + } + } + return; +} + +void client_push_prebuffer(struct PublisherContext *pub, struct Client *c) +{ + int off; + int size; + struct Segment *seg; + pthread_mutex_lock(&pub->fs_buffer_lock); + size = av_fifo_size(pub->fs_buffer); + for (off = 0; off < size; off += sizeof(struct Segment*)) { + av_fifo_generic_peek_at(pub->fs_buffer, &seg, off, sizeof(struct Segment*), NULL); + client_push_segment(c, seg); + } + pthread_mutex_unlock(&pub->fs_buffer_lock); +} + +void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx, struct FFServerInfo *ffinfo) +{ + int i; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case RESERVED: + pub->clients[i].ofmt_ctx = ofmt_ctx; + pub->clients[i].ffinfo = ffinfo; + client_set_state(&pub->clients[i], WRITABLE); + client_push_prebuffer(pub, &pub->clients[i]); + return; + default: + continue; + } + } +} + +void publisher_free(struct PublisherContext *pub) +{ + int i; + struct Segment *seg; + pthread_mutex_lock(&pub->buffer_lock); + while(av_fifo_size(pub->buffer)) { + av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + av_fifo_freep(&pub->buffer); + pthread_mutex_unlock(&pub->buffer_lock); + + pthread_mutex_lock(&pub->fs_buffer_lock); + while(av_fifo_size(pub->fs_buffer)) { + av_fifo_generic_read(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + av_fifo_freep(&pub->fs_buffer); + for (i = 0; i < MAX_CLIENTS; i++) { + av_fifo_freep(&pub->clients[i].buffer); + } + pthread_mutex_unlock(&pub->fs_buffer_lock); + av_free(pub); + return; +} + +void publisher_freep(struct PublisherContext **pub) +{ + publisher_free(*pub); + *pub = NULL; + return; +} + +void publish(struct PublisherContext *pub) +{ + int i; + struct Segment *seg; + char filename[128] = {0}; + pthread_mutex_lock(&pub->buffer_lock); + av_log(NULL, AV_LOG_DEBUG, "pub->buffer size: %d\n", av_fifo_size(pub->buffer)); + if (av_fifo_size(pub->buffer) == 0) { + pthread_mutex_unlock(&pub->buffer_lock); + return; + } + av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL); + pthread_mutex_unlock(&pub->buffer_lock); + if (seg) { + pub->current_segment_id = seg->id; + snprintf(filename, 127, "segment-%04d.mkv", seg->id); +// segment_save(seg, filename); + + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case BUFFER_FULL: + av_log(pub, AV_LOG_WARNING, "Dropping segment for client %d, buffer full.\n", i); + continue; + case WAIT: + case WRITABLE: + client_push_segment(&pub->clients[i], seg); + default: + continue; + } + } + segment_unref(seg); + } +} + +void publisher_gen_status_json(struct PublisherContext *pub, char *status) +{ + int states[STATE_NB] = {0}; + int current_read = 0, newest_write = 0, oldest_write = 0; + int i; + struct Client *c; + + current_read = pub->current_segment_id; + oldest_write = current_read; + + for (i = 0; i < MAX_CLIENTS; i++) { + c = &pub->clients[i]; + if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) { + oldest_write = c->current_segment_id; + } + if (c->current_segment_id > newest_write) { + newest_write = c->current_segment_id; + } + states[c->state]++; + } + + + snprintf(status, 4095, + "{\n\t\"free\": %d,\n" + "\t\"reserved\": %d,\n" + "\t\"wait\": %d,\n" + "\t\"writable\": %d,\n" + "\t\"busy\": %d,\n" + "\t\"buffer_full\": %d,\n" + "\t\"current_read\": %d,\n" + "\t\"newest_write\": %d,\n" + "\t\"oldest_write\": %d\n" + "}\n", + states[FREE], + states[RESERVED], + states[WAIT], + states[WRITABLE], + states[BUSY], + states[BUFFER_FULL], + current_read, + newest_write, + oldest_write); +} diff --git a/publisher.h b/publisher.h new file mode 100644 index 0000000..97b745d --- /dev/null +++ b/publisher.h @@ -0,0 +1,170 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef PUBLISHER_H +#define PUBLISHER_H + +#include +#include +#include +#include "segment.h" +#include "httpd.h" + +#define MAX_CLIENTS 16 +#define MAX_SEGMENTS 16 +#define BUFFER_SEGMENTS 10 + +/* Client State enum */ + +enum State { + FREE, // no client connected + RESERVED, // reserved for a client that just connected + WAIT, // up to date, no new Segments to write + WRITABLE, // buffer is not full, new Segments can be pushed + BUSY, // currently writing to this client + BUFFER_FULL, // client buffer is full, new Segments will be dropped + STATE_NB +}; + + +/* struct containing server and client info per client AVIOContext */ + +struct FFServerInfo { + struct HTTPDInterface *httpd; + void *server; + struct HTTPClient *client; +}; + + +struct Client { + AVFormatContext *ofmt_ctx; // writable AVFormatContext, basically our tcp connection to the client + AVFifoBuffer *buffer; // Client buffer of Segment references + char *method; + char *resource; + struct FFServerInfo *ffinfo; + enum State state; + pthread_mutex_t buffer_lock; + pthread_mutex_t state_lock; + int id; + int current_segment_id; // The stream-based id of the segment that has last been worked on. +}; + +struct PublisherContext { + struct Client clients[MAX_CLIENTS]; // currently compile-time configuration, easly made dynamic with malloc? + AVFifoBuffer *buffer; // publisher buffer for new Segments + AVFifoBuffer *fs_buffer; // fast start buffer + pthread_mutex_t buffer_lock; + pthread_mutex_t fs_buffer_lock; + int nb_threads; + int current_segment_id; + int shutdown; // indicate shutdown, gracefully close client connections and files and exit +}; + +/** + * Log a client's stats to the console. + * + * @param c pointer to the client to print + */ +void client_log(struct Client *c); + +/** + * Disconnect a client. + * + * @param c pointer to the client to disconnect. + */ +void client_disconnect(struct Client *c, int write_trailer); + +/** + * Set a client's state. Note: This is protected by mutex locks. + * + * @param c pointer to the client to set the state of + * @param state the state to set the client to + */ +void client_set_state(struct Client *c, enum State state); + +/** + * Allocate and initialize a PublisherContext + * + * @param pub pointer to a pointer to a PublisherContext. It will be allocated and initialized. + */ +void publisher_init(struct PublisherContext **pub); + +/** + * Push a Segment to a PublisherContext. + * + * @param pub pointer to a PublisherContext + * @param seg pointer to the Segment to add + */ +void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg); + +/** + * Reserve a slot in the client struct of a PublisherContext. May fail if the number + * of maximum clients has been reached. + * + * @param pub pointer to a PublisherContext + * @return 0 in case of success, 1 in case of failure + */ +int publisher_reserve_client(struct PublisherContext *pub); + +/** + * Cancel a single reservation. This can be used if a client spot was reserved, but the client + * unexpectedly disconnects or sends an invalid request. + * + * @param pub pointer to a PublisherContext + */ +void publisher_cancel_reserve(struct PublisherContext *pub); + +/** + * Add a client by its ofmt_ctx. This initializes an element in the client struct of the PublisherContext + * that has been reserved prior to calling this function. + * + * @param pub pointer to a PublisherContext + * @param ofmt_ctx AVFormatContext of a client + * @param ffinfo pointer to struct containing custom IO information for server independent write implementation + */ +void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx, struct FFServerInfo *ffinfo); + +/** + * Free buffers and associated client buffers. + * + * @param pub pointer to the PublisherContext to free + */ +void publisher_free(struct PublisherContext *pub); + +/** + * Free buffers and associated client buffers and set *pub to NULL. + * + * @param pub pointer to the PublisherContext pointer to free + */ +void publisher_freep(struct PublisherContext **pub); + +/** + * Signal to the PublisherContext to check its buffer and publish pending Segments. + * + * @param pub pointer to a PublisherContext + */ +void publish(struct PublisherContext *pub); + +/** + * Print the current client and file reading status to a json string. + * @param pub pointer to a PublisherContext + * @param status string of at least 4096 bytes size. + */ +void publisher_gen_status_json(struct PublisherContext *pub, char *status); + +#endif // PUBLISHER_H