From patchwork Tue Sep 28 08:22:40 2021 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Gijs Peskens X-Patchwork-Id: 30625 Delivered-To: ffmpegpatchwork2@gmail.com Received: by 2002:a6b:6506:0:0:0:0:0 with SMTP id z6csp4962304iob; Tue, 28 Sep 2021 01:23:08 -0700 (PDT) X-Google-Smtp-Source: ABdhPJzkeoH2UX6WF4+tbeSAlBQ3lWZR3sdHrhTyQZczBeLc96jRRDwgrpuGD/i2FbExi4WDV82G X-Received: by 2002:a50:cf86:: with SMTP id h6mr6038779edk.104.1632817387956; Tue, 28 Sep 2021 01:23:07 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1632817387; cv=none; d=google.com; s=arc-20160816; b=wdAAvAoeJmBVI5K7ARNR1KvhqSzUJ5UzJAl0pjUGynNHLmeG3xMgGO7COseaZ2ciWb XQ51x5WgOSYURRht72g/1XZM1aGyu/la2ZBe+/kxfRUB3zlDKMwK0oDX2oUJCcOOSyc8 xushEnF6mZ0owlo3E2fcRwUKVkbIQBLiB/IiQoHqQNa39ShwwsE94uUGjEPWYpJVANLZ A7S6GDgOrNn/AaIWEzoiqKr5UKXwT9IRWjPJ9/x8vzEd3kFiafIMINUol17UuapB7ADo 0kwRstF01EXSfXAPVmaS9Kb2EmNQHFlbkeijiu8JoVh+vhBqNuGrqI2XwNmPLM9+M2b4 lu4g== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:reply-to:list-subscribe :list-help:list-post:list-archive:list-unsubscribe:list-id :precedence:subject:mime-version:references:in-reply-to:message-id :date:to:from:dkim-signature:delivered-to; bh=HjnTEnq2iHNKt73rWozenGaW0pocfyLlrJgh0QDJibc=; b=INxDi11zqKD4rOTnPQqhXy3C7d+504TwYFFugOy2/Ra1BH6tS4WAb3NAGo+nkjykzH QRqR5qh+CpYAJio9yuBc1dVEXFQo1sqC1DZ9yvijujZagvlI6zKkrkW07gDSqDLumE+B DqJY+jdsYR2heGpAKeJ5kk8t/2fNR3LGGvrZUrn9PbvJUMVJaE06BquKdFqdu0uX9NEM MVXWrZTlkNIpiaCRs0acKvSmcyjv6PEb7z4hcgYpowLEmwnMiA5IeWIU/sk4rtCOt93e 2IubuWm3J7zp359fbip6VuK8QgZDuUobAFse1k+tLbIr564foRHzaWDyVKGJ0D1MRNWR rgbA== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@heteigenwijsje.nl header.s=dkim header.b=eMZVy7SK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=peskens.net Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id z5si21196949ejl.639.2021.09.28.01.23.07; Tue, 28 Sep 2021 01:23:07 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@heteigenwijsje.nl header.s=dkim header.b=eMZVy7SK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=peskens.net Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 3C66D680A26; Tue, 28 Sep 2021 11:22:53 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from smtp.heteigenwijsje.nl (smtp.heteigenwijsje.nl [80.127.116.100]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 127896808EE for ; Tue, 28 Sep 2021 11:22:45 +0300 (EEST) Received: from mail.heteigenwijsje.nl (localhost [127.0.0.1]) by smtp.heteigenwijsje.nl (Postfix) with ESMTP id 0CAFE9C485 for ; Tue, 28 Sep 2021 10:18:04 +0200 (CEST) Authentication-Results: mail.heteigenwijsje.nl (amavisd-new); dkim=pass (1024-bit key) reason="pass (just generated, assumed good)" header.d=heteigenwijsje.nl DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d= heteigenwijsje.nl; h=content-transfer-encoding:mime-version :references:in-reply-to:x-mailer:message-id:date:date:subject :subject:to:from:from; s=dkim; t=1632817081; x=1635409082; bh=ep BmTO9epR1dUWv/4ZzVPRZ6nVEf+vcrkMDn0+wiTrk=; b=eMZVy7SKQtQUmuR7Ne wn+mZLgZVY2NUmwgHe6RhCwln7QFBOPrmAVD1fjgAQlTC+vZM0OQkdqxXky0+kSB oTtF3acxw+102+qZdzU2yXB2w0AcSfx8eivOxWMFFn7yMf3qc5x6DhVWAuKh4K7z ZiZv3cSaLXPcawP19ywTogx7o= X-Virus-Scanned: amavisd-new at mail.heteigenwijsje.nl Received: from smtp.heteigenwijsje.nl ([127.0.0.1]) by mail.heteigenwijsje.nl (mail.heteigenwijsje.nl [127.0.0.1]) (amavisd-new, port 10026) with ESMTP id ZYUVn8fUyhGm for ; Tue, 28 Sep 2021 10:18:01 +0200 (CEST) Received: from smtp.heteigenwijsje.nl (localhost [127.0.0.1]) by smtp.heteigenwijsje.nl (Postfix) with ESMTP id AC1FD9C405 for ; Tue, 28 Sep 2021 10:18:01 +0200 (CEST) Received: from unknown ([94.208.100.217]) by smtp.heteigenwijsje.nl with ESMTPSA id QHwzKbnPUmGEtAAAc3PRCQ (envelope-from ) for ; Tue, 28 Sep 2021 10:18:01 +0200 From: Gijs Peskens To: ffmpeg-devel@ffmpeg.org Date: Tue, 28 Sep 2021 10:22:40 +0200 Message-Id: <20210928082241.918233-2-gijs@peskens.net> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210928082241.918233-1-gijs@peskens.net> References: <20210928082241.918233-1-gijs@peskens.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving. X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" X-TUID: njX/Bj/Cb13a libRIST internally stores packets in a fifo of 1024 packets, overwriting old packets when not read in a sufficient pace. Unfortunately this results in many fifo overflow errors when ffmpeg consumes a libRIST stream. This patch creates a receiver thread based on the UDP circular buffer code. Signed-off-by: Gijs Peskens --- libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 196 insertions(+), 5 deletions(-) diff --git a/libavformat/librist.c b/libavformat/librist.c index b120346f48..47c01a8432 100644 --- a/libavformat/librist.c +++ b/libavformat/librist.c @@ -26,6 +26,8 @@ #include "libavutil/opt.h" #include "libavutil/parseutils.h" #include "libavutil/time.h" +#include "libavutil/fifo.h" +#include "libavutil/intreadwrite.h" #include "avformat.h" #include "internal.h" @@ -33,6 +35,15 @@ #include "os_support.h" #include "url.h" +#if HAVE_W32THREADS +#undef HAVE_PTHREAD_CANCEL +#define HAVE_PTHREAD_CANCEL 1 +#endif + +#if HAVE_PTHREAD_CANCEL +#include "libavutil/thread.h" +#endif + #include #include // RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead @@ -67,6 +78,19 @@ typedef struct RISTContext { struct rist_peer *peer; struct rist_ctx *ctx; + + int circular_buffer_size; + AVFifoBuffer *fifo; + int circular_buffer_error; + int overrun_nonfatal; + +#if HAVE_PTHREAD_CANCEL + pthread_t receiver_thread; + pthread_mutex_t mutex; + pthread_cond_t cond; + int thread_started; + int thread_stop; +#endif } RISTContext; #define D AV_OPT_FLAG_DECODING_PARAM @@ -82,6 +106,8 @@ static const AVOption librist_options[] = { { "log_level", "set loglevel", OFFSET(log_level), AV_OPT_TYPE_INT, {.i64=RIST_LOG_INFO}, -1, INT_MAX, .flags = D|E }, { "secret", "set encryption secret",OFFSET(secret), AV_OPT_TYPE_STRING,{.str=NULL}, 0, 0, .flags = D|E }, { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT ,{.i64=0}, 0, INT_MAX, .flags = D|E }, + { "fifo_size", "set the receiving circular buffer size, expressed as a number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, + { "overrun_nonfatal", "survive in case of receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D }, { NULL } }; @@ -119,6 +145,15 @@ static int librist_close(URLContext *h) RISTContext *s = h->priv_data; int ret = 0; +#if HAVE_PTHREAD_CANCEL + if (s->thread_started) { + pthread_mutex_lock(&s->mutex); + s->thread_stop = 1; + pthread_mutex_unlock(&s->mutex); + pthread_join(s->receiver_thread, NULL); + } +#endif + av_fifo_freep(&s->fifo); s->peer = NULL; if (s->ctx) @@ -128,6 +163,78 @@ static int librist_close(URLContext *h) return risterr2ret(ret); } +static void *receiver_thread(void *_url_context) +{ + URLContext *h = _url_context; + RISTContext *s = h->priv_data; + int ret; + uint8_t tmp[4]; +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + const struct rist_data_block *data_block; +#else + struct rist_data_block *data_block; +#endif + + while (1) + { + pthread_mutex_lock(&s->mutex); + if (s->thread_stop) + break; + pthread_mutex_unlock(&s->mutex); +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME); +#else + ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME); +#endif + if (ret == 0) + continue; + + pthread_mutex_lock(&s->mutex); + if (ret < 0) { + s->circular_buffer_error = ret; + break; + } + + if (data_block->payload_len > MAX_PAYLOAD_SIZE) { +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + rist_receiver_data_block_free((struct rist_data_block**)&data_block); +#else + rist_receiver_data_block_free2(&data_block); +#endif + s->circular_buffer_error = AVERROR_EXTERNAL; + break; + } + AV_WL32(tmp, data_block->payload_len); + if (av_fifo_space(s->fifo) < (data_block->payload_len +4)) + { + /* No Space left */ + if (s->overrun_nonfatal) { + av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " + "Surviving due to overrun_nonfatal option\n"); + continue; + } else { + av_log(h, AV_LOG_ERROR, "Circular buffer overrun. " + "To avoid, increase fifo_size URL option. " + "To survive in such case, use overrun_nonfatal option\n"); + s->circular_buffer_error = AVERROR(EIO); + break; + } + } + av_fifo_generic_write(s->fifo, tmp, 4, NULL); + av_fifo_generic_write(s->fifo, (void*)data_block->payload, data_block->payload_len, NULL); + pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + rist_receiver_data_block_free((struct rist_data_block**)&data_block); +#else + rist_receiver_data_block_free2(&data_block); +#endif + } + pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); + return NULL; +} + static int librist_open(URLContext *h, const char *uri, int flags) { RISTContext *s = h->priv_data; @@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, int flags) if (ret < 0) goto err; + s->circular_buffer_size *= 188; + +#if HAVE_PTHREAD_CANCEL + //Create receiver thread if circular buffer size is set and we are receiving + if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) { + /* start the task going */ + s->fifo = av_fifo_alloc(s->circular_buffer_size); + if (!s->fifo) { + ret = AVERROR(ENOMEM); + goto err; + } + ret = pthread_mutex_init(&s->mutex, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto err; + } + ret = pthread_cond_init(&s->cond, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto cond_fail; + } + ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto thread_fail; + } + s->thread_started = 1; + } +#endif return 0; - +#if HAVE_PTHREAD_CANCEL + thread_fail: + pthread_cond_destroy(&s->cond); + cond_fail: + pthread_mutex_destroy(&s->mutex); +#endif err: librist_close(h); - + av_fifo_freep(&s->fifo); return risterr2ret(ret); } static int librist_read(URLContext *h, uint8_t *buf, int size) { RISTContext *s = h->priv_data; +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + const struct rist_data_block *data_block; +#else + struct rist_data_block *data_block; +#endif int ret; +#if HAVE_PTHREAD_CANCEL + int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; + + if (s->fifo) { + pthread_mutex_lock(&s->mutex); + do { + avail = av_fifo_size(s->fifo); + if (avail) { // >=size) { + uint8_t tmp[4]; + + av_fifo_generic_read(s->fifo, tmp, 4, NULL); + avail = AV_RL32(tmp); + if(avail > size){ + av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); + avail = size; + } + + av_fifo_generic_read(s->fifo, buf, avail, NULL); + av_fifo_drain(s->fifo, AV_RL32(tmp) - avail); + pthread_mutex_unlock(&s->mutex); + return avail; + } else if(s->circular_buffer_error){ + int err = s->circular_buffer_error; + pthread_mutex_unlock(&s->mutex); + return err; + } else if(nonblock) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(EAGAIN); + } else { + /* FIXME: using the monotonic clock would be better, + but it does not exist on all supported platforms. */ + int64_t t = av_gettime() + 100000; + struct timespec tv = { .tv_sec = t / 1000000, + .tv_nsec = (t % 1000000) * 1000 }; + int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv); + if (err) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(err == ETIMEDOUT ? EAGAIN : err); + } + nonblock = 1; + } + } while(1); + } +#endif + #if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 - const struct rist_data_block *data_block; ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME); #else - struct rist_data_block *data_block; ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME); #endif - if (ret < 0) return risterr2ret(ret);