diff mbox

[FFmpeg-devel,v3] libavformat: Add ZeroMQ as a protocol option

Message ID 20190819212803.d2dk4jj3cumoymin@manj
State Superseded
Headers show

Commit Message

Andriy Gelman Aug. 19, 2019, 9:28 p.m. UTC
Minor changes in v3: 
1. Removed tab character from as per feedback 
2. Removed unused timeout variable from ZMQContext 

Andriy
From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
From: Andriy Gelman <andriy.gelman@gmail.com>
Date: Tue, 30 Jul 2019 14:39:32 -0400
Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option

Currently multiple clients are only supported by using a multicast
destination address. An alternative is to stream to a server which
re-distributes the content. This commit adds ZeroMQ as a protocol
option, which allows multiple clients to connect to a single ffmpeg
instance.
---
 configure               |   2 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  32 ++++++++
 libavformat/Makefile    |   1 +
 libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 6 files changed, 195 insertions(+)
 create mode 100644 libavformat/libzmq.c

Comments

Andriy Gelman Aug. 24, 2019, 2:54 a.m. UTC | #1
On Mon, 19. Aug 17:28, Andriy Gelman wrote:
> Minor changes in v3: 
> 1. Removed tab character from as per feedback 
> 2. Removed unused timeout variable from ZMQContext 
> 
> Andriy

