diff mbox series

[FFmpeg-devel] avformat/libsrt: data transmission bitrate control

Message ID DB9PR10MB46199E1CD917EF76BAEDDD3CC2D49@DB9PR10MB4619.EURPRD10.PROD.OUTLOOK.COM
State New
Headers show
Series [FFmpeg-devel] avformat/libsrt: data transmission bitrate control | expand

Checks

Context Check Description
andriy/configurex86 warning Failed to apply patch
andriy/configureppc warning Failed to apply patch

Commit Message

Bartsevich, Dmitry Sept. 8, 2021, 11:37 a.m. UTC
The patch adds 3 parameters ("bitrate", "burst_bits", "fifo_size") and output bitrate control to the libsrt muxer. The code is mostly taken from udp.c and the reasoning is the same: data transmission bursts cause decoding errors on some decoders. Windows-specific APIs (performance counters and waitable timers) are used instead of standard FFmpeg routines to measure time intervals and to delay thread execution in Windows build: standard ones don't provide sub-millisecond precision and accuracy which are required for smooth outbound traffic.

Muxer URL would look like this:
"srt://10.10.10.10:12345?mode=caller&bitrate=15000000&burst_bits=150000"

Signed-off-by: Dmitry Bartsevich <bartsevich@scnsoft.com>
---
libavformat/libsrt.c | 275 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 275 insertions(+)

#include "url.h"
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
/* This is for MPEG-TS and it's a default SRTO_PAYLOADSIZE for SRTT_LIVE (8 TS packets) */
#ifndef SRT_LIVE_DEFAULT_PAYLOAD_SIZE
#define SRT_LIVE_DEFAULT_PAYLOAD_SIZE 1316
@@ -90,6 +102,21 @@ typedef struct SRTContext {
     SRT_TRANSTYPE transtype;
     int linger;
     int tsbpd;
+
+    /* Circular Buffer variables for use in SRT sending code */
+    int circular_buffer_size;
+    AVFifoBuffer *fifo;
+    int circular_buffer_error;
+    int64_t bitrate; /* number of bits to send per second */
+    int64_t burst_bits;
+    int close_req;
+#if HAVE_PTHREAD_CANCEL
+    pthread_t circular_buffer_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+#endif
+    uint8_t tmp[SRT_LIVE_MAX_PAYLOAD_SIZE+4];
} SRTContext;
 #define D AV_OPT_FLAG_DECODING_PARAM
@@ -142,6 +169,9 @@ static const AVOption libsrt_options[] = {
     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, "transtype" },
     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
