diff mbox series

[FFmpeg-devel] avformat/libsrt: Adding support for multiple clients as a server

Message ID 20240919134646.3841-1-nicolas.dato@gmail.com
State New
Headers show
Series [FFmpeg-devel] avformat/libsrt: Adding support for multiple clients as a server | expand

Checks

Context Check Description
yinshiyou/make_loongarch64 success Make finished
yinshiyou/make_fate_loongarch64 success Make fate finished
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished

Commit Message

Nicolas Jorge Dato Sept. 19, 2024, 1:46 p.m. UTC
From: Nicolas Jorge Dato <nicolas.dato@gmail.com>

When in listener mode and writing, now libsrt supports multiple clients
configured with the max_clients parameter.

When max_clients=1 (default), it behaves as before.
When max_clientes > 1, after accepting the first client it launches
a thread to listen for more clients, and a launches a new thread for
each client.
---
 libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 265 insertions(+), 32 deletions(-)

Comments

Zhao Zhili Sept. 23, 2024, 12:34 p.m. UTC | #1
> On Sep 19, 2024, at 21:46, nicolas.dato@gmail.com wrote:
> 
> From: Nicolas Jorge Dato <nicolas.dato@gmail.com>
> 
> When in listener mode and writing, now libsrt supports multiple clients
> configured with the max_clients parameter.
> 
> When max_clients=1 (default), it behaves as before.
> When max_clientes > 1, after accepting the first client it launches
> a thread to listen for more clients, and a launches a new thread for
> each client.
> ---
> libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 265 insertions(+), 32 deletions(-)
> 
> diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
> index 9e860abccd..c9392e6996 100644
> --- a/libavformat/libsrt.c
> +++ b/libavformat/libsrt.c
> @@ -21,8 +21,11 @@
>  * Haivision Open SRT (Secure Reliable Transport) protocol
>  */
> 
> +#include <pthread.h>
> +#include <semaphore.h>
> #include <srt/srt.h>

The implementation isn't portable.

Although FFmpeg has ffserver once upon a time, the IO in libavformat isn’t designed
to be used as a server. I'm afraid a real server (e.g, srs-simple realtime server)
is more appropriate than stretch libavformat.