> From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
> From: Andriy Gelman <andriy.gelman@gmail.com>
> Date: Tue, 30 Jul 2019 14:39:32 -0400
> Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
> 
> Currently multiple clients are only supported by using a multicast
> destination address. An alternative is to stream to a server which
> re-distributes the content. This commit adds ZeroMQ as a protocol
> option, which allows multiple clients to connect to a single ffmpeg
> instance.
> ---
>  configure               |   2 +
>  doc/general.texi        |   1 +
>  doc/protocols.texi      |  32 ++++++++
>  libavformat/Makefile    |   1 +
>  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
>  libavformat/protocols.c |   1 +
>  6 files changed, 195 insertions(+)
>  create mode 100644 libavformat/libzmq.c
> 
> diff --git a/configure b/configure
> index c09c842809..a4134024c2 100755
> --- a/configure
> +++ b/configure
> @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
>  libsrt_protocol_select="network"
>  libssh_protocol_deps="libssh"
>  libtls_conflict="openssl gnutls mbedtls"
> +libzmq_protocol_deps="libzmq"
> +libzmq_protocol_select="network"
>  
>  # filters
>  afftdn_filter_deps="avcodec"
> diff --git a/doc/general.texi b/doc/general.texi
> index 3c0c803449..b8e063268c 100644
> --- a/doc/general.texi
> +++ b/doc/general.texi
> @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
>  @item TCP          @tab X
>  @item TLS          @tab X
>  @item UDP          @tab X
> +@item ZMQ          @tab E
>  @end multitable
>  
>  @code{X} means that the protocol is supported.
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index 3e4e7af3d4..174eaacd0f 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -1728,4 +1728,36 @@ Timeout in ms.
>  Create the Unix socket in listening mode.
>  @end table
>  
> +@section libzmq
> +
> +ZeroMQ asynchronous messaging library.
> +
> +This library supports unicast streaming to multiple clients without relying on
> +an external server.
> +
> +The required syntax for streaming or connecting to a stream is:
> +@example
> +zmq:tcp://ip-address:port
> +@end example
> +
> +Example:
> +Create a localhost stream on port 5555:
> +@example
> +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
> +@end example
> +
> +Multiple clients may connect to the stream using:
> +@example
> +ffplay zmq:tcp://127.0.0.1:5555
> +@end example
> +
> +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
> +The server binds to a port and publishes data. Clients connect to the
> +server (IP address/port) and subscribe to the stream. The order in which
> +the server and client start generally does not matter.
> +
> +ffmpeg must be compiled with the --enable-libzmq option to support
> +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
> +for an example on how this option may be set.
> +
>  @c man end PROTOCOLS
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index a434b005a4..efa3a112ae 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
>  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
>  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
>  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
> +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
>  
>  # libavdevice dependencies
>  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
> diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
> new file mode 100644
> index 0000000000..ac35c01cf8
> --- /dev/null
> +++ b/libavformat/libzmq.c
> @@ -0,0 +1,158 @@
> +/*
> + * ZMQ URLProtocol
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <zmq.h>
> +#include "url.h"
> +#include "network.h"
> +#include "libavutil/avstring.h"
> +#include "libavutil/opt.h"
> +
> +typedef struct ZMQContext {
> +    const AVClass *class;
> +    void *context;
> +    void *socket;
> +} ZMQContext;
> +
> +static const AVOption options[] = {
> +    { NULL }
> +};
> +
> +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret;
> +    ZMQContext *s   = h->priv_data;
> +    s->context      = zmq_ctx_new();
> +    h->is_streamed  = 1;
> +
> +    av_strstart(uri, "zmq:", &uri);
> +
> +    /*publish during write*/
> +    if (h->flags & AVIO_FLAG_WRITE) {
> +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> +        if (!s->socket) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +
> +        ret = zmq_bind(s->socket, uri);
> +        if (ret == -1) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
> +            zmq_close(s->socket);
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +    }
> +
> +    /*subscribe for read*/
> +    if (h->flags & AVIO_FLAG_READ) {
> +        s->socket = zmq_socket(s->context, ZMQ_SUB);
> +        if (!s->socket) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +
> +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
> +        ret = zmq_connect(s->socket, uri);
> +        if (ret == -1) {
> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
> +            zmq_close(s->socket);
> +            zmq_ctx_destroy(s->context);
> +            return AVERROR_EXTERNAL;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> +{
> +    int ret;
> +    ZMQContext *s = h->priv_data;
> +
> +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
> +    if (ret >= 0)
> +        return ret; /*number of sent bytes*/
> +
> +    /*errno = EAGAIN if messages cannot be pushed*/
> +    if (ret == -1 && errno == EAGAIN) {
> +        return AVERROR(EAGAIN);
> +    } else
> +        return AVERROR_EXTERNAL;
> +}
> +
> +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
> +{
> +    int ret;
> +    ZMQContext *s = h->priv_data;
> +    zmq_msg_t msg;
> +    int msg_size;
> +
> +    ret = zmq_msg_init(&msg);
> +    if (ret == -1) {
> +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
> +      return AVERROR_EXTERNAL;
> +    }
> +
> +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
> +    if (ret == -1) {
> +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
> +        if (ret == AVERROR_EXTERNAL)
> +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
> +        goto finish;
> +    }
> +
> +    msg_size = zmq_msg_size(&msg);
> +    if (msg_size > size) {
> +        msg_size = size;
> +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
> +    }
> +    memcpy(buf, zmq_msg_data(&msg), msg_size);
> +
> +finish:
> +    zmq_msg_close(&msg);
> +    return ret;
> +}
> +
> +static int ff_zmq_close(URLContext *h)
> +{
> +    ZMQContext *s = h->priv_data;
> +    zmq_close(s->socket);
> +    zmq_ctx_destroy(s->context);
> +    return 0;
> +}
> +
> +static const AVClass zmq_context_class = {
> +    .class_name = "zmq",
> +    .item_name  = av_default_item_name,
> +    .option     = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +const URLProtocol ff_libzmq_protocol = {
> +    .name            = "zmq",
> +    .url_close       = ff_zmq_close,
> +    .url_open        = ff_zmq_open,
> +    .url_read        = ff_zmq_read,
> +    .url_write       = ff_zmq_write,
> +    .priv_data_size  = sizeof(ZMQContext),
> +    .priv_data_class = &zmq_context_class,
> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> +};
> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> index ad95659795..face5b29b5 100644
> --- a/libavformat/protocols.c
> +++ b/libavformat/protocols.c
> @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
>  extern const URLProtocol ff_libsrt_protocol;
>  extern const URLProtocol ff_libssh_protocol;
>  extern const URLProtocol ff_libsmbclient_protocol;
> +extern const URLProtocol ff_libzmq_protocol;
>  
>  #include "libavformat/protocol_list.c"
>  
> -- 
> 2.22.0
> 

ping
Marton Balint Aug. 24, 2019, 5:33 p.m. UTC | #2
On Fri, 23 Aug 2019, Andriy Gelman wrote:

> On Mon, 19. Aug 17:28, Andriy Gelman wrote:
>> Minor changes in v3: 
>> 1. Removed tab character from as per feedback 
>> 2. Removed unused timeout variable from ZMQContext 
>> 
>> Andriy
>
>> From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
>> From: Andriy Gelman <andriy.gelman@gmail.com>
>> Date: Tue, 30 Jul 2019 14:39:32 -0400
>> Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
>> 
>> Currently multiple clients are only supported by using a multicast
>> destination address. An alternative is to stream to a server which
>> re-distributes the content. This commit adds ZeroMQ as a protocol
>> option, which allows multiple clients to connect to a single ffmpeg
>> instance.
>> ---
>>  configure               |   2 +
>>  doc/general.texi        |   1 +
>>  doc/protocols.texi      |  32 ++++++++
>>  libavformat/Makefile    |   1 +
>>  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
>>  libavformat/protocols.c |   1 +
>>  6 files changed, 195 insertions(+)
>>  create mode 100644 libavformat/libzmq.c

Missing changelog entry and libavformat minor version bump.

>> 
>> diff --git a/configure b/configure
>> index c09c842809..a4134024c2 100755
>> --- a/configure
>> +++ b/configure
>> @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
>>  libsrt_protocol_select="network"
>>  libssh_protocol_deps="libssh"
>>  libtls_conflict="openssl gnutls mbedtls"
>> +libzmq_protocol_deps="libzmq"
>> +libzmq_protocol_select="network"

You may want to enforce a minimum version requirement for libzmq in 
the pkg_config part of configure depending on the API you use.

>>
>>  # filters
>>  afftdn_filter_deps="avcodec"
>> diff --git a/doc/general.texi b/doc/general.texi
>> index 3c0c803449..b8e063268c 100644
>> --- a/doc/general.texi
>> +++ b/doc/general.texi
>> @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
>>  @item TCP          @tab X
>>  @item TLS          @tab X
>>  @item UDP          @tab X
>> +@item ZMQ          @tab E
>>  @end multitable
>>
>>  @code{X} means that the protocol is supported.
>> diff --git a/doc/protocols.texi b/doc/protocols.texi
>> index 3e4e7af3d4..174eaacd0f 100644
>> --- a/doc/protocols.texi
>> +++ b/doc/protocols.texi
>> @@ -1728,4 +1728,36 @@ Timeout in ms.
>>  Create the Unix socket in listening mode.
>>  @end table
>> 
>> +@section libzmq
>> +
>> +ZeroMQ asynchronous messaging library.
>> +
>> +This library supports unicast streaming to multiple clients without relying on
>> +an external server.
>> +
>> +The required syntax for streaming or connecting to a stream is:
>> +@example
>> +zmq:tcp://ip-address:port
>> +@end example
>> +
>> +Example:
>> +Create a localhost stream on port 5555:
>> +@example
>> +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
>> +@end example
>> +
>> +Multiple clients may connect to the stream using:
>> +@example
>> +ffplay zmq:tcp://127.0.0.1:5555
>> +@end example
>> +
>> +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
>> +The server binds to a port and publishes data. Clients connect to the
>> +server (IP address/port) and subscribe to the stream. The order in which
>> +the server and client start generally does not matter.
>> +
>> +ffmpeg must be compiled with the --enable-libzmq option to support
>> +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
>> +for an example on how this option may be set.

I think I'd rather not reference the compilation guide, as there are no 
specific instructions there to compile with libzmq. If you insist, then at 
least loose the "Ubuntu" part from the link.

>> +
>>  @c man end PROTOCOLS
>> diff --git a/libavformat/Makefile b/libavformat/Makefile
>> index a434b005a4..efa3a112ae 100644
>> --- a/libavformat/Makefile
>> +++ b/libavformat/Makefile
>> @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
>>  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
>>  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
>>  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
>> +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
>>
>>  # libavdevice dependencies
>>  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
>> diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
>> new file mode 100644
>> index 0000000000..ac35c01cf8
>> --- /dev/null
>> +++ b/libavformat/libzmq.c
>> @@ -0,0 +1,158 @@
>> +/*
>> + * ZMQ URLProtocol
>> + *
>> + * This file is part of FFmpeg.
>> + *
>> + * FFmpeg is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU Lesser General Public
>> + * License as published by the Free Software Foundation; either
>> + * version 2.1 of the License, or (at your option) any later version.
>> + *
>> + * FFmpeg is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>> + * Lesser General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU Lesser General Public
>> + * License along with FFmpeg; if not, write to the Free Software
>> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>> + */
>> +
>> +#include <zmq.h>
>> +#include "url.h"
>> +#include "network.h"
>> +#include "libavutil/avstring.h"
>> +#include "libavutil/opt.h"
>> +
>> +typedef struct ZMQContext {
>> +    const AVClass *class;
>> +    void *context;
>> +    void *socket;
>> +} ZMQContext;
>> +
>> +static const AVOption options[] = {
>> +    { NULL }
>> +};
>> +
>> +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
>> +{
>> +    int ret;
>> +    ZMQContext *s   = h->priv_data;
>> +    s->context      = zmq_ctx_new();

You should check if the context creation was successful.

>> +    h->is_streamed  = 1;
>> +
>> +    av_strstart(uri, "zmq:", &uri);
>> +
>> +    /*publish during write*/
>> +    if (h->flags & AVIO_FLAG_WRITE) {
>> +        s->socket = zmq_socket(s->context, ZMQ_PUB);
>> +        if (!s->socket) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));

zmq_errno() instead of errno? Same goes for all similar cases.

>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +
>> +        ret = zmq_bind(s->socket, uri);
>> +        if (ret == -1) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
>> +            zmq_close(s->socket);
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +    }
>> +
>> +    /*subscribe for read*/
>> +    if (h->flags & AVIO_FLAG_READ) {
>> +        s->socket = zmq_socket(s->context, ZMQ_SUB);
>> +        if (!s->socket) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +
>> +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
>> +        ret = zmq_connect(s->socket, uri);
>> +        if (ret == -1) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
>> +            zmq_close(s->socket);
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>> +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
>> +{
>> +    int ret;
>> +    ZMQContext *s = h->priv_data;
>> +
>> +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);

I can see that you are using non-blocking mode. A polling with timeout 
approach is preferred, see how tcp or unix does it.

>> +    if (ret >= 0)
>> +        return ret; /*number of sent bytes*/
>> +
>> +    /*errno = EAGAIN if messages cannot be pushed*/
>> +    if (ret == -1 && errno == EAGAIN) {
>> +        return AVERROR(EAGAIN);
>> +    } else
>> +        return AVERROR_EXTERNAL;
>> +}
>> +
>> +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
>> +{
>> +    int ret;
>> +    ZMQContext *s = h->priv_data;
>> +    zmq_msg_t msg;
>> +    int msg_size;
>> +
>> +    ret = zmq_msg_init(&msg);
>> +    if (ret == -1) {
>> +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
>> +      return AVERROR_EXTERNAL;
>> +    }
>> +
>> +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);

Same here, a polling with timeout is preferred.

>> +    if (ret == -1) {
>> +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
>> +        if (ret == AVERROR_EXTERNAL)
>> +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));

identation

>> +        goto finish;
>> +    }
>> +
>> +    msg_size = zmq_msg_size(&msg);
>> +    if (msg_size > size) {
>> +        msg_size = size;
>> +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");

Probably a user settable pkt_size option would be useful which sets the 
URLContext max_packet_size which basically controls the size of the 
allocated IO buffer.

>> +    }
>> +    memcpy(buf, zmq_msg_data(&msg), msg_size);

