@@ -3400,6 +3400,8 @@ libsrt_protocol_deps="libsrt"
libsrt_protocol_select="network"
libssh_protocol_deps="libssh"
libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
# filters
afftdn_filter_deps="avcodec"
@@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
@item TCP @tab X
@item TLS @tab X
@item UDP @tab X
+@item ZMQ @tab E
@end multitable
@code{X} means that the protocol is supported.
@@ -1728,4 +1728,41 @@ Timeout in ms.
Create the Unix socket in listening mode.
@end table
+@section libzmq
+
+ZeroMQ asynchronous messaging library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://hostname:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect and view the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+@table @option
+@item zmq_timeout
+Set timeout (milliseconds) in the socket message receive call.
+By default it is set to 0. For more details see the timeout option
+in @url{http://api.zeromq.org/3-2:zmq-poll}.
+@end table
+
+The order with which the streaming and connecting instances start does not
+matter. This is handled by the ZeroMQ library.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
+for an example on how this option may be set.
+
@c man end PROTOCOLS
@@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o
OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o
OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o
OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o
# libavdevice dependencies
OBJS-$(CONFIG_IEC61883_INDEV) += dv.o
new file mode 100644
@@ -0,0 +1,162 @@
+/*
+ * ZMQ URLProtocol
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+
+typedef struct ZMQContext {
+ const AVClass *class;
+ void *context;
+ void *socket;
+ unsigned int timeout; /*blocking timeout during zmq poll in milliseconds */
+} ZMQContext;
+
+static const AVOption options[] = {
+ { "zmq_timeout", "Set timeout (milliseconds) in the socket message receive call. By default it is set to 0.", offsetof(ZMQContext, timeout), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, AV_OPT_FLAG_DECODING_PARAM },
+ { NULL }
+};
+
+static int ff_zmq_open(URLContext *h, const char *uri, int flags)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+ s->context = zmq_ctx_new();
+ h->is_streamed = 1;
+
+ av_strstart(uri, "zmq:", &uri);
+
+ /*publish during write*/
+ if (h->flags & AVIO_FLAG_WRITE) {
+ s->socket = zmq_socket(s->context, ZMQ_PUB);
+ if (!s->socket) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+ zmq_ctx_destroy(s->context);
+ return AVERROR_EXTERNAL;
+ }
+
+ ret = zmq_bind(s->socket, uri);
+ if (ret < 0) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
+ zmq_close(s->socket);
+ zmq_ctx_destroy(s->context);
+ return AVERROR_EXTERNAL;
+ }
+ }
+
+ /*subscribe for read*/
+ if (h->flags & AVIO_FLAG_READ) {
+ s->socket = zmq_socket(s->context, ZMQ_SUB);
+ if (!s->socket) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+ zmq_ctx_destroy(s->context);
+ return AVERROR_EXTERNAL;
+ }
+
+ zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+ ret = zmq_connect(s->socket, uri);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
+ zmq_close(s->socket);
+ zmq_ctx_destroy(s->context);
+ return AVERROR_EXTERNAL;
+ }
+ }
+ return 0;
+}
+
+static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+
+ ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
+ if (ret >= 0)
+ return ret; /*number of sent bytes*/
+
+ /*errno = EAGAIN if messages cannot be pushed*/
+ if (ret == -1 && errno == EAGAIN) {
+ return AVERROR(EAGAIN);
+ } else
+ return AVERROR_EXTERNAL;
+}
+
+static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
+{
+ int ret;
+ ZMQContext *s = h->priv_data;
+ zmq_msg_t msg;
+ int msg_size;
+
+ zmq_pollitem_t items[] = {
+ { s->socket, 0, ZMQ_POLLIN, 0 }
+ };
+
+ //block for for s->timeout milliseconds
+ zmq_poll(items, 1, s->timeout);
+ if (items[0].revents & ZMQ_POLLIN) {
+ ret = zmq_msg_init(&msg);
+ if (ret == -1) {
+ av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
+ return AVERROR_EXTERNAL;
+ }
+ zmq_recvmsg(s->socket, &msg, 0);
+
+ /*truncate msg if it exceeds available space.*/
+ /* TODO: Add Separate thread for ZMQ read. Use AVFifoBufer similar to udp to avoid truncating message*/
+ msg_size = zmq_msg_size(&msg);
+ if (msg_size > size) {
+ msg_size = size;
+ av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
+ }
+ memcpy(buf, zmq_msg_data(&msg), msg_size);
+ zmq_msg_close(&msg);
+ return msg_size;
+ } else
+ return AVERROR(EAGAIN);
+}
+
+static int ff_zmq_close(URLContext *h)
+{
+ ZMQContext *s = h->priv_data;
+ zmq_close(s->socket);
+ zmq_ctx_destroy(s->context);
+ return 0;
+}
+
+static const AVClass zmq_context_class = {
+ .class_name = "zmq",
+ .item_name = av_default_item_name,
+ .option = options,
+ .version = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+ .name = "zmq",
+ .url_close = ff_zmq_close,
+ .url_open = ff_zmq_open,
+ .url_read = ff_zmq_read,
+ .url_write = ff_zmq_write,
+ .priv_data_size = sizeof(ZMQContext),
+ .priv_data_class = &zmq_context_class,
+ .flags = URL_PROTOCOL_FLAG_NETWORK,
+};
@@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
extern const URLProtocol ff_libsrt_protocol;
extern const URLProtocol ff_libssh_protocol;
extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
#include "libavformat/protocol_list.c"
Hello, I've been looking for a way to stream to multiple clients without using a multicast destination address or an external server. I believe ZeroMQ, which is a lightweight asynchronous messaging library, is a possible option. It is already part of ffmpeg and can be used to forward messages to filters in a filtergraph. I've put together an initial patch which adds ZeroMQ as a protocol option. To run an example, you will need to compile with the --enable-libzmq option. One streaming instance can be started with: $ ./ffmpeg -i /dev/video0 -vcodec libx264 -tune zerolatency -f mpegts zmq:tcp://127.0.0.1:5555 Multiple clients can then connect with: $ ./ffplay zmq:tcp://127.0.0.1:5555 I would be happy to maintain the code. Thanks, Andriy From 366f705945f9b2c40158730ec18ac9259bca2695 Mon Sep 17 00:00:00 2001 From: Andriy Gelman <andriy.gelman@gmail.com> Date: Tue, 30 Jul 2019 14:39:32 -0400 Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option Currently multiple clients are only supported by using a multicast destination address. An alternative is to stream to a server which re-distributes the content. This commit adds ZeroMQ as a protocol option, which allows multiple clients to connect to a single ffmpeg instance. --- configure | 2 + doc/general.texi | 1 + doc/protocols.texi | 37 +++++++++ libavformat/Makefile | 1 + libavformat/libzmq.c | 162 ++++++++++++++++++++++++++++++++++++++++ libavformat/protocols.c | 1 + 6 files changed, 204 insertions(+) create mode 100644 libavformat/libzmq.c