> 
> +#include "libavutil/fifo.h"
> #include "libavutil/mem.h"
> #include "libavutil/opt.h"
> #include "libavutil/parseutils.h"
> @@ -51,10 +54,22 @@ enum SRTMode {
>     SRT_MODE_RENDEZVOUS = 2
> };
> 
> -typedef struct SRTContext {
> -    const AVClass *class;
> +typedef struct SRTClientContext {
> +    int set;
> +    int ended;
> +    URLContext *h;
> +    pthread_t thread;
> +    int payload_size;
>     int fd;
>     int eid;
> +    sem_t msg;
> +    AVFifo *fifo;
> +} SRTClientContext;
> +
> +typedef struct SRTContext {
> +    const AVClass *class;
> +    int *fd;
> +    int *eid;
>     int64_t rw_timeout;
>     int64_t listen_timeout;
>     int recv_buffer_size;
> @@ -93,6 +108,13 @@ typedef struct SRTContext {
>     SRT_TRANSTYPE transtype;
>     int linger;
>     int tsbpd;
> +    pthread_mutex_t accept_mutex;
> +    pthread_t accept_thread;
> +    int listen_fd;
> +    int listen_eid;
> +    SRTClientContext *client_context;
> +    int max_clients;
> +    int close_threads;
> } SRTContext;
> 
> #define D AV_OPT_FLAG_DECODING_PARAM
> @@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = {
>     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
>     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
>     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
> +    { "max_clients",    "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients),      AV_OPT_TYPE_INT,      { .i64 = 1 },   1, INT_MAX,   .flags = E },
>     { NULL }
> };
> 
> @@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
>     }
> }
> 
> -static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +static int libsrt_accept(int fd, URLContext *h)
> {
>     int ret;
> -    int reuse = 1;
>     /* Max streamid length plus an extra space for the terminating null character */
>     char streamid[513];
>     int streamid_len = sizeof(streamid);
> +
> +    ret = srt_accept(fd, NULL, NULL);
> +    if (ret < 0)
> +        return libsrt_neterrno(h);
> +    if (libsrt_socket_nonblock(ret, 1) < 0)
> +        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> +    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> +        /* Note: returned streamid_len doesn't count the terminating null character */
> +        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> +
> +    return ret;
> +}
> +
> +static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
> +{
> +    int ret;
> +
> +    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> +        ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
> +        if (ret)
> +            return ret;
> +    }
> +
> +    ret = srt_sendmsg(fd, buf, size, -1, 1);
> +    if (ret < 0) {
> +        ret = libsrt_neterrno(h);
> +    }
> +
> +    return ret;
> +}
> +
> +static void *libsrt_client_thread(void *_SRTClientContext)
> +{
> +    SRTClientContext *c = _SRTClientContext;
> +    URLContext *h = c->h;
> +    SRTContext *s = h->priv_data;
> +    uint8_t *buf;
> +    int ret;
> +
> +    buf = av_malloc(c->payload_size);
> +    if (buf == NULL) {
> +        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM));
> +        return NULL;
> +    }
> +    while (!s->close_threads) {
> +        sem_wait(&c->msg);
> +        while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
> +            do {
> +                ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
> +            } while(ret == AVERROR(EAGAIN) && !s->close_threads);
> +            if (ret < 0 && ret != AVERROR(EAGAIN)) {
> +                av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
> +                goto end;
> +            }
> +        }
> +    }
> +end:
> +    av_freep(&buf);
> +    c->ended = 1;
> +
> +    return NULL;
> +}
> +
> +static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
> +{
> +    SRTContext *s = h->priv_data;
> +
> +    c->set = 1;
> +    c->ended = 0;
> +    c->h = h;
> +    c->fd = fd;
> +    c->eid = eid;
> +    c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
> +    if (s->payload_size > 0) {
> +        c->payload_size = s->payload_size;
> +    }
> +    c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
> +    sem_init(&c->msg, 0, 0);
> +    pthread_create(&c->thread, NULL, libsrt_client_thread, c);
> +
> +    return 0;
> +}
> +
> +static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
> +{
> +    pthread_join(c->thread, NULL);
> +    sem_destroy(&c->msg);
> +    av_fifo_freep2(&c->fifo);
> +    srt_epoll_release(c->eid);
> +    srt_close(c->fd);
> +    c->ended = 0;
> +    c->set = 0;
> +    return 0;
> +}
> +
> +static void *libsrt_accept_thread(void *_URLContext)
> +{
> +    URLContext *h = _URLContext;
> +    SRTContext *s = h->priv_data;
> +    int i;
> +    int ret;
> +    int client_fd;
> +
> +    while (!s->close_threads) {
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set && s->client_context[i].ended) {
> +                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> +                libsrt_close_client_thread(h, &s->client_context[i]);
> +            }
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +
> +        ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
> +        if (ret < 0)
> +            continue;
> +
> +        client_fd = ret = libsrt_accept(s->listen_fd, h);
> +        if (ret < 0)
> +            continue;
> +        av_log(h, AV_LOG_DEBUG, "new client connection\n");
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (!s->client_context[i].set) {
> +                s->fd[i] = client_fd;
> +                s->eid[i] = libsrt_epoll_create(h, client_fd, 1);
> +                av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
> +                libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]);
> +                break;
> +            }
> +        }
> +        if (i == s->max_clients) {
> +            av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
> +            srt_close(client_fd);
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +    }
> +    av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
> +
> +    return NULL;
> +}
> +
> +static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
> +{
> +    int ret;
> +    int reuse = 1;
> +
>     if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
>         av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
>     }
> @@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
>     if (ret < 0)
>         return ret;
> 
> -    ret = srt_accept(fd, NULL, NULL);
> -    if (ret < 0)
> -        return libsrt_neterrno(h);
> -    if (libsrt_socket_nonblock(ret, 1) < 0)
> -        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
> -    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
> -        /* Note: returned streamid_len doesn't count the terminating null character */
> -        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
> +    ret = libsrt_accept(fd, h);
> 
>     return ret;
> }
> @@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>             goto fail1;
>         // multi-client
>         ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
> -        srt_epoll_release(read_eid);
> -        if (ret < 0)
> -            goto fail1;
> -        srt_close(fd);
> +        if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
> +            srt_epoll_release(read_eid);
> +            if (ret < 0)
> +                goto fail1;
> +            srt_close(fd);
> +        } else {
> +            s->listen_eid = read_eid;
> +            s->listen_fd = fd;
> +        }
>         fd = ret;
>     } else {
>         int write_eid = ret = libsrt_epoll_create(h, fd, 1);
> @@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
>         goto fail1;
> 
>     h->is_streamed = 1;
> -    s->fd = fd;
> -    s->eid = eid;
> +    s->fd[0] = fd;
> +    s->eid[0] = eid;
> +
> +    if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
> +        av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
> +        libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
> +        av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
> +        pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
> +    }
> 
>     freeaddrinfo(ai);
>     return 0;
> @@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
>         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
>             s->linger = strtol(buf, NULL, 10);
>         }
> +        if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
> +            s->max_clients = strtoll(buf, NULL, 10);
> +            if (s->max_clients < 1) {
> +                ret = AVERROR(EINVAL);
> +                goto err;
> +            }
> +        }
> +    }
> +    if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
> +        s->max_clients = 1;
>     }
> +    s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
> +    s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
> +    s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
> +    if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
> +        ret = AVERROR(ENOMEM);
> +        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
> +        goto err;
> +    }
> +
>     ret = libsrt_setup(h, uri, flags);
>     if (ret < 0)
>         goto err;
> @@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
>     int ret;
> 
>     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> -        ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
> +        ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
>         if (ret)
>             return ret;
>     }
> 
> -    ret = srt_recvmsg(s->fd, buf, size);
> +    ret = srt_recvmsg(s->fd[0], buf, size);
>     if (ret < 0) {
>         ret = libsrt_neterrno(h);
>     }
> @@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size)
> static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> {
>     SRTContext *s = h->priv_data;
> +    SRTClientContext *c;
> +    int i;
>     int ret;
> -
> -    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> -        ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
> -        if (ret)
> -            return ret;
> -    }
> -
> -    ret = srt_sendmsg(s->fd, buf, size, -1, 1);
> -    if (ret < 0) {
> -        ret = libsrt_neterrno(h);
> +    int any_ok = 0;
> +    int any = 0;
> +
> +    if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
> +        pthread_mutex_lock(&s->accept_mutex);
> +        for (i = 0; i < s->max_clients; i++) {
> +            c = &s->client_context[i];
> +            if (c->set && !c->ended) {
> +                any = 1;
> +                ret = av_fifo_write(c->fifo, buf, size);
> +                if (ret >= 0) {
> +                    sem_post(&c->msg);
> +                    any_ok = 1;
> +                }
> +            }
> +        }
> +        pthread_mutex_unlock(&s->accept_mutex);
> +        if (!any) {
> +            ret = AVERROR(EIO);
> +        } else if (!any_ok) {
> +            ret = AVERROR(EAGAIN);
> +        } else {
> +            ret = size;
> +        }
> +    } else {
> +        ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
>     }
> 
>     return ret;
> @@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
> static int libsrt_close(URLContext *h)
> {
>     SRTContext *s = h->priv_data;
> +    int i;
> 
> -    srt_epoll_release(s->eid);
> -    srt_close(s->fd);
> +    if (s->max_clients > 1) {
> +        s->close_threads = 1;
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set) {
> +                sem_post(&s->client_context[i].msg);
> +            }
> +        }
> +        pthread_join(s->accept_thread, NULL);
> +        srt_epoll_release(s->listen_eid);
> +        srt_close(s->listen_fd);
> +        for (i = 0; i < s->max_clients; i++) {
> +            if (s->client_context[i].set) {
> +                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
> +                libsrt_close_client_thread(h, &s->client_context[i]);
> +            }
> +        }
> +    } else {
> +        srt_epoll_release(s->eid[0]);
> +        srt_close(s->fd[0]);
> +    }
> +    av_freep(&s->fd);
> +    av_freep(&s->eid);
> +    av_freep(&s->client_context);
> 
>     srt_cleanup();
> 
> -- 
> 2.39.4
> 
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> 
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
diff mbox series