If you are truncating anyway then please use zmq_recv directly, so you can 
avoid the memcpy.

>> +
>> +finish:
>> +    zmq_msg_close(&msg);
>> +    return ret;
>> +}
>> +
>> +static int ff_zmq_close(URLContext *h)
>> +{
>> +    ZMQContext *s = h->priv_data;
>> +    zmq_close(s->socket);
>> +    zmq_ctx_destroy(s->context);
>> +    return 0;
>> +}
>> +
>> +static const AVClass zmq_context_class = {
>> +    .class_name = "zmq",
>> +    .item_name  = av_default_item_name,
>> +    .option     = options,
>> +    .version    = LIBAVUTIL_VERSION_INT,
>> +};
>> +
>> +const URLProtocol ff_libzmq_protocol = {
>> +    .name            = "zmq",
>> +    .url_close       = ff_zmq_close,
>> +    .url_open        = ff_zmq_open,
>> +    .url_read        = ff_zmq_read,
>> +    .url_write       = ff_zmq_write,
>> +    .priv_data_size  = sizeof(ZMQContext),
>> +    .priv_data_class = &zmq_context_class,
>> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
>> +};
>> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
>> index ad95659795..face5b29b5 100644
>> --- a/libavformat/protocols.c
>> +++ b/libavformat/protocols.c
>> @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
>>  extern const URLProtocol ff_libsrt_protocol;
>>  extern const URLProtocol ff_libssh_protocol;
>>  extern const URLProtocol ff_libsmbclient_protocol;
>> +extern const URLProtocol ff_libzmq_protocol;
>>
>>  #include "libavformat/protocol_list.c"
>> 
>> -- 
>> 2.22.0
>> 
>
> ping

