diff mbox

[FFmpeg-devel,v2] libavformat: Add ZeroMQ as a protocol option

Message ID 20190804183607.p3pau5ydgkonfvzl@manj
State New
Headers show

Commit Message

Andriy Gelman Aug. 4, 2019, 6:36 p.m. UTC
Changes in v2:
  1. Replaced zmq_poll with zmq_msg_recv. 
  2. Remove user timeout option as zmq_msg_recv(.., .., ZMQ_DONTWAIT) is a
  non-blocking call.
  3. Updated docs.

Andriy
From 53e6e00d30c9fbf5127eea9d377686d37e981c0c Mon Sep 17 00:00:00 2001
From: Andriy Gelman <andriy.gelman@gmail.com>
Date: Tue, 30 Jul 2019 14:39:32 -0400
Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option

Currently multiple clients are only supported by using a
multicast destination address. An alternative is to stream to a server
which re-distributes the content.

This commit adds ZeroMQ as a protocol option, which allows
multiple clients to connect to a single ffmpeg instance.
---
 configure               |   2 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  32 ++++++++
 libavformat/Makefile    |   1 +
 libavformat/libzmq.c    | 159 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 6 files changed, 196 insertions(+)
 create mode 100644 libavformat/libzmq.c

Comments

Andriy Gelman Aug. 7, 2019, 2:15 a.m. UTC | #1
On Sun, 04. Aug 14:36, Andriy Gelman wrote:
> Changes in v2:
>   1. Replaced zmq_poll with zmq_msg_recv. 
>   2. Remove user timeout option as zmq_msg_recv(.., .., ZMQ_DONTWAIT) is a
>   non-blocking call.
>   3. Updated docs.
> 
> Andriy
>   
> From 53e6e00d30c9fbf5127eea9d377686d37e981c0c Mon Sep 17 00:00:00 2001
> From: Andriy Gelman <andriy.gelman@gmail.com>
> Date: Tue, 30 Jul 2019 14:39:32 -0400
> Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
> 
> Currently multiple clients are only supported by using a
> multicast destination address. An alternative is to stream to a server
> which re-distributes the content.
> 
> This commit adds ZeroMQ as a protocol option, which allows
> multiple clients to connect to a single ffmpeg instance.
> ---
>  configure               |   2 +
>  doc/general.texi        |   1 +
>  doc/protocols.texi      |  32 ++++++++
>  libavformat/Makefile    |   1 +
>  libavformat/libzmq.c    | 159 ++++++++++++++++++++++++++++++++++++++++
>  libavformat/protocols.c |   1 +
>  6 files changed, 196 insertions(+)
>  create mode 100644 libavformat/libzmq.c
> 
> diff --git a/configure b/configure
> index 5a4f507246..4515341b06 100755
> --- a/configure
> +++ b/configure
> @@ -3400,6 +3400,8 @@ libsrt_protocol_deps="libsrt"
>  libsrt_protocol_select="network"
>  libssh_protocol_deps="libssh"
>  libtls_conflict="openssl gnutls mbedtls"
> +libzmq_protocol_deps="libzmq"
> +libzmq_protocol_select="network"
>  
>  # filters
>  afftdn_filter_deps="avcodec"
> diff --git a/doc/general.texi b/doc/general.texi
> index 3c0c803449..b8e063268c 100644
> --- a/doc/general.texi
> +++ b/doc/general.texi
> @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
>  @item TCP          @tab X
>  @item TLS          @tab X
>  @item UDP          @tab X
> +@item ZMQ          @tab E
>  @end multitable
>  
>  @code{X} means that the protocol is supported.
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index 3e4e7af3d4..174eaacd0f 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -1728,4 +1728,36 @@ Timeout in ms.
>  Create the Unix socket in listening mode.
>  @end table
>  
> +@section libzmq
> +
> +ZeroMQ asynchronous messaging library.
> +
> +This library supports unicast streaming to multiple clients without relying on
> +an external server.
> +
> +The required syntax for streaming or connecting to a stream is:
> +@example
> +zmq:tcp://ip-address:port
> +@end example
> +
> +Example:
> +Create a localhost stream on port 5555:
> +@example
> +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
> +@end example
> +
> +Multiple clients may connect to the stream using:
> +@example
> +ffplay zmq:tcp://127.0.0.1:5555
> +@end example
> +
> +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
> +The server binds to a port and publishes data. Clients connect to the
> +server (IP address/port) and subscribe to the stream. The order in which
> +the server and client start generally does not matter.
> +
> +ffmpeg must be compiled with the --enable-libzmq option to support
> +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
> +for an example on how this option may be set.
> +
>  @c man end PROTOCOLS
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index a434b005a4..4a535429b7 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
>  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
>  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
>  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
> +OBJS-$(CONFIG_LIBZMQ_PROTOCOL) 			 		 += libzmq.o
>  
>  # libavdevice dependencies
>  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
> diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
> new file mode 100644
> index 0000000000..24eebb1fee
> --- /dev/null
> +++ b/libavformat/libzmq.c
> @@ -0,0 +1,159 @@
> +/*
> + * ZMQ URLProtocol
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <zmq.h>
> +#include "url.h"
> +#include "network.h"
> +#include "libavutil/avstring.h"
> +#include "libavutil/opt.h"
> +
> +typedef struct ZMQContext {
> +    const AVClass *class;
> +    void *context;
> +    void *socket;
> +    unsigned int timeout; /*blocking timeout during zmq poll in milliseconds */
> +} ZMQContext;
> +
> +static const AVOption options[] = {
> +    { NULL }
> +};
> +
> +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret;
> +    ZMQContext *s   = h->priv_data;
> +    s->context      = zmq_ctx_new();
> +    h->is_streamed  = 1;
> +
> +    av_strstart(uri, "zmq:", &uri);
> +
> +    /*publish during write*/
> +    if (h->flags & AVIO_FLAG_WRITE) {
> +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> +        if (!s->socket) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +
> +        ret = zmq_bind(s->socket, uri);
> +        if (ret < 0) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
> +            zmq_close(s->socket);
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +    }
> +
> +    /*subscribe for read*/
> +    if (h->flags & AVIO_FLAG_READ) {
> +        s->socket = zmq_socket(s->context, ZMQ_SUB);
> +        if (!s->socket) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +
> +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
> +        ret = zmq_connect(s->socket, uri);
> +        if (ret == -1) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
> +            zmq_close(s->socket);
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> +{
> +    int ret;
> +    ZMQContext *s = h->priv_data;
> +
> +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
> +    if (ret >= 0)
> +        return ret; /*number of sent bytes*/
> +
> +    /*errno = EAGAIN if messages cannot be pushed*/
> +    if (ret == -1 && errno == EAGAIN) {
> +        return AVERROR(EAGAIN);
> +    } else
> +        return AVERROR_EXTERNAL;
> +}
> +
> +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
> +{
> +    int ret;
> +    ZMQContext *s = h->priv_data;
> +    zmq_msg_t msg;
> +    int msg_size;
> +
> +    ret = zmq_msg_init(&msg);
> +    if (ret == -1) {
> +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
> +      return AVERROR_EXTERNAL;
> +    }
> +
> +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
> +    if (ret == -1) {
> +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
> +        if (ret == AVERROR_EXTERNAL)
> +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
> +        goto finish;
> +    }
> +
> +    msg_size = zmq_msg_size(&msg);
> +    if (msg_size > size) {
> +        msg_size = size;
> +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
> +    }
> +    memcpy(buf, zmq_msg_data(&msg), msg_size);
> +
> +finish:
> +    zmq_msg_close(&msg);
> +    return ret;
> +}
> +
> +static int ff_zmq_close(URLContext *h)
> +{
> +    ZMQContext *s = h->priv_data;
> +    zmq_close(s->socket);
> +    zmq_ctx_destroy(s->context);
> +    return 0;
> +}
> +
> +static const AVClass zmq_context_class = {
> +    .class_name = "zmq",
> +    .item_name  = av_default_item_name,
> +    .option     = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +const URLProtocol ff_libzmq_protocol = {
> +    .name            = "zmq",
> +    .url_close       = ff_zmq_close,
> +    .url_open        = ff_zmq_open,
> +    .url_read        = ff_zmq_read,
> +    .url_write       = ff_zmq_write,
> +    .priv_data_size  = sizeof(ZMQContext),
> +    .priv_data_class = &zmq_context_class,
> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> +};
> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> index ad95659795..face5b29b5 100644
> --- a/libavformat/protocols.c
> +++ b/libavformat/protocols.c
> @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
>  extern const URLProtocol ff_libsrt_protocol;
>  extern const URLProtocol ff_libssh_protocol;
>  extern const URLProtocol ff_libsmbclient_protocol;
> +extern const URLProtocol ff_libzmq_protocol;
>  
>  #include "libavformat/protocol_list.c"
>  
> -- 
> 2.22.0

ping
Jack Waller Aug. 7, 2019, 2:44 a.m. UTC | #2
On Wed, Aug 7, 2019 at 10:22 AM Andriy Gelman <andriy.gelman@gmail.com>
wrote:

> On Sun, 04. Aug 14:36, Andriy Gelman wrote:
> > Changes in v2:
> >   1. Replaced zmq_poll with zmq_msg_recv.
> >   2. Remove user timeout option as zmq_msg_recv(.., .., ZMQ_DONTWAIT) is
> a
> >   non-blocking call.
> >   3. Updated docs.
> >
> > Andriy
>
> > diff --git a/libavformat/Makefile b/libavformat/Makefile
> > index a434b005a4..4a535429b7 100644
> > --- a/libavformat/Makefile
> > +++ b/libavformat/Makefile
> > @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
> >  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
> >  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
> >  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
> > +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)
> += libzmq.o
> >
>

There is a TAB here,
Please remove it


> >  # libavdevice dependencies
> >  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
>
>

>
diff mbox

Patch

diff --git a/configure b/configure
index 5a4f507246..4515341b06 100755
--- a/configure
+++ b/configure
@@ -3400,6 +3400,8 @@  libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
 
 # filters
 afftdn_filter_deps="avcodec"
diff --git a/doc/general.texi b/doc/general.texi
index 3c0c803449..b8e063268c 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1329,6 +1329,7 @@  performance on systems without hardware floating point support).
 @item TCP          @tab X
 @item TLS          @tab X
 @item UDP          @tab X