Patch

diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
index 9e860abccd..c9392e6996 100644
--- a/libavformat/libsrt.c
+++ b/libavformat/libsrt.c
@@ -21,8 +21,11 @@ 
  * Haivision Open SRT (Secure Reliable Transport) protocol
  */
 
+#include <pthread.h>
+#include <semaphore.h>
 #include <srt/srt.h>
 
+#include "libavutil/fifo.h"
 #include "libavutil/mem.h"
 #include "libavutil/opt.h"
 #include "libavutil/parseutils.h"
@@ -51,10 +54,22 @@  enum SRTMode {
     SRT_MODE_RENDEZVOUS = 2
 };
 
-typedef struct SRTContext {
-    const AVClass *class;
+typedef struct SRTClientContext {
+    int set;
+    int ended;
+    URLContext *h;
+    pthread_t thread;
+    int payload_size;
     int fd;
     int eid;
+    sem_t msg;
+    AVFifo *fifo;
+} SRTClientContext;
+
+typedef struct SRTContext {
+    const AVClass *class;
+    int *fd;
+    int *eid;
     int64_t rw_timeout;
     int64_t listen_timeout;
     int recv_buffer_size;
@@ -93,6 +108,13 @@  typedef struct SRTContext {
     SRT_TRANSTYPE transtype;
     int linger;
     int tsbpd;
+    pthread_mutex_t accept_mutex;
+    pthread_t accept_thread;
+    int listen_fd;
+    int listen_eid;
+    SRTClientContext *client_context;
+    int max_clients;
+    int close_threads;
 } SRTContext;
 
 #define D AV_OPT_FLAG_DECODING_PARAM
@@ -146,6 +168,7 @@  static const AVOption libsrt_options[] = {
     { "file",           NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" },
     { "linger",         "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger),           AV_OPT_TYPE_INT,      { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
     { "tsbpd",          "Timestamp-based packet delivery",                                      OFFSET(tsbpd),            AV_OPT_TYPE_BOOL,     { .i64 = -1 }, -1, 1,         .flags = D|E },
+    { "max_clients",    "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients),      AV_OPT_TYPE_INT,      { .i64 = 1 },   1, INT_MAX,   .flags = E },
     { NULL }
 };
 
@@ -235,13 +258,159 @@  static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int
     }
 }
 
-static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+static int libsrt_accept(int fd, URLContext *h)
 {
     int ret;
-    int reuse = 1;
     /* Max streamid length plus an extra space for the terminating null character */
     char streamid[513];
     int streamid_len = sizeof(streamid);
+
+    ret = srt_accept(fd, NULL, NULL);
+    if (ret < 0)
+        return libsrt_neterrno(h);
+    if (libsrt_socket_nonblock(ret, 1) < 0)
+        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
+    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
+        /* Note: returned streamid_len doesn't count the terminating null character */
+        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+
+    return ret;
+}
+
+static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size)
+{
+    int ret;
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback);
+        if (ret)
+            return ret;
+    }
+
+    ret = srt_sendmsg(fd, buf, size, -1, 1);
+    if (ret < 0) {
+        ret = libsrt_neterrno(h);
+    }
+
+    return ret;
+}
+
+static void *libsrt_client_thread(void *_SRTClientContext)
+{
+    SRTClientContext *c = _SRTClientContext;
+    URLContext *h = c->h;
+    SRTContext *s = h->priv_data;
+    uint8_t *buf;
+    int ret;
+
+    buf = av_malloc(c->payload_size);
+    if (buf == NULL) {
+        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM));
+        return NULL;
+    }
+    while (!s->close_threads) {
+        sem_wait(&c->msg);
+        while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) {
+            do {
+                ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size);
+            } while(ret == AVERROR(EAGAIN) && !s->close_threads);
+            if (ret < 0 && ret != AVERROR(EAGAIN)) {
+                av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret));
+                goto end;
+            }
+        }
+    }
+end:
+    av_freep(&buf);
+    c->ended = 1;
+
+    return NULL;
+}
+
+static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid)
+{
+    SRTContext *s = h->priv_data;
+
+    c->set = 1;
+    c->ended = 0;
+    c->h = h;
+    c->fd = fd;
+    c->eid = eid;
+    c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE;
+    if (s->payload_size > 0) {
+        c->payload_size = s->payload_size;
+    }
+    c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0);
+    sem_init(&c->msg, 0, 0);
+    pthread_create(&c->thread, NULL, libsrt_client_thread, c);
+
+    return 0;
+}
+
+static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c)
+{
+    pthread_join(c->thread, NULL);
+    sem_destroy(&c->msg);
+    av_fifo_freep2(&c->fifo);
+    srt_epoll_release(c->eid);
+    srt_close(c->fd);
+    c->ended = 0;
+    c->set = 0;
+    return 0;
+}
+
+static void *libsrt_accept_thread(void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    SRTContext *s = h->priv_data;
+    int i;
+    int ret;
+    int client_fd;
+
+    while (!s->close_threads) {
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set && s->client_context[i].ended) {
+                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+                libsrt_close_client_thread(h, &s->client_context[i]);
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+
+        ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback);
+        if (ret < 0)
+            continue;
+
+        client_fd = ret = libsrt_accept(s->listen_fd, h);
+        if (ret < 0)
+            continue;
+        av_log(h, AV_LOG_DEBUG, "new client connection\n");
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            if (!s->client_context[i].set) {
+                s->fd[i] = client_fd;
+                s->eid[i] = libsrt_epoll_create(h, client_fd, 1);
+                av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i);
+                libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]);
+                break;
+            }
+        }
+        if (i == s->max_clients) {
+            av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients);
+            srt_close(client_fd);
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+    }
+    av_log(h, AV_LOG_DEBUG, "exiting accept thread\n");
+
+    return NULL;
+}
+
+static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout)
+{
+    int ret;
+    int reuse = 1;
+
     if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) {
         av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n");
     }
