Message ID | 20240919134646.3841-1-nicolas.dato@gmail.com |
---|---|
State | New |
Headers | show |
Series | [FFmpeg-devel] avformat/libsrt: Adding support for multiple clients as a server | expand |
Context | Check | Description |
---|---|---|
yinshiyou/make_loongarch64 | success | Make finished |
yinshiyou/make_fate_loongarch64 | success | Make fate finished |
andriy/make_x86 | success | Make finished |
andriy/make_fate_x86 | success | Make fate finished |
> On Sep 19, 2024, at 21:46, nicolas.dato@gmail.com wrote: > > From: Nicolas Jorge Dato <nicolas.dato@gmail.com> > > When in listener mode and writing, now libsrt supports multiple clients > configured with the max_clients parameter. > > When max_clients=1 (default), it behaves as before. > When max_clientes > 1, after accepting the first client it launches > a thread to listen for more clients, and a launches a new thread for > each client. > --- > libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++----- > 1 file changed, 265 insertions(+), 32 deletions(-) > > diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c > index 9e860abccd..c9392e6996 100644 > --- a/libavformat/libsrt.c > +++ b/libavformat/libsrt.c > @@ -21,8 +21,11 @@ > * Haivision Open SRT (Secure Reliable Transport) protocol > */ > > +#include <pthread.h> > +#include <semaphore.h> > #include <srt/srt.h> The implementation isn't portable. Although FFmpeg has ffserver once upon a time, the IO in libavformat isn’t designed to be used as a server. I'm afraid a real server (e.g, srs-simple realtime server) is more appropriate than stretch libavformat. > > +#include "libavutil/fifo.h" > #include "libavutil/mem.h" > #include "libavutil/opt.h" > #include "libavutil/parseutils.h" > @@ -51,10 +54,22 @@ enum SRTMode { > SRT_MODE_RENDEZVOUS = 2 > }; > > -typedef struct SRTContext { > - const AVClass *class; > +typedef struct SRTClientContext { > + int set; > + int ended; > + URLContext *h; > + pthread_t thread; > + int payload_size; > int fd; > int eid; > + sem_t msg; > + AVFifo *fifo; > +} SRTClientContext; > + > +typedef struct SRTContext { > + const AVClass *class; > + int *fd; > + int *eid; > int64_t rw_timeout; > int64_t listen_timeout; > int recv_buffer_size; > @@ -93,6 +108,13 @@ typedef struct SRTContext { > SRT_TRANSTYPE transtype; > int linger; > int tsbpd; > + pthread_mutex_t accept_mutex; > + pthread_t accept_thread; > + int listen_fd; > + int listen_eid; > + SRTClientContext *client_context; > + int max_clients; > + int close_threads; > } SRTContext; > > #define D AV_OPT_FLAG_DECODING_PARAM > @@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = { > { "file", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" }, > { "linger", "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, > { "tsbpd", "Timestamp-based packet delivery", OFFSET(tsbpd), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E }, > + { "max_clients", "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients), AV_OPT_TYPE_INT, { .i64 = 1 }, 1, INT_MAX, .flags = E }, > { NULL } > }; > > @@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int > } > } > > -static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout) > +static int libsrt_accept(int fd, URLContext *h) > { > int ret; > - int reuse = 1; > /* Max streamid length plus an extra space for the terminating null character */ > char streamid[513]; > int streamid_len = sizeof(streamid); > + > + ret = srt_accept(fd, NULL, NULL); > + if (ret < 0) > + return libsrt_neterrno(h); > + if (libsrt_socket_nonblock(ret, 1) < 0) > + av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n"); > + if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len)) > + /* Note: returned streamid_len doesn't count the terminating null character */ > + av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len); > + > + return ret; > +} > + > +static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size) > +{ > + int ret; > + > + if (!(h->flags & AVIO_FLAG_NONBLOCK)) { > + ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback); > + if (ret) > + return ret; > + } > + > + ret = srt_sendmsg(fd, buf, size, -1, 1); > + if (ret < 0) { > + ret = libsrt_neterrno(h); > + } > + > + return ret; > +} > + > +static void *libsrt_client_thread(void *_SRTClientContext) > +{ > + SRTClientContext *c = _SRTClientContext; > + URLContext *h = c->h; > + SRTContext *s = h->priv_data; > + uint8_t *buf; > + int ret; > + > + buf = av_malloc(c->payload_size); > + if (buf == NULL) { > + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM)); > + return NULL; > + } > + while (!s->close_threads) { > + sem_wait(&c->msg); > + while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) { > + do { > + ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size); > + } while(ret == AVERROR(EAGAIN) && !s->close_threads); > + if (ret < 0 && ret != AVERROR(EAGAIN)) { > + av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret)); > + goto end; > + } > + } > + } > +end: > + av_freep(&buf); > + c->ended = 1; > + > + return NULL; > +} > + > +static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid) > +{ > + SRTContext *s = h->priv_data; > + > + c->set = 1; > + c->ended = 0; > + c->h = h; > + c->fd = fd; > + c->eid = eid; > + c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE; > + if (s->payload_size > 0) { > + c->payload_size = s->payload_size; > + } > + c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0); > + sem_init(&c->msg, 0, 0); > + pthread_create(&c->thread, NULL, libsrt_client_thread, c); > + > + return 0; > +} > + > +static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c) > +{ > + pthread_join(c->thread, NULL); > + sem_destroy(&c->msg); > + av_fifo_freep2(&c->fifo); > + srt_epoll_release(c->eid); > + srt_close(c->fd); > + c->ended = 0; > + c->set = 0; > + return 0; > +} > + > +static void *libsrt_accept_thread(void *_URLContext) > +{ > + URLContext *h = _URLContext; > + SRTContext *s = h->priv_data; > + int i; > + int ret; > + int client_fd; > + > + while (!s->close_threads) { > + pthread_mutex_lock(&s->accept_mutex); > + for (i = 0; i < s->max_clients; i++) { > + if (s->client_context[i].set && s->client_context[i].ended) { > + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i); > + libsrt_close_client_thread(h, &s->client_context[i]); > + } > + } > + pthread_mutex_unlock(&s->accept_mutex); > + > + ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback); > + if (ret < 0) > + continue; > + > + client_fd = ret = libsrt_accept(s->listen_fd, h); > + if (ret < 0) > + continue; > + av_log(h, AV_LOG_DEBUG, "new client connection\n"); > + pthread_mutex_lock(&s->accept_mutex); > + for (i = 0; i < s->max_clients; i++) { > + if (!s->client_context[i].set) { > + s->fd[i] = client_fd; > + s->eid[i] = libsrt_epoll_create(h, client_fd, 1); > + av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i); > + libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]); > + break; > + } > + } > + if (i == s->max_clients) { > + av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients); > + srt_close(client_fd); > + } > + pthread_mutex_unlock(&s->accept_mutex); > + } > + av_log(h, AV_LOG_DEBUG, "exiting accept thread\n"); > + > + return NULL; > +} > + > +static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout) > +{ > + int ret; > + int reuse = 1; > + > if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) { > av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n"); > } > @@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t > if (ret < 0) > return ret; > > - ret = srt_accept(fd, NULL, NULL); > - if (ret < 0) > - return libsrt_neterrno(h); > - if (libsrt_socket_nonblock(ret, 1) < 0) > - av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n"); > - if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len)) > - /* Note: returned streamid_len doesn't count the terminating null character */ > - av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len); > + ret = libsrt_accept(fd, h); > > return ret; > } > @@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags) > goto fail1; > // multi-client > ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout); > - srt_epoll_release(read_eid); > - if (ret < 0) > - goto fail1; > - srt_close(fd); > + if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) { > + srt_epoll_release(read_eid); > + if (ret < 0) > + goto fail1; > + srt_close(fd); > + } else { > + s->listen_eid = read_eid; > + s->listen_fd = fd; > + } > fd = ret; > } else { > int write_eid = ret = libsrt_epoll_create(h, fd, 1); > @@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags) > goto fail1; > > h->is_streamed = 1; > - s->fd = fd; > - s->eid = eid; > + s->fd[0] = fd; > + s->eid[0] = eid; > + > + if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) { > + av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n"); > + libsrt_launch_client_thread(h, &s->client_context[0], fd, eid); > + av_log(h, AV_LOG_DEBUG, "launching accept_thread\n"); > + pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h); > + } > > freeaddrinfo(ai); > return 0; > @@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags) > if (av_find_info_tag(buf, sizeof(buf), "linger", p)) { > s->linger = strtol(buf, NULL, 10); > } > + if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) { > + s->max_clients = strtoll(buf, NULL, 10); > + if (s->max_clients < 1) { > + ret = AVERROR(EINVAL); > + goto err; > + } > + } > + } > + if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) { > + s->max_clients = 1; > } > + s->fd = av_calloc(s->max_clients, sizeof(*s->fd)); > + s->eid = av_calloc(s->max_clients, sizeof(*s->eid)); > + s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context)); > + if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) { > + ret = AVERROR(ENOMEM); > + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret)); > + goto err; > + } > + > ret = libsrt_setup(h, uri, flags); > if (ret < 0) > goto err; > @@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size) > int ret; > > if (!(h->flags & AVIO_FLAG_NONBLOCK)) { > - ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback); > + ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback); > if (ret) > return ret; > } > > - ret = srt_recvmsg(s->fd, buf, size); > + ret = srt_recvmsg(s->fd[0], buf, size); > if (ret < 0) { > ret = libsrt_neterrno(h); > } > @@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size) > static int libsrt_write(URLContext *h, const uint8_t *buf, int size) > { > SRTContext *s = h->priv_data; > + SRTClientContext *c; > + int i; > int ret; > - > - if (!(h->flags & AVIO_FLAG_NONBLOCK)) { > - ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback); > - if (ret) > - return ret; > - } > - > - ret = srt_sendmsg(s->fd, buf, size, -1, 1); > - if (ret < 0) { > - ret = libsrt_neterrno(h); > + int any_ok = 0; > + int any = 0; > + > + if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) { > + pthread_mutex_lock(&s->accept_mutex); > + for (i = 0; i < s->max_clients; i++) { > + c = &s->client_context[i]; > + if (c->set && !c->ended) { > + any = 1; > + ret = av_fifo_write(c->fifo, buf, size); > + if (ret >= 0) { > + sem_post(&c->msg); > + any_ok = 1; > + } > + } > + } > + pthread_mutex_unlock(&s->accept_mutex); > + if (!any) { > + ret = AVERROR(EIO); > + } else if (!any_ok) { > + ret = AVERROR(EAGAIN); > + } else { > + ret = size; > + } > + } else { > + ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size); > } > > return ret; > @@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size) > static int libsrt_close(URLContext *h) > { > SRTContext *s = h->priv_data; > + int i; > > - srt_epoll_release(s->eid); > - srt_close(s->fd); > + if (s->max_clients > 1) { > + s->close_threads = 1; > + for (i = 0; i < s->max_clients; i++) { > + if (s->client_context[i].set) { > + sem_post(&s->client_context[i].msg); > + } > + } > + pthread_join(s->accept_thread, NULL); > + srt_epoll_release(s->listen_eid); > + srt_close(s->listen_fd); > + for (i = 0; i < s->max_clients; i++) { > + if (s->client_context[i].set) { > + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i); > + libsrt_close_client_thread(h, &s->client_context[i]); > + } > + } > + } else { > + srt_epoll_release(s->eid[0]); > + srt_close(s->fd[0]); > + } > + av_freep(&s->fd); > + av_freep(&s->eid); > + av_freep(&s->client_context); > > srt_cleanup(); > > -- > 2.39.4 > > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel > > To unsubscribe, visit link above, or email > ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c index 9e860abccd..c9392e6996 100644 --- a/libavformat/libsrt.c +++ b/libavformat/libsrt.c @@ -21,8 +21,11 @@ * Haivision Open SRT (Secure Reliable Transport) protocol */ +#include <pthread.h> +#include <semaphore.h> #include <srt/srt.h> +#include "libavutil/fifo.h" #include "libavutil/mem.h" #include "libavutil/opt.h" #include "libavutil/parseutils.h" @@ -51,10 +54,22 @@ enum SRTMode { SRT_MODE_RENDEZVOUS = 2 }; -typedef struct SRTContext { - const AVClass *class; +typedef struct SRTClientContext { + int set; + int ended; + URLContext *h; + pthread_t thread; + int payload_size; int fd; int eid; + sem_t msg; + AVFifo *fifo; +} SRTClientContext; + +typedef struct SRTContext { + const AVClass *class; + int *fd; + int *eid; int64_t rw_timeout; int64_t listen_timeout; int recv_buffer_size; @@ -93,6 +108,13 @@ typedef struct SRTContext { SRT_TRANSTYPE transtype; int linger; int tsbpd; + pthread_mutex_t accept_mutex; + pthread_t accept_thread; + int listen_fd; + int listen_eid; + SRTClientContext *client_context; + int max_clients; + int close_threads; } SRTContext; #define D AV_OPT_FLAG_DECODING_PARAM @@ -146,6 +168,7 @@ static const AVOption libsrt_options[] = { { "file", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, .unit = "transtype" }, { "linger", "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E }, { "tsbpd", "Timestamp-based packet delivery", OFFSET(tsbpd), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E }, + { "max_clients", "Maximum simultaneous clients when mode=listener and writing packages", OFFSET(max_clients), AV_OPT_TYPE_INT, { .i64 = 1 }, 1, INT_MAX, .flags = E }, { NULL } }; @@ -235,13 +258,159 @@ static int libsrt_network_wait_fd_timeout(URLContext *h, int eid, int write, int } } -static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout) +static int libsrt_accept(int fd, URLContext *h) { int ret; - int reuse = 1; /* Max streamid length plus an extra space for the terminating null character */ char streamid[513]; int streamid_len = sizeof(streamid); + + ret = srt_accept(fd, NULL, NULL); + if (ret < 0) + return libsrt_neterrno(h); + if (libsrt_socket_nonblock(ret, 1) < 0) + av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n"); + if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len)) + /* Note: returned streamid_len doesn't count the terminating null character */ + av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len); + + return ret; +} + +static int libsrt_write_common(URLContext *h, int fd, int eid, const uint8_t *buf, int size) +{ + int ret; + + if (!(h->flags & AVIO_FLAG_NONBLOCK)) { + ret = libsrt_network_wait_fd_timeout(h, eid, 1, h->rw_timeout, &h->interrupt_callback); + if (ret) + return ret; + } + + ret = srt_sendmsg(fd, buf, size, -1, 1); + if (ret < 0) { + ret = libsrt_neterrno(h); + } + + return ret; +} + +static void *libsrt_client_thread(void *_SRTClientContext) +{ + SRTClientContext *c = _SRTClientContext; + URLContext *h = c->h; + SRTContext *s = h->priv_data; + uint8_t *buf; + int ret; + + buf = av_malloc(c->payload_size); + if (buf == NULL) { + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(AVERROR(ENOMEM)); + return NULL; + } + while (!s->close_threads) { + sem_wait(&c->msg); + while (!s->close_threads && av_fifo_read(c->fifo, buf, c->payload_size) >= 0) { + do { + ret = libsrt_write_common(h, c->fd, c->eid, buf, c->payload_size); + } while(ret == AVERROR(EAGAIN) && !s->close_threads); + if (ret < 0 && ret != AVERROR(EAGAIN)) { + av_log(h, AV_LOG_INFO, "ending client thread with error ret %s\n", av_err2str(ret)); + goto end; + } + } + } +end: + av_freep(&buf); + c->ended = 1; + + return NULL; +} + +static int libsrt_launch_client_thread(URLContext *h, SRTClientContext *c, int fd, int eid) +{ + SRTContext *s = h->priv_data; + + c->set = 1; + c->ended = 0; + c->h = h; + c->fd = fd; + c->eid = eid; + c->payload_size = SRT_LIVE_DEFAULT_PAYLOAD_SIZE; + if (s->payload_size > 0) { + c->payload_size = s->payload_size; + } + c->fifo = av_fifo_alloc2(c->payload_size * 4096, 1, 0); + sem_init(&c->msg, 0, 0); + pthread_create(&c->thread, NULL, libsrt_client_thread, c); + + return 0; +} + +static int libsrt_close_client_thread(URLContext *h, SRTClientContext *c) +{ + pthread_join(c->thread, NULL); + sem_destroy(&c->msg); + av_fifo_freep2(&c->fifo); + srt_epoll_release(c->eid); + srt_close(c->fd); + c->ended = 0; + c->set = 0; + return 0; +} + +static void *libsrt_accept_thread(void *_URLContext) +{ + URLContext *h = _URLContext; + SRTContext *s = h->priv_data; + int i; + int ret; + int client_fd; + + while (!s->close_threads) { + pthread_mutex_lock(&s->accept_mutex); + for (i = 0; i < s->max_clients; i++) { + if (s->client_context[i].set && s->client_context[i].ended) { + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i); + libsrt_close_client_thread(h, &s->client_context[i]); + } + } + pthread_mutex_unlock(&s->accept_mutex); + + ret = libsrt_network_wait_fd_timeout(h, s->listen_eid, 0, 250000, &h->interrupt_callback); + if (ret < 0) + continue; + + client_fd = ret = libsrt_accept(s->listen_fd, h); + if (ret < 0) + continue; + av_log(h, AV_LOG_DEBUG, "new client connection\n"); + pthread_mutex_lock(&s->accept_mutex); + for (i = 0; i < s->max_clients; i++) { + if (!s->client_context[i].set) { + s->fd[i] = client_fd; + s->eid[i] = libsrt_epoll_create(h, client_fd, 1); + av_log(h, AV_LOG_DEBUG, "launching client thread idx %d\n", i); + libsrt_launch_client_thread(h, &s->client_context[i], s->fd[i], s->eid[i]); + break; + } + } + if (i == s->max_clients) { + av_log(h, AV_LOG_DEBUG, "no more clients available, max_clients = %d\n", s->max_clients); + srt_close(client_fd); + } + pthread_mutex_unlock(&s->accept_mutex); + } + av_log(h, AV_LOG_DEBUG, "exiting accept thread\n"); + + return NULL; +} + +static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t addrlen, URLContext *h, int64_t timeout) +{ + int ret; + int reuse = 1; + if (srt_setsockopt(fd, SOL_SOCKET, SRTO_REUSEADDR, &reuse, sizeof(reuse))) { av_log(h, AV_LOG_WARNING, "setsockopt(SRTO_REUSEADDR) failed\n"); } @@ -255,14 +424,7 @@ static int libsrt_listen(int eid, int fd, const struct sockaddr *addr, socklen_t if (ret < 0) return ret; - ret = srt_accept(fd, NULL, NULL); - if (ret < 0) - return libsrt_neterrno(h); - if (libsrt_socket_nonblock(ret, 1) < 0) - av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n"); - if (!libsrt_getsockopt(h, ret, SRTO_STREAMID, "SRTO_STREAMID", streamid, &streamid_len)) - /* Note: returned streamid_len doesn't count the terminating null character */ - av_log(h, AV_LOG_VERBOSE, "accept streamid [%s], length %d\n", streamid, streamid_len); + ret = libsrt_accept(fd, h); return ret; } @@ -462,10 +624,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags) goto fail1; // multi-client ret = libsrt_listen(read_eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout); - srt_epoll_release(read_eid); - if (ret < 0) - goto fail1; - srt_close(fd); + if (!(flags & AVIO_FLAG_WRITE) || s->max_clients == 1) { + srt_epoll_release(read_eid); + if (ret < 0) + goto fail1; + srt_close(fd); + } else { + s->listen_eid = read_eid; + s->listen_fd = fd; + } fd = ret; } else { int write_eid = ret = libsrt_epoll_create(h, fd, 1); @@ -508,8 +675,15 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags) goto fail1; h->is_streamed = 1; - s->fd = fd; - s->eid = eid; + s->fd[0] = fd; + s->eid[0] = eid; + + if (s->mode == SRT_MODE_LISTENER && (flags & AVIO_FLAG_WRITE) && s->max_clients > 1) { + av_log(h, AV_LOG_DEBUG, "launching client thread idx 0\n"); + libsrt_launch_client_thread(h, &s->client_context[0], fd, eid); + av_log(h, AV_LOG_DEBUG, "launching accept_thread\n"); + pthread_create(&s->accept_thread, NULL, libsrt_accept_thread, h); + } freeaddrinfo(ai); return 0; @@ -671,7 +845,26 @@ static int libsrt_open(URLContext *h, const char *uri, int flags) if (av_find_info_tag(buf, sizeof(buf), "linger", p)) { s->linger = strtol(buf, NULL, 10); } + if (av_find_info_tag(buf, sizeof(buf), "max_clients", p)) { + s->max_clients = strtoll(buf, NULL, 10); + if (s->max_clients < 1) { + ret = AVERROR(EINVAL); + goto err; + } + } + } + if (s->mode != SRT_MODE_LISTENER || !(flags & AVIO_FLAG_WRITE)) { + s->max_clients = 1; } + s->fd = av_calloc(s->max_clients, sizeof(*s->fd)); + s->eid = av_calloc(s->max_clients, sizeof(*s->eid)); + s->client_context = av_calloc(s->max_clients, sizeof(*s->client_context)); + if (s->fd == NULL || s->eid == NULL || s->client_context == NULL) { + ret = AVERROR(ENOMEM); + av_log(h, AV_LOG_ERROR, "%s\n", av_err2str(ret)); + goto err; + } + ret = libsrt_setup(h, uri, flags); if (ret < 0) goto err; @@ -688,12 +881,12 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size) int ret; if (!(h->flags & AVIO_FLAG_NONBLOCK)) { - ret = libsrt_network_wait_fd_timeout(h, s->eid, 0, h->rw_timeout, &h->interrupt_callback); + ret = libsrt_network_wait_fd_timeout(h, s->eid[0], 0, h->rw_timeout, &h->interrupt_callback); if (ret) return ret; } - ret = srt_recvmsg(s->fd, buf, size); + ret = srt_recvmsg(s->fd[0], buf, size); if (ret < 0) { ret = libsrt_neterrno(h); } @@ -704,17 +897,35 @@ static int libsrt_read(URLContext *h, uint8_t *buf, int size) static int libsrt_write(URLContext *h, const uint8_t *buf, int size) { SRTContext *s = h->priv_data; + SRTClientContext *c; + int i; int ret; - - if (!(h->flags & AVIO_FLAG_NONBLOCK)) { - ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback); - if (ret) - return ret; - } - - ret = srt_sendmsg(s->fd, buf, size, -1, 1); - if (ret < 0) { - ret = libsrt_neterrno(h); + int any_ok = 0; + int any = 0; + + if (s->mode == SRT_MODE_LISTENER && s->max_clients > 1) { + pthread_mutex_lock(&s->accept_mutex); + for (i = 0; i < s->max_clients; i++) { + c = &s->client_context[i]; + if (c->set && !c->ended) { + any = 1; + ret = av_fifo_write(c->fifo, buf, size); + if (ret >= 0) { + sem_post(&c->msg); + any_ok = 1; + } + } + } + pthread_mutex_unlock(&s->accept_mutex); + if (!any) { + ret = AVERROR(EIO); + } else if (!any_ok) { + ret = AVERROR(EAGAIN); + } else { + ret = size; + } + } else { + ret = libsrt_write_common(h, s->fd[0], s->eid[0], buf, size); } return ret; @@ -723,9 +934,31 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size) static int libsrt_close(URLContext *h) { SRTContext *s = h->priv_data; + int i; - srt_epoll_release(s->eid); - srt_close(s->fd); + if (s->max_clients > 1) { + s->close_threads = 1; + for (i = 0; i < s->max_clients; i++) { + if (s->client_context[i].set) { + sem_post(&s->client_context[i].msg); + } + } + pthread_join(s->accept_thread, NULL); + srt_epoll_release(s->listen_eid); + srt_close(s->listen_fd); + for (i = 0; i < s->max_clients; i++) { + if (s->client_context[i].set) { + av_log(h, AV_LOG_DEBUG, "closing client thread idx %d\n", i); + libsrt_close_client_thread(h, &s->client_context[i]); + } + } + } else { + srt_epoll_release(s->eid[0]); + srt_close(s->fd[0]); + } + av_freep(&s->fd); + av_freep(&s->eid); + av_freep(&s->client_context); srt_cleanup();
From: Nicolas Jorge Dato <nicolas.dato@gmail.com> When in listener mode and writing, now libsrt supports multiple clients configured with the max_clients parameter. When max_clients=1 (default), it behaves as before. When max_clientes > 1, after accepting the first client it launches a thread to listen for more clients, and a launches a new thread for each client. --- libavformat/libsrt.c | 297 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 265 insertions(+), 32 deletions(-)