Regards,
Marton
Andriy Gelman Aug. 28, 2019, 9:42 p.m. UTC | #3
Marton, 
Thanks for reviewing this patch.

On Sat, 24. Aug 19:33, Marton Balint wrote:
> 
> 
> On Fri, 23 Aug 2019, Andriy Gelman wrote:
> 
> > On Mon, 19. Aug 17:28, Andriy Gelman wrote:
> > > Minor changes in v3: 1. Removed tab character from as per feedback
> > > 2. Removed unused timeout variable from ZMQContext
> > > 
> > > Andriy
> > 
> > > From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
> > > From: Andriy Gelman <andriy.gelman@gmail.com>
> > > Date: Tue, 30 Jul 2019 14:39:32 -0400
> > > Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
> > > 
> > > Currently multiple clients are only supported by using a multicast
> > > destination address. An alternative is to stream to a server which
> > > re-distributes the content. This commit adds ZeroMQ as a protocol
> > > option, which allows multiple clients to connect to a single ffmpeg
> > > instance.
> > > ---
> > >  configure               |   2 +
> > >  doc/general.texi        |   1 +
> > >  doc/protocols.texi      |  32 ++++++++
> > >  libavformat/Makefile    |   1 +
> > >  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
> > >  libavformat/protocols.c |   1 +
> > >  6 files changed, 195 insertions(+)
> > >  create mode 100644 libavformat/libzmq.c
> 
> Missing changelog entry and libavformat minor version bump.