@@ -255,14 +424,7 @@  static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t
     if (ret < 0)
         return ret;
 
-    ret = srt_accept(fd, NULL, NULL);
-    if (ret < 0)
-        return libsrt_neterrno(h);
-    if (libsrt_socket_nonblock(ret, 1) < 0)
-        av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
-    if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len))
-        /* Note: returned streamid_len doesn't count the terminating null character */
-        av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len);
+    ret = libsrt_accept(fd, h);
 
     return ret;
 }
@@ -462,10 +624,15 @@  static int libsrt_setup(URLContext *h, const char *uri, int flags)
             goto fail1;
         // multi-client
         ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout);
-        srt_epoll_release(read_eid);
-        if (ret < 0)
-            goto fail1;
-        srt_close(fd);
+        if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) {
+            srt_epoll_release(read_eid);
+            if (ret < 0)
+                goto fail1;
+            srt_close(fd);
+        } else {
+            s->listen_eid = read_eid;
+            s->listen_fd = fd;
+        }
         fd = ret;
     } else {
         int write_eid = ret = libsrt_epoll_create(h, fd, 1);
@@ -508,8 +675,15 @@  static int libsrt_setup(URLContext *h, const char *uri, int flags)
         goto fail1;
 
     h->is_streamed = 1;
-    s->fd = fd;
-    s->eid = eid;
+    s->fd[0] = fd;
+    s->eid[0] = eid;
+
+    if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) {
+        av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n");
+        libsrt_launch_client_thread(h, &s->client_context[0], fd, eid);
+        av_log(h, AV_LOG_DEBUG, "launching accept_thread\n");
+        pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h);
+    }
 
     freeaddrinfo(ai);
     return 0;