+    { "bitrate",        "Bits to send per second",                                              OFFSET(bitrate),          AV_OPT_TYPE_INT64,    { .i64 = 0  },  0, INT64_MAX, .flags = E },
+    { "burst_bits",     "Max length of bursts in bits (when using bitrate)",                    OFFSET(burst_bits),       AV_OPT_TYPE_INT64,    { .i64 = 0  },  0, INT64_MAX, .flags = E },
+    { "fifo_size",      "set the SRT sending circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, .flags = E },
     { NULL }
};
@@ -165,6 +195,138 @@ static int libsrt_socket_nonblock(int socket, int enable)
     return srt_setsockopt(socket, 0, SRTO_RCVSYN, &blocking, sizeof(blocking));
}
+#if HAVE_PTHREAD_CANCEL
+
+/* More precise time measurement in Windows, call default routine otherwise */
+static int64_t av_gettime_relative_precise(void)
+{
+#ifdef _WIN32
+    static LARGE_INTEGER freq;
+    LARGE_INTEGER t;
+
+    if (freq.QuadPart == 0) {
+        QueryPerformanceFrequency(&freq);
+    }
+
+    QueryPerformanceCounter(&t);
+    return t.QuadPart * 1000000 / freq.QuadPart;
+#else
+    return av_gettime_relative();
+#endif
+}
+
+static void *circular_buffer_task_tx(void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    SRTContext *s = h->priv_data;
+    int old_cancelstate;
+    int64_t target_timestamp = av_gettime_relative_precise();
+    int64_t start_timestamp = av_gettime_relative_precise();
+    int64_t sent_bits = 0;
+    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+    int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+#ifdef _WIN32
+    /* Use waitable timers to delay thread execution in Windows, as default
+       Sleep() API call has 1ms resolution and is not accurate enough */
+    LARGE_INTEGER timeout;
+    HANDLE waitable_timer = CreateWaitableTimer(NULL, FALSE, NULL);
+#endif
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+
+    if (libsrt_socket_nonblock(s->fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+
+    for(;;) {
+        int len;
+        const uint8_t *p;
+        uint8_t tmp[4];
+        int64_t timestamp;
+
+        len = av_fifo_size(s->fifo);
+
+        while (len<4) {
+            if (s->close_req)
+                goto end;
+            pthread_cond_wait(&s->cond, &s->mutex);
+            len = av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+        len = AV_RL32(tmp);
+
+        av_assert0(len >= 0);
+        av_assert0(len <= sizeof(s->tmp));
+
+        av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+        pthread_mutex_unlock(&s->mutex);
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        if (s->bitrate) {
+            timestamp = av_gettime_relative_precise();
+            if (timestamp < target_timestamp) {
+                int64_t delay = target_timestamp - timestamp;
+                if (delay > max_delay) {
+                    delay = max_delay;
+                    start_timestamp = timestamp + delay;
+                    sent_bits = 0;
+                }
+#ifdef _WIN32
+                /* Relative waitable timer delay in 100-nanosecond units */
+                timeout.QuadPart = -delay * 10;
+                if (SetWaitableTimer(waitable_timer, &timeout, NULL, NULL, NULL, FALSE))
+                    WaitForSingleObject(waitable_timer, INFINITE);
+#else
+                av_usleep(delay);
+#endif
+            } else {
+                if (timestamp - burst_interval > target_timestamp) {
+                    start_timestamp = timestamp - burst_interval;
+                    sent_bits = 0;
+                }
+            }
+            sent_bits += len * 8;
+            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+        }
+
+        p = s->tmp;
+        while (len) {
+            int ret;
+            av_assert0(len > 0);
+            ret = srt_sendmsg(s->fd, p, len, -1, 0);
+            if (ret >= 0) {
+                len -= ret;
+                p   += ret;
+            } else {
+                ret = ff_neterrno();
+                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+                    pthread_mutex_lock(&s->mutex);
+                    s->circular_buffer_error = ret;
+                    pthread_mutex_unlock(&s->mutex);
+                    return NULL;
+                }
+            }
+        }
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+#ifdef _WIN32
+    CloseHandle(waitable_timer);
+#endif
+    return NULL;
+}
+
+#endif
+
static int libsrt_epoll_create(URLContext *h, int fd, int write)
{
     int modes = SRT_EPOLL_ERR | (write ? SRT_EPOLL_OUT : SRT_EPOLL_IN);
@@ -379,6 +541,7 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
     char portstr[10];
     int64_t open_timeout = 0;
     int eid, write_eid;
+    int is_output = (flags & AVIO_FLAG_WRITE) != 0;
     av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
         &port, path, sizeof(path), uri);
@@ -490,9 +653,55 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
     s->fd = fd;
     s->eid = eid;
+#if HAVE_PTHREAD_CANCEL
+    /*
+      Create thread in case of:
+      output and bitrate and circular_buffer_size is set
+    */
+
+    if (is_output && s->bitrate && !s->circular_buffer_size) {
+        /* Warn user in case of 'circular_buffer_size' is not set */
+        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if (is_output && s->bitrate && s->circular_buffer_size) {
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        if (!s->fifo) {
+            ret = AVERROR(ENOMEM);
+            goto fail;
+        }
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto fail;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task_tx, h);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
+
     freeaddrinfo(ai);
     return 0;
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
  fail:
     if (cur_ai->ai_next) {
         /* Retry with the next sockaddr */
@@ -643,7 +852,25 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
             s->linger = strtol(buf, NULL, 10);
         }
+        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
+            s->circular_buffer_size = strtol(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'circular_buffer_size' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+            s->bitrate = strtoll(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'bitrate' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+            s->burst_bits = strtoll(buf, NULL, 10);
+        }
     }
+    s->circular_buffer_size *= 188;
     ret = libsrt_setup(h, uri, flags);
     if (ret < 0)
         goto err;
@@ -680,6 +907,36 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
     SRTContext *s = h->priv_data;
     int ret;
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t tmp[4];
+
+        pthread_mutex_lock(&s->mutex);
+
+        /*
+          Return error if last tx failed.
+          Here we can't know on which packet error was, but it needs to know that error exists.
+        */
+        if (s->circular_buffer_error < 0) {
+            int err = s->circular_buffer_error;
+            pthread_mutex_unlock(&s->mutex);
+            return err;
+        }
+
+        if(av_fifo_space(s->fifo) < size + 4) {
+            /* What about a partial packet tx ? */
+            pthread_mutex_unlock(&s->mutex);
+            return AVERROR(ENOMEM);
+        }
+        AV_WL32(tmp, size);
+        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+        return size;
+    }
+#endif
+
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
         if (ret)
@@ -698,6 +955,24 @@ static int libsrt_close(URLContext *h)
{
     SRTContext *s = h->priv_data;
+#if HAVE_PTHREAD_CANCEL
+    // Request close once writing is finished
+    if (s->thread_started) {
+        int ret;
+
+        pthread_mutex_lock(&s->mutex);
+        s->close_req = 1;
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+
+        ret = pthread_join(s->circular_buffer_thread, NULL);
+        if (ret != 0)
+            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
+        pthread_mutex_destroy(&s->mutex);
+        pthread_cond_destroy(&s->cond);
+    }
+#endif
+
     srt_epoll_release(s->eid);
     srt_close(s->fd);
--
2.16.1.windows.4

Comments

Zhao Zhili Sept. 8, 2021, 2:59 p.m. UTC | #1
> On Sep 8, 2021, at 7:37 PM, Bartsevich, Dmitry <Bartsevich@scnsoft.com> wrote:
> 
> The patch adds 3 parameters ("bitrate", "burst_bits", "fifo_size") and output bitrate control to the libsrt muxer. The code is mostly taken from udp.c and the reasoning is the same: data transmission bursts cause decoding errors on some decoders. Windows-specific APIs (performance counters and waitable timers) are used instead of standard FFmpeg routines to measure time intervals and to delay thread execution in Windows build: standard ones don't provide sub-millisecond precision and accuracy which are required for smooth outbound traffic.
> 
> Muxer URL would look like this:
> "srt://10.10.10.10:12345?mode=caller&bitrate=15000000&burst_bits=150000"

No, this is reinvent pacing control which libsrt has builtin implementation and doing better.
Have a look at SRTO_MAXBW, SRTO_INPUTBW and SRTO_OHEADBW options.
SRTO_MAXBW has a default value 1 Gbps which is too high for regular video bitrate and
network condition.

> 
> Signed-off-by: Dmitry Bartsevich <bartsevich@scnsoft.com>
> ---
> libavformat/libsrt.c | 275 +++++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 275 insertions(+)
> 
> diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
> index e5701625b8..0a00277d80 100644
> --- a/libavformat/libsrt.c
> +++ b/libavformat/libsrt.c
> @@ -23,6 +23,9 @@
> #include <srt/srt.h>
> +#include "libavutil/avassert.h"
> +#include "libavutil/fifo.h"
> +#include "libavutil/intreadwrite.h"
> #include "libavutil/opt.h"
> #include "libavutil/parseutils.h"
> #include "libavutil/time.h"
> @@ -33,6 +36,15 @@
> #include "os_support.h"
> #include "url.h"
> +#if HAVE_W32THREADS
> +#undef HAVE_PTHREAD_CANCEL
> +#define HAVE_PTHREAD_CANCEL 1
> +#endif
> +
> +#if HAVE_PTHREAD_CANCEL
> +#include "libavutil/thread.h"
> +#endif
> +
> /* This is for MPEG-TS and it's a default SRTO_PAYLOADSIZE for SRTT_LIVE (8 TS packets) */
> #ifndef SRT_LIVE_DEFAULT_PAYLOAD_SIZE
> #define SRT_LIVE_DEFAULT_PAYLOAD_SIZE 1316
> @@ -90,6 +102,21 @@ typedef struct SRTContext {
>     SRT_TRANSTYPE transtype;
>     int linger;
>     int tsbpd;
> +
> +    /* Circular Buffer variables for use in SRT sending code */
> +    int circular_buffer_size;
> +    AVFifoBuffer *fifo;
> +    int circular_buffer_error;
> +    int64_t bitrate; /* number of bits to send per second */
> +    int64_t burst_bits;
> +    int close_req;
> +#if HAVE_PTHREAD_CANCEL
> +    pthread_t circular_buffer_thread;
> +    pthread_mutex_t mutex;
> +    pthread_cond_t cond;
> +    int thread_started;
> +#endif
> +    uint8_t tmp[SRT_LIVE_MAX_PAYLOAD_SIZE+4];
> } SRTContext;
> #define D AV_OPT_FLAG_DECODING_PARAM
> @@ -142,6 +169,9 @@ static const AVOption libsrt_options[] = {
>     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, "transtype" },
>     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
>     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
> +    { "bitrate",        "Bits to send per second",                                              OFFSET(bitrate),          AV_OPT_TYPE_INT64,    { .i64 = 0  },  0, INT64_MAX, .flags = E },
> +    { "burst_bits",     "Max length of bursts in bits (when using bitrate)",                    OFFSET(burst_bits),       AV_OPT_TYPE_INT64,    { .i64 = 0  },  0, INT64_MAX, .flags = E },
> +    { "fifo_size",      "set the SRT sending circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, .flags = E },
>     { NULL }
> };
> @@ -165,6 +195,138 @@ static int libsrt_socket_nonblock(int socket, int enable)
>     return srt_setsockopt(socket, 0, SRTO_RCVSYN, &blocking, sizeof(blocking));
> }
> +#if HAVE_PTHREAD_CANCEL
> +
> +/* More precise time measurement in Windows, call default routine otherwise */
> +static int64_t av_gettime_relative_precise(void)
> +{
> +#ifdef _WIN32
> +    static LARGE_INTEGER freq;
> +    LARGE_INTEGER t;
> +
> +    if (freq.QuadPart == 0) {
> +        QueryPerformanceFrequency(&freq);
> +    }
> +
> +    QueryPerformanceCounter(&t);
> +    return t.QuadPart * 1000000 / freq.QuadPart;
> +#else
> +    return av_gettime_relative();
> +#endif
> +}
> +
> +static void *circular_buffer_task_tx(void *_URLContext)
> +{
> +    URLContext *h = _URLContext;
> +    SRTContext *s = h->priv_data;
> +    int old_cancelstate;
> +    int64_t target_timestamp = av_gettime_relative_precise();
> +    int64_t start_timestamp = av_gettime_relative_precise();
> +    int64_t sent_bits = 0;
> +    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
> +    int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
> +#ifdef _WIN32
> +    /* Use waitable timers to delay thread execution in Windows, as default
> +       Sleep() API call has 1ms resolution and is not accurate enough */
> +    LARGE_INTEGER timeout;
> +    HANDLE waitable_timer = CreateWaitableTimer(NULL, FALSE, NULL);
> +#endif
> +
> +    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +    pthread_mutex_lock(&s->mutex);
> +
> +    if (libsrt_socket_nonblock(s->fd, 0) < 0) {
> +        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
> +        s->circular_buffer_error = AVERROR(EIO);
> +        goto end;
> +    }
> +
> +    for(;;) {
> +        int len;
> +        const uint8_t *p;
> +        uint8_t tmp[4];
> +        int64_t timestamp;
> +
> +        len = av_fifo_size(s->fifo);
> +
> +        while (len<4) {
> +            if (s->close_req)
> +                goto end;
> +            pthread_cond_wait(&s->cond, &s->mutex);
> +            len = av_fifo_size(s->fifo);
> +        }
> +
> +        av_fifo_generic_read(s->fifo, tmp, 4, NULL);
> +        len = AV_RL32(tmp);
> +
> +        av_assert0(len >= 0);
> +        av_assert0(len <= sizeof(s->tmp));
> +
> +        av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
> +
> +        pthread_mutex_unlock(&s->mutex);
> +        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
> +
> +        if (s->bitrate) {
> +            timestamp = av_gettime_relative_precise();
> +            if (timestamp < target_timestamp) {
> +                int64_t delay = target_timestamp - timestamp;
> +                if (delay > max_delay) {
> +                    delay = max_delay;
> +                    start_timestamp = timestamp + delay;
> +                    sent_bits = 0;
> +                }
> +#ifdef _WIN32
> +                /* Relative waitable timer delay in 100-nanosecond units */
> +                timeout.QuadPart = -delay * 10;
> +                if (SetWaitableTimer(waitable_timer, &timeout, NULL, NULL, NULL, FALSE))
> +                    WaitForSingleObject(waitable_timer, INFINITE);
> +#else
> +                av_usleep(delay);
> +#endif
> +            } else {
> +                if (timestamp - burst_interval > target_timestamp) {
> +                    start_timestamp = timestamp - burst_interval;
> +                    sent_bits = 0;
> +                }
> +            }
> +            sent_bits += len * 8;
> +            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
> +        }
> +
> +        p = s->tmp;
> +        while (len) {
> +            int ret;
> +            av_assert0(len > 0);
> +            ret = srt_sendmsg(s->fd, p, len, -1, 0);
> +            if (ret >= 0) {
> +                len -= ret;
> +                p   += ret;
> +            } else {
> +                ret = ff_neterrno();
> +                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
> +                    pthread_mutex_lock(&s->mutex);
> +                    s->circular_buffer_error = ret;
> +                    pthread_mutex_unlock(&s->mutex);
> +                    return NULL;
> +                }
> +            }
> +        }
> +
> +        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +        pthread_mutex_lock(&s->mutex);
> +    }
> +
> +end:
> +    pthread_mutex_unlock(&s->mutex);
> +#ifdef _WIN32
> +    CloseHandle(waitable_timer);
> +#endif
> +    return NULL;
> +}
> +
> +#endif
> +
> static int libsrt_epoll_create(URLContext *h, int fd, int write)
> {
>     int modes = SRT_EPOLL_ERR | (write ? SRT_EPOLL_OUT : SRT_EPOLL_IN);
> @@ -379,6 +541,7 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>     char portstr[10];
>     int64_t open_timeout = 0;
>     int eid, write_eid;
> +    int is_output = (flags & AVIO_FLAG_WRITE) != 0;
>     av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
>         &port, path, sizeof(path), uri);
> @@ -490,9 +653,55 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>     s->fd = fd;
>     s->eid = eid;
> +#if HAVE_PTHREAD_CANCEL
> +    /*
> +      Create thread in case of:
> +      output and bitrate and circular_buffer_size is set
> +    */
> +
> +    if (is_output && s->bitrate && !s->circular_buffer_size) {
> +        /* Warn user in case of 'circular_buffer_size' is not set */
> +        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
> +    }
> +
> +    if (is_output && s->bitrate && s->circular_buffer_size) {
> +        /* start the task going */
> +        s->fifo = av_fifo_alloc(s->circular_buffer_size);
> +        if (!s->fifo) {
> +            ret = AVERROR(ENOMEM);
> +            goto fail;
> +        }
> +        ret = pthread_mutex_init(&s->mutex, NULL);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto fail;
> +        }
> +        ret = pthread_cond_init(&s->cond, NULL);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto cond_fail;
> +        }
> +        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task_tx, h);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto thread_fail;
> +        }
> +        s->thread_started = 1;
> +    }
> +#endif
> +
>     freeaddrinfo(ai);
>     return 0;
> +#if HAVE_PTHREAD_CANCEL
> + thread_fail:
> +    pthread_cond_destroy(&s->cond);
> + cond_fail:
> +    pthread_mutex_destroy(&s->mutex);
> +#endif
>  fail:
>     if (cur_ai->ai_next) {
>         /* Retry with the next sockaddr */
> @@ -643,7 +852,25 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
>         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
>             s->linger = strtol(buf, NULL, 10);
>         }
> +        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
> +            s->circular_buffer_size = strtol(buf, NULL, 10);
> +            if (!HAVE_PTHREAD_CANCEL)
> +                av_log(h, AV_LOG_WARNING,
> +                       "'circular_buffer_size' option was set but it is not supported "
> +                       "on this build (pthread support is required)\n");
> +        }
> +        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
> +            s->bitrate = strtoll(buf, NULL, 10);
> +            if (!HAVE_PTHREAD_CANCEL)
> +                av_log(h, AV_LOG_WARNING,
> +                       "'bitrate' option was set but it is not supported "
> +                       "on this build (pthread support is required)\n");
> +        }
> +        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
> +            s->burst_bits = strtoll(buf, NULL, 10);
> +        }
>     }
> +    s->circular_buffer_size *= 188;
>     ret = libsrt_setup(h, uri, flags);
>     if (ret < 0)
>         goto err;
> @@ -680,6 +907,36 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
>     SRTContext *s = h->priv_data;
>     int ret;
> +#if HAVE_PTHREAD_CANCEL
> +    if (s->fifo) {
> +        uint8_t tmp[4];
> +
> +        pthread_mutex_lock(&s->mutex);
> +
> +        /*
> +          Return error if last tx failed.
> +          Here we can't know on which packet error was, but it needs to know that error exists.
> +        */
> +        if (s->circular_buffer_error < 0) {
> +            int err = s->circular_buffer_error;
> +            pthread_mutex_unlock(&s->mutex);
> +            return err;
> +        }
> +
> +        if(av_fifo_space(s->fifo) < size + 4) {
> +            /* What about a partial packet tx ? */
> +            pthread_mutex_unlock(&s->mutex);
> +            return AVERROR(ENOMEM);
> +        }
> +        AV_WL32(tmp, size);
> +        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
> +        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
> +        pthread_cond_signal(&s->cond);
> +        pthread_mutex_unlock(&s->mutex);
> +        return size;
> +    }
> +#endif
> +
>     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
>         ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
>         if (ret)
> @@ -698,6 +955,24 @@ static int libsrt_close(URLContext *h)
> {
>     SRTContext *s = h->priv_data;
> +#if HAVE_PTHREAD_CANCEL
> +    // Request close once writing is finished
> +    if (s->thread_started) {
> +        int ret;
> +
> +        pthread_mutex_lock(&s->mutex);
> +        s->close_req = 1;
> +        pthread_cond_signal(&s->cond);
> +        pthread_mutex_unlock(&s->mutex);
> +
> +        ret = pthread_join(s->circular_buffer_thread, NULL);
> +        if (ret != 0)
> +            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
> +        pthread_mutex_destroy(&s->mutex);
> +        pthread_cond_destroy(&s->cond);
> +    }
> +#endif
> +
>     srt_epoll_release(s->eid);
>     srt_close(s->fd);
> --
> 2.16.1.windows.4
> 
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> 
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
diff mbox series

Patch

diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
index e5701625b8..0a00277d80 100644
--- a/libavformat/libsrt.c
+++ b/libavformat/libsrt.c
@@ -23,6 +23,9 @@ 
 #include <srt/srt.h>
+#include "libavutil/avassert.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
#include "libavutil/time.h"
@@ -33,6 +36,15 @@ 
#include "os_support.h"