ok, will update.

> 
> > > 
> > > diff --git a/configure b/configure
> > > index c09c842809..a4134024c2 100755
> > > --- a/configure
> > > +++ b/configure
> > > @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
> > >  libsrt_protocol_select="network"
> > >  libssh_protocol_deps="libssh"
> > >  libtls_conflict="openssl gnutls mbedtls"
> > > +libzmq_protocol_deps="libzmq"
> > > +libzmq_protocol_select="network"
> 
> You may want to enforce a minimum version requirement for libzmq in the
> pkg_config part of configure depending on the API you use.

ok

> 
> > > 
> > >  # filters
> > >  afftdn_filter_deps="avcodec"
> > > diff --git a/doc/general.texi b/doc/general.texi
> > > index 3c0c803449..b8e063268c 100644
> > > --- a/doc/general.texi
> > > +++ b/doc/general.texi
> > > @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
> > >  @item TCP          @tab X
> > >  @item TLS          @tab X
> > >  @item UDP          @tab X
> > > +@item ZMQ          @tab E
> > >  @end multitable
> > > 
> > >  @code{X} means that the protocol is supported.
> > > diff --git a/doc/protocols.texi b/doc/protocols.texi
> > > index 3e4e7af3d4..174eaacd0f 100644
> > > --- a/doc/protocols.texi
> > > +++ b/doc/protocols.texi
> > > @@ -1728,4 +1728,36 @@ Timeout in ms.
> > >  Create the Unix socket in listening mode.
> > >  @end table
> > > 
> > > +@section libzmq
> > > +
> > > +ZeroMQ asynchronous messaging library.
> > > +
> > > +This library supports unicast streaming to multiple clients without relying on
> > > +an external server.
> > > +
> > > +The required syntax for streaming or connecting to a stream is:
> > > +@example
> > > +zmq:tcp://ip-address:port
> > > +@end example
> > > +
> > > +Example:
> > > +Create a localhost stream on port 5555:
> > > +@example
> > > +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Multiple clients may connect to the stream using:
> > > +@example
> > > +ffplay zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
> > > +The server binds to a port and publishes data. Clients connect to the
> > > +server (IP address/port) and subscribe to the stream. The order in which
> > > +the server and client start generally does not matter.
> > > +
> > > +ffmpeg must be compiled with the --enable-libzmq option to support
> > > +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
> > > +for an example on how this option may be set.
> 
> I think I'd rather not reference the compilation guide, as there are no
> specific instructions there to compile with libzmq. If you insist, then at
> least loose the "Ubuntu" part from the link.

I'll remove the reference to the compilation guide.

> 
> > > +
> > >  @c man end PROTOCOLS
> > > diff --git a/libavformat/Makefile b/libavformat/Makefile
> > > index a434b005a4..efa3a112ae 100644
> > > --- a/libavformat/Makefile
> > > +++ b/libavformat/Makefile
> > > @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
> > >  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
> > >  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
> > >  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
> > > +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
> > > 
> > >  # libavdevice dependencies
> > >  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
> > > diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
> > > new file mode 100644
> > > index 0000000000..ac35c01cf8
> > > --- /dev/null
> > > +++ b/libavformat/libzmq.c
> > > @@ -0,0 +1,158 @@
> > > +/*
> > > + * ZMQ URLProtocol
> > > + *
> > > + * This file is part of FFmpeg.
> > > + *
> > > + * FFmpeg is free software; you can redistribute it and/or
> > > + * modify it under the terms of the GNU Lesser General Public
> > > + * License as published by the Free Software Foundation; either
> > > + * version 2.1 of the License, or (at your option) any later version.
> > > + *
> > > + * FFmpeg is distributed in the hope that it will be useful,
> > > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > > + * Lesser General Public License for more details.
> > > + *
> > > + * You should have received a copy of the GNU Lesser General Public
> > > + * License along with FFmpeg; if not, write to the Free Software
> > > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > > + */
> > > +
> > > +#include <zmq.h>
> > > +#include "url.h"
> > > +#include "network.h"
> > > +#include "libavutil/avstring.h"
> > > +#include "libavutil/opt.h"
> > > +
> > > +typedef struct ZMQContext {
> > > +    const AVClass *class;
> > > +    void *context;
> > > +    void *socket;
> > > +} ZMQContext;
> > > +
> > > +static const AVOption options[] = {
> > > +    { NULL }
> > > +};
> > > +
> > > +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s   = h->priv_data;
> > > +    s->context      = zmq_ctx_new();
> 
> You should check if the context creation was successful.