@@ -671,7 +845,26 @@  static int libsrt_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
             s->linger = strtol(buf, NULL, 10);
         }
+        if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) {
+            s->max_clients = strtoll(buf, NULL, 10);
+            if (s->max_clients < 1) {
+                ret = AVERROR(EINVAL);
+                goto err;
+            }
+        }
+    }
+    if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) {
+        s->max_clients = 1;
     }
+    s->fd = av_calloc(s->max_clients, sizeof(*s->fd));
+    s->eid = av_calloc(s->max_clients, sizeof(*s->eid));
+    s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context));
+    if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) {
+        ret = AVERROR(ENOMEM);
+        av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret));
+        goto err;
+    }
+
     ret = libsrt_setup(h, uri, flags);
     if (ret < 0)
         goto err;
@@ -688,12 +881,12 @@  static int libsrt_read(URLContext *h, uint8_t *buf, int size)
     int ret;
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
-        ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback);
+        ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback);
         if (ret)
             return ret;
     }
 
-    ret = srt_recvmsg(s->fd, buf, size);
+    ret = srt_recvmsg(s->fd[0], buf, size);
     if (ret < 0) {
         ret = libsrt_neterrno(h);
     }
@@ -704,17 +897,35 @@  static int libsrt_read(URLContext *h, uint8_t *buf, int size)
 static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
 {
     SRTContext *s = h->priv_data;
+    SRTClientContext *c;
+    int i;
     int ret;
-
-    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
-        ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
-        if (ret)
-            return ret;
-    }
-
-    ret = srt_sendmsg(s->fd, buf, size, -1, 1);
-    if (ret < 0) {
-        ret = libsrt_neterrno(h);
+    int any_ok = 0;
+    int any = 0;
+
+    if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) {
+        pthread_mutex_lock(&s->accept_mutex);
+        for (i = 0; i < s->max_clients; i++) {
+            c = &s->client_context[i];
+            if (c->set && !c->ended) {
+                any = 1;
+                ret = av_fifo_write(c->fifo, buf, size);
+                if (ret >= 0) {
+                    sem_post(&c->msg);
+                    any_ok = 1;
+                }
+            }
+        }
+        pthread_mutex_unlock(&s->accept_mutex);
+        if (!any) {
+            ret = AVERROR(EIO);
+        } else if (!any_ok) {
+            ret = AVERROR(EAGAIN);
+        } else {
+            ret = size;
+        }
+    } else {
+        ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size);
     }
 
     return ret;
@@ -723,9 +934,31 @@  static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
 static int libsrt_close(URLContext *h)
 {
     SRTContext *s = h->priv_data;
+    int i;
 
-    srt_epoll_release(s->eid);
-    srt_close(s->fd);
+    if (s->max_clients > 1) {
+        s->close_threads = 1;
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set) {
+                sem_post(&s->client_context[i].msg);
+            }
+        }
+        pthread_join(s->accept_thread, NULL);
+        srt_epoll_release(s->listen_eid);
+        srt_close(s->listen_fd);
+        for (i = 0; i < s->max_clients; i++) {
+            if (s->client_context[i].set) {
+                av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i);
+                libsrt_close_client_thread(h, &s->client_context[i]);
+            }
+        }
+    } else {
+        srt_epoll_release(s->eid[0]);
+        srt_close(s->fd[0]);
+    }
+    av_freep(&s->fd);
+    av_freep(&s->eid);
+    av_freep(&s->client_context);
 
     srt_cleanup();