From patchwork Tue Jul 30 20:44:01 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Andriy Gelman X-Patchwork-Id: 14156 Return-Path: X-Original-To: patchwork@ffaux-bg.ffmpeg.org Delivered-To: patchwork@ffaux-bg.ffmpeg.org Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by ffaux.localdomain (Postfix) with ESMTP id 4C5E44489E9 for ; Tue, 30 Jul 2019 23:44:16 +0300 (EEST) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 2955B689831; Tue, 30 Jul 2019 23:44:16 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-vk1-f180.google.com (mail-vk1-f180.google.com [209.85.221.180]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 9D290680BED for ; Tue, 30 Jul 2019 23:44:09 +0300 (EEST) Received: by mail-vk1-f180.google.com with SMTP id b200so13114736vkf.10 for ; Tue, 30 Jul 2019 13:44:09 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=date:from:to:subject:message-id:mime-version:content-disposition :user-agent; bh=HaSUMLPUBiWXKvTzzQXrLyrxDSPgMMrNw6VhXtQr1AI=; b=ucn81HxvIiiuIwKgJCqfTnS+SUqrADbK43DNrFXF0aHmDgFfNcmTjQzx+jByekocAj dvsKwCGCEUMb4m/7UhbbHMbvK9+rjd5GrCZ9qgrBEqET1VWqefyIeBSvPDnhpOYeYsRT qPNTTtc6iGKHMp71i2Io7KG+MhDIqfnE2wzurce2jkxuJowWcsuEPAcrSSHnUcQk3Ubh moGiOKSYMjTIk/QX+QkIQsblUHLdBIfpfNxoi22pkVA228Mvo/Oiv4XdhiEqlFnmfvYF mYHm3f324BLGpPj2V7MYCreN3wLMyMSUfpuPf89LESdS2/JiQiB2hT2mFleHS/Zdj5/S 5ebQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:date:from:to:subject:message-id:mime-version :content-disposition:user-agent; bh=HaSUMLPUBiWXKvTzzQXrLyrxDSPgMMrNw6VhXtQr1AI=; b=T0CkTbSI2BeLjbDa5xvCXLxqskwcRe7XDEI2Uf7omXyvGBtMzqY6m7Fq5C+Upzqwo0 qAoZyAkQTLWyZC4tmVM+CKSs1Uqry9KEq3Q2d0arXTZ+SaY69x1U8Xqr9/6c6abBR507 3rH1aYzzBnWCEJq6nZ9vS3q9AmZVKowCuhaeHvbfQhrtTNnG/iocgBIxFIxgcW+m/0Aq rR5G8Pmwulv5LNm0Iz2buHVpQ0NoSygP+VKGuDHjEVGhf79x7k7MGFxb2H+4vZrUeSxx EBuFTsiWQvtAKIbSUQnQWYlmPJ9zY9x0YAdF5URibIncERNtadMBO+A6J6Yz1v+IiMXb +/sg== X-Gm-Message-State: APjAAAU8MxM7yjgkxuEbhW3mELamwZE6LIQodDCQKuAo8xs3s+f/hSKe IZ9w9kt9UBrHn4HTXo18ycZcw6+u X-Google-Smtp-Source: APXvYqxbO4dYLkyZyBGit4F/8Rw+jC7JSCujZo4UqQO89wyh34PP84vC06ufkHlqgkyo189VYAjseA== X-Received: by 2002:ac5:cc4d:: with SMTP id l13mr1701953vkm.72.1564519447817; Tue, 30 Jul 2019 13:44:07 -0700 (PDT) Received: from manj (ip-184-209-61-122.spfdma.spcsdns.net. [184.209.61.122]) by smtp.gmail.com with ESMTPSA id s137sm22085746vsc.22.2019.07.30.13.44.04 for (version=TLS1_3 cipher=AEAD-AES256-GCM-SHA384 bits=256/256); Tue, 30 Jul 2019 13:44:07 -0700 (PDT) Date: Tue, 30 Jul 2019 16:44:01 -0400 From: Andriy Gelman To: ffmpeg-devel@ffmpeg.org Message-ID: <20190730204401.kmixoh36gwhamr7k@manj> MIME-Version: 1.0 Content-Disposition: inline User-Agent: NeoMutt/20180716-1344-11488f-dirty Subject: [FFmpeg-devel] [PATCH] libavformat: Add ZeroMQ as a protocol option X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Hello, I've been looking for a way to stream to multiple clients without using a multicast destination address or an external server. I believe ZeroMQ, which is a lightweight asynchronous messaging library, is a possible option. It is already part of ffmpeg and can be used to forward messages to filters in a filtergraph. I've put together an initial patch which adds ZeroMQ as a protocol option. To run an example, you will need to compile with the --enable-libzmq option. One streaming instance can be started with: $ ./ffmpeg -i /dev/video0 -vcodec libx264 -tune zerolatency -f mpegts zmq:tcp://127.0.0.1:5555 Multiple clients can then connect with: $ ./ffplay zmq:tcp://127.0.0.1:5555 I would be happy to maintain the code. Thanks, Andriy From 366f705945f9b2c40158730ec18ac9259bca2695 Mon Sep 17 00:00:00 2001 From: Andriy Gelman Date: Tue, 30 Jul 2019 14:39:32 -0400 Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option Currently multiple clients are only supported by using a multicast destination address. An alternative is to stream to a server which re-distributes the content. This commit adds ZeroMQ as a protocol option, which allows multiple clients to connect to a single ffmpeg instance. --- configure | 2 + doc/general.texi | 1 + doc/protocols.texi | 37 +++++++++ libavformat/Makefile | 1 + libavformat/libzmq.c | 162 ++++++++++++++++++++++++++++++++++++++++ libavformat/protocols.c | 1 + 6 files changed, 204 insertions(+) create mode 100644 libavformat/libzmq.c diff --git a/configure b/configure index 5a4f507246..4515341b06 100755 --- a/configure +++ b/configure @@ -3400,6 +3400,8 @@ libsrt_protocol_deps="libsrt" libsrt_protocol_select="network" libssh_protocol_deps="libssh" libtls_conflict="openssl gnutls mbedtls" +libzmq_protocol_deps="libzmq" +libzmq_protocol_select="network" # filters afftdn_filter_deps="avcodec" diff --git a/doc/general.texi b/doc/general.texi index 3c0c803449..b8e063268c 100644 --- a/doc/general.texi +++ b/doc/general.texi @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support). @item TCP @tab X @item TLS @tab X @item UDP @tab X +@item ZMQ @tab E @end multitable @code{X} means that the protocol is supported. diff --git a/doc/protocols.texi b/doc/protocols.texi index 3e4e7af3d4..04d0fa8101 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -1728,4 +1728,41 @@ Timeout in ms. Create the Unix socket in listening mode. @end table +@section libzmq + +ZeroMQ asynchronous messaging library. + +This library supports unicast streaming to multiple clients without relying on +an external server. + +The required syntax for streaming or connecting to a stream is: +@example +zmq:tcp://hostname:port +@end example + +Example: +Create a localhost stream on port 5555: +@example +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555 +@end example + +Multiple clients may connect and view the stream using: +@example +ffplay zmq:tcp://127.0.0.1:5555 +@end example + +@table @option +@item zmq_timeout +Set timeout (milliseconds) in the socket message receive call. +By default it is set to 0. For more details see the timeout option +in @url{http://api.zeromq.org/3-2:zmq-poll}. +@end table + +The order with which the streaming and connecting instances start does not +matter. This is handled by the ZeroMQ library. + +ffmpeg must be compiled with the --enable-libzmq option to support +this protocol. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu} +for an example on how this option may be set. + @c man end PROTOCOLS diff --git a/libavformat/Makefile b/libavformat/Makefile index a434b005a4..4a535429b7 100644 --- a/libavformat/Makefile +++ b/libavformat/Makefile @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o +OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o # libavdevice dependencies OBJS-$(CONFIG_IEC61883_INDEV) += dv.o diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c new file mode 100644 index 0000000000..a7ef7fe77c --- /dev/null +++ b/libavformat/libzmq.c @@ -0,0 +1,162 @@ +/* + * ZMQ URLProtocol + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include "url.h" +#include "network.h" +#include "libavutil/avstring.h" +#include "libavutil/opt.h" + +typedef struct ZMQContext { + const AVClass *class; + void *context; + void *socket; + unsigned int timeout; /*blocking timeout during zmq poll in milliseconds */ +} ZMQContext; + +static const AVOption options[] = { + { "zmq_timeout", "Set timeout (milliseconds) in the socket message receive call. By default it is set to 0.", offsetof(ZMQContext, timeout), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, AV_OPT_FLAG_DECODING_PARAM }, + { NULL } +}; + +static int ff_zmq_open(URLContext *h, const char *uri, int flags) +{ + int ret; + ZMQContext *s = h->priv_data; + s->context = zmq_ctx_new(); + h->is_streamed = 1; + + av_strstart(uri, "zmq:", &uri); + + /*publish during write*/ + if (h->flags & AVIO_FLAG_WRITE) { + s->socket = zmq_socket(s->context, ZMQ_PUB); + if (!s->socket) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno)); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + + ret = zmq_bind(s->socket, uri); + if (ret < 0) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno)); + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + } + + /*subscribe for read*/ + if (h->flags & AVIO_FLAG_READ) { + s->socket = zmq_socket(s->context, ZMQ_SUB); + if (!s->socket) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno)); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + + zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0); + ret = zmq_connect(s->socket, uri); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno)); + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + } + return 0; +} + +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size) +{ + int ret; + ZMQContext *s = h->priv_data; + + ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT); + if (ret >= 0) + return ret; /*number of sent bytes*/ + + /*errno = EAGAIN if messages cannot be pushed*/ + if (ret == -1 && errno == EAGAIN) { + return AVERROR(EAGAIN); + } else + return AVERROR_EXTERNAL; +} + +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size) +{ + int ret; + ZMQContext *s = h->priv_data; + zmq_msg_t msg; + int msg_size; + + zmq_pollitem_t items[] = { + { s->socket, 0, ZMQ_POLLIN, 0 } + }; + + //block for for s->timeout milliseconds + zmq_poll(items, 1, s->timeout); + if (items[0].revents & ZMQ_POLLIN) { + ret = zmq_msg_init(&msg); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno)); + return AVERROR_EXTERNAL; + } + zmq_recvmsg(s->socket, &msg, 0); + + /*truncate msg if it exceeds available space.*/ + /* TODO: Add Separate thread for ZMQ read. Use AVFifoBufer similar to udp to avoid truncating message*/ + msg_size = zmq_msg_size(&msg); + if (msg_size > size) { + msg_size = size; + av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n"); + } + memcpy(buf, zmq_msg_data(&msg), msg_size); + zmq_msg_close(&msg); + return msg_size; + } else + return AVERROR(EAGAIN); +} + +static int ff_zmq_close(URLContext *h) +{ + ZMQContext *s = h->priv_data; + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return 0; +} + +static const AVClass zmq_context_class = { + .class_name = "zmq", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; + +const URLProtocol ff_libzmq_protocol = { + .name = "zmq", + .url_close = ff_zmq_close, + .url_open = ff_zmq_open, + .url_read = ff_zmq_read, + .url_write = ff_zmq_write, + .priv_data_size = sizeof(ZMQContext), + .priv_data_class = &zmq_context_class, + .flags = URL_PROTOCOL_FLAG_NETWORK, +}; diff --git a/libavformat/protocols.c b/libavformat/protocols.c index ad95659795..face5b29b5 100644 --- a/libavformat/protocols.c +++ b/libavformat/protocols.c @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol; extern const URLProtocol ff_libsrt_protocol; extern const URLProtocol ff_libssh_protocol; extern const URLProtocol ff_libsmbclient_protocol; +extern const URLProtocol ff_libzmq_protocol; #include "libavformat/protocol_list.c"