thanks, I'll add the check.

> 
> > > +    h->is_streamed  = 1;
> > > +
> > > +    av_strstart(uri, "zmq:", &uri);
> > > +
> > > +    /*publish during write*/
> > > +    if (h->flags & AVIO_FLAG_WRITE) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> 
> zmq_errno() instead of errno? Same goes for all similar cases.

The documentation says to use zmq_errno() on non-POSIX systems. 
But also suggests:
"Users not experiencing issues with retrieving the correct value of errno should
not use this function and should instead access the errno variable directly."

On IRC J_Darnley pointed out that ffmpeg.c includes errno.h without any checks.
It seems reasonable to assume that errno is available.

> 
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        ret = zmq_bind(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +
> > > +    /*subscribe for read*/
> > > +    if (h->flags & AVIO_FLAG_READ) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_SUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
> > > +        ret = zmq_connect(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +    return 0;
> > > +}
> > > +
> > > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +
> > > +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
> 
> I can see that you are using non-blocking mode. A polling with timeout
> approach is preferred, see how tcp or unix does it.

I used polling in the initial patch, but I switched to non-blocking because I
thought it was a cleaner solution. I'll revert to polling with a timeout in the
next version.

> 
> > > +    if (ret >= 0)
> > > +        return ret; /*number of sent bytes*/
> > > +
> > > +    /*errno = EAGAIN if messages cannot be pushed*/
> > > +    if (ret == -1 && errno == EAGAIN) {
> > > +        return AVERROR(EAGAIN);
> > > +    } else
> > > +        return AVERROR_EXTERNAL;
> > > +}
> > > +
> > > +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_msg_t msg;
> > > +    int msg_size;
> > > +
> > > +    ret = zmq_msg_init(&msg);
> > > +    if (ret == -1) {
> > > +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
> > > +      return AVERROR_EXTERNAL;
> > > +    }
> > > +
> > > +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
> 
> Same here, a polling with timeout is preferred.
> 
> > > +    if (ret == -1) {
> > > +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
> > > +        if (ret == AVERROR_EXTERNAL)
> > > +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
> 
> identation
> 
> > > +        goto finish;
> > > +    }
> > > +
> > > +    msg_size = zmq_msg_size(&msg);
> > > +    if (msg_size > size) {
> > > +        msg_size = size;
> > > +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
> 
> Probably a user settable pkt_size option would be useful which sets the
> URLContext max_packet_size which basically controls the size of the
> allocated IO buffer.
> 
> > > +    }
> > > +    memcpy(buf, zmq_msg_data(&msg), msg_size);
> 
> If you are truncating anyway then please use zmq_recv directly, so you can
> avoid the memcpy.

yes, good idea on setting pkt_size and avoiding memcpy.

> 
> > > +
> > > +finish:
> > > +    zmq_msg_close(&msg);
> > > +    return ret;
> > > +}
> > > +
> > > +static int ff_zmq_close(URLContext *h)
> > > +{
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_close(s->socket);
> > > +    zmq_ctx_destroy(s->context);
> > > +    return 0;
> > > +}
> > > +
> > > +static const AVClass zmq_context_class = {
> > > +    .class_name = "zmq",
> > > +    .item_name  = av_default_item_name,
> > > +    .option     = options,
> > > +    .version    = LIBAVUTIL_VERSION_INT,
> > > +};
> > > +
> > > +const URLProtocol ff_libzmq_protocol = {
> > > +    .name            = "zmq",
> > > +    .url_close       = ff_zmq_close,
> > > +    .url_open        = ff_zmq_open,
> > > +    .url_read        = ff_zmq_read,
> > > +    .url_write       = ff_zmq_write,
> > > +    .priv_data_size  = sizeof(ZMQContext),
> > > +    .priv_data_class = &zmq_context_class,
> > > +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> > > +};
> > > diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> > > index ad95659795..face5b29b5 100644
> > > --- a/libavformat/protocols.c
> > > +++ b/libavformat/protocols.c
> > > @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
> > >  extern const URLProtocol ff_libsrt_protocol;
> > >  extern const URLProtocol ff_libssh_protocol;
> > >  extern const URLProtocol ff_libsmbclient_protocol;
> > > +extern const URLProtocol ff_libzmq_protocol;
> > > 
> > >  #include "libavformat/protocol_list.c"
> > > 
> > > -- 
> > > 2.22.0
> > > 
> > 
> > ping
> 
> Regards,
> Marton