+@item ZMQ          @tab E
 @end multitable
 
 @code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af3d4..174eaacd0f 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,36 @@  Timeout in ms.
 Create the Unix socket in listening mode.
 @end table
 
+@section libzmq
+
+ZeroMQ asynchronous messaging library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://ip-address:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect to the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
+The server binds to a port and publishes data. Clients connect to the
+server (IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
+for an example on how this option may be set.
+
 @c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b005a4..4a535429b7 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@  OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL) 			 		 += libzmq.o
 
 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000000..24eebb1fee
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,159 @@ 
+/*
+ * ZMQ URLProtocol
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+
+typedef struct ZMQContext {
+    const AVClass *class;
+    void *context;
+    void *socket;
+    unsigned int timeout; /*blocking timeout during zmq poll in milliseconds */
+} ZMQContext;
+
+static const AVOption options[] = {
+    { NULL }
+};
+
+static int ff_zmq_open(URLContext *h, const char *uri, int flags)
+{
+    int ret;
+    ZMQContext *s   = h->priv_data;
+    s->context      = zmq_ctx_new();
+    h->is_streamed  = 1;
+
+    av_strstart(uri, "zmq:", &uri);
+
+    /*publish during write*/
+    if (h->flags & AVIO_FLAG_WRITE) {
+        s->socket = zmq_socket(s->context, ZMQ_PUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        ret = zmq_bind(s->socket, uri);
+        if (ret < 0) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+
+    /*subscribe for read*/
+    if (h->flags & AVIO_FLAG_READ) {
+        s->socket = zmq_socket(s->context, ZMQ_SUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+        ret = zmq_connect(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+    return 0;
+}
+
+static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
+    if (ret >= 0)
+        return ret; /*number of sent bytes*/
+
+    /*errno = EAGAIN if messages cannot be pushed*/
+    if (ret == -1 && errno == EAGAIN) {
+        return AVERROR(EAGAIN);
+    } else
+        return AVERROR_EXTERNAL;
+}
+
+static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+    zmq_msg_t msg;
+    int msg_size;
+
+    ret = zmq_msg_init(&msg);
+    if (ret == -1) {
+      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
+      return AVERROR_EXTERNAL;
+    }
+
+    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
+    if (ret == -1) {
+        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
+        if (ret == AVERROR_EXTERNAL)
+          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
+        goto finish;
+    }
+
+    msg_size = zmq_msg_size(&msg);
+    if (msg_size > size) {
+        msg_size = size;
+        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
+    }
+    memcpy(buf, zmq_msg_data(&msg), msg_size);
+
+finish:
+    zmq_msg_close(&msg);
+    return ret;
+}
+
+static int ff_zmq_close(URLContext *h)
+{
+    ZMQContext *s = h->priv_data;
+    zmq_close(s->socket);
+    zmq_ctx_destroy(s->context);
+    return 0;
+}
+
+static const AVClass zmq_context_class = {
+    .class_name = "zmq",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+    .name            = "zmq",
+    .url_close       = ff_zmq_close,
+    .url_open        = ff_zmq_open,
+    .url_read        = ff_zmq_read,
+    .url_write       = ff_zmq_write,
+    .priv_data_size  = sizeof(ZMQContext),
+    .priv_data_class = &zmq_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659795..face5b29b5 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@  extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
 
 #include "libavformat/protocol_list.c"