Thanks, 
Andriy
Marton Balint Aug. 29, 2019, 12:28 a.m. UTC | #4
On Wed, 28 Aug 2019, Andriy Gelman wrote:

>> > > +    h->is_streamed  = 1;
>> > > +
>> > > +    av_strstart(uri, "zmq:", &uri);
>> > > +
>> > > +    /*publish during write*/
>> > > +    if (h->flags & AVIO_FLAG_WRITE) {
>> > > +        s->socket = zmq_socket(s->context, ZMQ_PUB);
>> > > +        if (!s->socket) {
>> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
>> 
>> zmq_errno() instead of errno? Same goes for all similar cases.
>
> The documentation says to use zmq_errno() on non-POSIX systems. 
> But also suggests:
> "Users not experiencing issues with retrieving the correct value of errno should
> not use this function and should instead access the errno variable directly."
>
> On IRC J_Darnley pointed out that ffmpeg.c includes errno.h without any checks.
> It seems reasonable to assume that errno is available.

That does not matter, because ffmpeg uses errno for checking errors 
when ffmpeg.c calls the standard C library, not when a linked 
library calls the standard C library.

I am no expert in this area, but as far as I understood it from the docs 
of zmq_errno() the problem is not the "availability" of errno but that 
multiple C runtimes might be in use an you might not get the errno of the 
C runtime libzmq is using but the errno of the C runtime your application 
is using.

Here are some more deails I found:
https://grokbase.com/t/zeromq/zeromq-dev/1087reyava/portability-of-0mq-api

So zmq_errno() still feels safer and more portable.

>> > > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
>> > > +{
>> > > +    int ret;
>> > > +    ZMQContext *s = h->priv_data;
>> > > +
>> > > +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
>> 
>> I can see that you are using non-blocking mode. A polling with timeout
>> approach is preferred, see how tcp or unix does it.
>
> I used polling in the initial patch, but I switched to non-blocking because I
> thought it was a cleaner solution. I'll revert to polling with a timeout in the
> next version.

Polling returns immediately when data is available. For the non-blocking 
approach the code falls back to 1ms (which can easily be 10ms on Win32) sleeps 
after a few retries. This severely can hurt performance, that is why the 
polling approach is preferred.

Thanks,
Marton
Andriy Gelman Aug. 29, 2019, 2:10 a.m. UTC | #5
On Thu, 29. Aug 02:28, Marton Balint wrote:
> 
> 
> On Wed, 28 Aug 2019, Andriy Gelman wrote:
> 
> > > > > +    h->is_streamed  = 1;
> > > > > +
> > > > > +    av_strstart(uri, "zmq:", &uri);
> > > > > +
> > > > > +    /*publish during write*/
> > > > > +    if (h->flags & AVIO_FLAG_WRITE) {
> > > > > +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> > > > > +        if (!s->socket) {
> > > > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> > > 
> > > zmq_errno() instead of errno? Same goes for all similar cases.
> > 
> > The documentation says to use zmq_errno() on non-POSIX systems. But also
> > suggests:
> > "Users not experiencing issues with retrieving the correct value of errno should
> > not use this function and should instead access the errno variable directly."
> > 
> > On IRC J_Darnley pointed out that ffmpeg.c includes errno.h without any checks.
> > It seems reasonable to assume that errno is available.
> 
> That does not matter, because ffmpeg uses errno for checking errors when
> ffmpeg.c calls the standard C library, not when a linked library calls the
> standard C library.
> 
> I am no expert in this area, but as far as I understood it from the docs of
> zmq_errno() the problem is not the "availability" of errno but that multiple
> C runtimes might be in use an you might not get the errno of the C runtime
> libzmq is using but the errno of the C runtime your application is using.
> 
> Here are some more deails I found:
> https://grokbase.com/t/zeromq/zeromq-dev/1087reyava/portability-of-0mq-api
> 
> So zmq_errno() still feels safer and more portable.

ok, I misunderstood the problem. I'll use zmq_errno()   

> 
> > > > > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> > > > > +{
> > > > > +    int ret;
> > > > > +    ZMQContext *s = h->priv_data;
> > > > > +
> > > > > +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
> > > 
> > > I can see that you are using non-blocking mode. A polling with timeout
> > > approach is preferred, see how tcp or unix does it.
> > 
> > I used polling in the initial patch, but I switched to non-blocking because I
> > thought it was a cleaner solution. I'll revert to polling with a timeout in the
> > next version.
> 
> Polling returns immediately when data is available. For the non-blocking
> approach the code falls back to 1ms (which can easily be 10ms on Win32)
> sleeps after a few retries. This severely can hurt performance, that is why
> the polling approach is preferred.

That's very good to know. 

Thanks, 
Andriy
diff mbox

Patch

diff --git a/configure b/configure
index c09c842809..a4134024c2 100755
--- a/configure
+++ b/configure
@@ -3411,6 +3411,8 @@  libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
 
 # filters
 afftdn_filter_deps="avcodec"
diff --git a/doc/general.texi b/doc/general.texi
index 3c0c803449..b8e063268c 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1329,6 +1329,7 @@  performance on systems without hardware floating point support).
 @item TCP          @tab X
 @item TLS          @tab X
 @item UDP          @tab X
+@item ZMQ          @tab E
 @end multitable
 
 @code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af3d4..174eaacd0f 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,36 @@  Timeout in ms.
 Create the Unix socket in listening mode.
 @end table
 
+@section libzmq
+
+ZeroMQ asynchronous messaging library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://ip-address:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect to the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
+The server binds to a port and publishes data. Clients connect to the
+server (IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
+for an example on how this option may be set.
+
 @c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b005a4..efa3a112ae 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@  OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
 
 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000000..ac35c01cf8
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,158 @@ 
+/*
+ * ZMQ URLProtocol
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+
+typedef struct ZMQContext {
+    const AVClass *class;
+    void *context;
+    void *socket;
+} ZMQContext;
+
+static const AVOption options[] = {
+    { NULL }
+};
+
+static int ff_zmq_open(URLContext *h, const char *uri, int flags)
+{
+    int ret;
+    ZMQContext *s   = h->priv_data;
+    s->context      = zmq_ctx_new();
+    h->is_streamed  = 1;
+
+    av_strstart(uri, "zmq:", &uri);
+
+    /*publish during write*/
+    if (h->flags & AVIO_FLAG_WRITE) {
+        s->socket = zmq_socket(s->context, ZMQ_PUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        ret = zmq_bind(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+
+    /*subscribe for read*/
+    if (h->flags & AVIO_FLAG_READ) {
+        s->socket = zmq_socket(s->context, ZMQ_SUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+        ret = zmq_connect(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+    return 0;
+}
+
+static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
+    if (ret >= 0)
+        return ret; /*number of sent bytes*/
+
+    /*errno = EAGAIN if messages cannot be pushed*/
+    if (ret == -1 && errno == EAGAIN) {
+        return AVERROR(EAGAIN);
+    } else
+        return AVERROR_EXTERNAL;
+}
+
+static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+    zmq_msg_t msg;
+    int msg_size;
+
+    ret = zmq_msg_init(&msg);
+    if (ret == -1) {
+      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
+      return AVERROR_EXTERNAL;
+    }
+
+    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
+    if (ret == -1) {
+        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
+        if (ret == AVERROR_EXTERNAL)
+          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
+        goto finish;
+    }
+
+    msg_size = zmq_msg_size(&msg);
+    if (msg_size > size) {
+        msg_size = size;
+        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
+    }
+    memcpy(buf, zmq_msg_data(&msg), msg_size);
+
+finish:
+    zmq_msg_close(&msg);
+    return ret;
+}
+
+static int ff_zmq_close(URLContext *h)
+{
+    ZMQContext *s = h->priv_data;
+    zmq_close(s->socket);
+    zmq_ctx_destroy(s->context);
+    return 0;
+}
+
+static const AVClass zmq_context_class = {
+    .class_name = "zmq",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+    .name            = "zmq",
+    .url_close       = ff_zmq_close,
+    .url_open        = ff_zmq_open,
+    .url_read        = ff_zmq_read,
+    .url_write       = ff_zmq_write,
+    .priv_data_size  = sizeof(ZMQContext),
+    .priv_data_class = &zmq_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659795..face5b29b5 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@  extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
 
 #include "libavformat/protocol_list.c"