diff mbox

[FFmpeg-devel,v4] Add ZeroMQ as protocol option

Message ID 20190829183430.7bvwklitrkv3druq@manj
State Superseded
Headers show

Commit Message

Andriy Gelman Aug. 29, 2019, 6:34 p.m. UTC
Changes in v4: 
  - Use polling instead of non-blocking option for socket
    read/write operations.
  - Added pkt_size, timeout_send, timeout_recv options.
    Updated documentation for new options.
  - Removed redundant memcpy now that pkt_size is set.
  - Check that context is successfully created.
  - Added minimum version requirement for libzmq in
    configure (set to 4.3.1)
  - Removed link to ubuntu compilation guide from
    documentation.
  - Use zmq_errno() to get errno. 
  - Updated Changelog and bumped micro version number.
  - Added author name.

Thanks, 
Andriy
From ca5aa5149b97152b15457b2f40c753c80e68e3de Mon Sep 17 00:00:00 2001
From: Andriy Gelman <andriy.gelman@gmail.com>
Date: Tue, 30 Jul 2019 14:39:32 -0400
Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option

Currently multiple clients are only supported by using a multicast
destination address. An alternative is to stream to a server which
re-distributes the content. This commit adds ZeroMQ as a protocol
option, which allows multiple clients to connect to a single ffmpeg
instance.
---
 Changelog               |   1 +
 configure               |   4 +-
 doc/general.texi        |   1 +
 doc/protocols.texi      |  48 +++++++++++
 libavformat/Makefile    |   1 +
 libavformat/libzmq.c    | 184 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 libavformat/version.h   |   2 +-
 8 files changed, 240 insertions(+), 2 deletions(-)
 create mode 100644 libavformat/libzmq.c

Comments

Marton Balint Aug. 29, 2019, 7:01 p.m. UTC | #1
On Thu, 29 Aug 2019, Andriy Gelman wrote:

> Changes in v4:
>  - Use polling instead of non-blocking option for socket
>    read/write operations.
>  - Added pkt_size, timeout_send, timeout_recv options.
>    Updated documentation for new options.

No, timeout_send and timeout_recv is not needed. The URL context has a 
timeout parameter.

Please see how unix.c or tcp.c does the polling: They both use a helper 
funciton called ff_network_wait_fd_timeout (implemented in network.c) 
which does polling in a loop with small timeouts in order to be able to 
check the interrupt callback. After successful polling you can call the 
actual transfer function.

You should do something similar, copy ff_network_wait_fd_timeout, replace 
the file descriptor polling with zmq polling and you should be good to go.

Thanks,
Marton
Andriy Gelman Aug. 30, 2019, 5:37 p.m. UTC | #2
On Thu, 29. Aug 21:01, Marton Balint wrote:
> 
> 
> On Thu, 29 Aug 2019, Andriy Gelman wrote:
> 
> > Changes in v4:
> >  - Use polling instead of non-blocking option for socket
> >    read/write operations.
> >  - Added pkt_size, timeout_send, timeout_recv options.
> >    Updated documentation for new options.
> 
> No, timeout_send and timeout_recv is not needed. The URL context has a
> timeout parameter.
> 
> Please see how unix.c or tcp.c does the polling: They both use a helper
> funciton called ff_network_wait_fd_timeout (implemented in network.c) which
> does polling in a loop with small timeouts in order to be able to check the
> interrupt callback. After successful polling you can call the actual
> transfer function.
> 
> You should do something similar, copy ff_network_wait_fd_timeout, replace
> the file descriptor polling with zmq polling and you should be good to go.

Thanks Marton, I've added these changes. I'll send the updated patch shortly. 

When working on this new version, I first tried to use the AVIO_FLAG_NONBLOCK
flag to decide whether to go into a blocking or non-blocking mode.

My intention was to return AVERROR(EAGAIN) when in the non-blocking mode if
read/write is not available. But avio exits with "Resource temporarily
unavailable" when AVIO_FLAG_NONBLOCK flag is set and AVERROR(EAGAIN) is
returned.  So probably I've misunderstood the meaning of AVIO_FLAG_NONBLOCK.
The updated patch is not using this flag.

Another point: the documentation in url.h says:

" * In non-blocking mode, return AVERROR(EAGAIN) immediately.
 * In blocking mode, wait for data/EOF/error with a short timeout (0.1s),
 * and return AVERROR(EAGAIN) on timeout."

But the function ff_network_wait_fd_timeout returns AVERROR(ETIMEDOUT) on
timeout (causing the code to exit) in blocking mode. It seems to me that either
the documentation or the return value should be updated. Should I submit a
separate patch on this? 

Thanks, 
Andriy
Marton Balint Aug. 30, 2019, 11:48 p.m. UTC | #3
On Fri, 30 Aug 2019, Andriy Gelman wrote:

> On Thu, 29. Aug 21:01, Marton Balint wrote:
>> 
>> 
>> On Thu, 29 Aug 2019, Andriy Gelman wrote:
>> 
>> > Changes in v4:
>> >  - Use polling instead of non-blocking option for socket
>> >    read/write operations.
>> >  - Added pkt_size, timeout_send, timeout_recv options.
>> >    Updated documentation for new options.
>> 
>> No, timeout_send and timeout_recv is not needed. The URL context has a
>> timeout parameter.
>> 
>> Please see how unix.c or tcp.c does the polling: They both use a helper
>> funciton called ff_network_wait_fd_timeout (implemented in network.c) which
>> does polling in a loop with small timeouts in order to be able to check the
>> interrupt callback. After successful polling you can call the actual
>> transfer function.
>> 
>> You should do something similar, copy ff_network_wait_fd_timeout, replace
>> the file descriptor polling with zmq polling and you should be good to go.
>
> Thanks Marton, I've added these changes. I'll send the updated patch shortly. 
>
> When working on this new version, I first tried to use the AVIO_FLAG_NONBLOCK
> flag to decide whether to go into a blocking or non-blocking mode.
>
> My intention was to return AVERROR(EAGAIN) when in the non-blocking mode if
> read/write is not available. But avio exits with "Resource temporarily
> unavailable" when AVIO_FLAG_NONBLOCK flag is set and AVERROR(EAGAIN) is
> returned.  So probably I've misunderstood the meaning of AVIO_FLAG_NONBLOCK.
> The updated patch is not using this flag.

AVIO_FLAG_NONBLOCK is not heavily used or tested. You can still add 
support for it, (if the flag is set then you skip the code where you poll 
and if the transfer function returns EAGAIN then you return that as a 
special case). I am not sure how you can test it actually.

>
> Another point: the documentation in url.h says:
>
> " * In non-blocking mode, return AVERROR(EAGAIN) immediately.
> * In blocking mode, wait for data/EOF/error with a short timeout (0.1s),
> * and return AVERROR(EAGAIN) on timeout."
>
> But the function ff_network_wait_fd_timeout returns AVERROR(ETIMEDOUT) on
> timeout (causing the code to exit) in blocking mode. It seems to me that either
> the documentation or the return value should be updated. Should I submit a
> separate patch on this?

Yeah, the docs seems inaccurate. I guess the idea was that common code in 
avio.c:retry_transfer_wrapper should handle the retries but the common 
code has to differentiate between EAGAIN which should be retried 
immediately and EAGAIN which should be retried after sleeping a bit. Maybe 
we should simply return EINTR for the immediate retry case, because that 
already means an immediate retry in retry_transfer_wrapper.

So I guess the docs should be changed to that (yes, submit a separate 
patch), and existing protocols should be fixed to return EINTR when the 
poll() times out instead of EAGAIN. This way we can reduce the duplicated 
code from the protocols. And of course this also means that the zmq 
protocol can return EINTR on poll timeout and you can call ff_zmq_wait 
instead of ff_zmq_wait_timeout.

Regards,
Marton
Marton Balint Aug. 31, 2019, 1:20 a.m. UTC | #4
On Sat, 31 Aug 2019, Marton Balint wrote:

>
>
> On Fri, 30 Aug 2019, Andriy Gelman wrote:
>
>> On Thu, 29. Aug 21:01, Marton Balint wrote:
>>> 
>>> 
>>> On Thu, 29 Aug 2019, Andriy Gelman wrote:
>>> 
>>> > Changes in v4:
>>> >  - Use polling instead of non-blocking option for socket
>>> >    read/write operations.
>>> >  - Added pkt_size, timeout_send, timeout_recv options.
>>> >    Updated documentation for new options.
>>> 
>>> No, timeout_send and timeout_recv is not needed. The URL context has a
>>> timeout parameter.
>>> 
>>> Please see how unix.c or tcp.c does the polling: They both use a helper
>>> funciton called ff_network_wait_fd_timeout (implemented in network.c) 
> which
>>> does polling in a loop with small timeouts in order to be able to check 
> the
>>> interrupt callback. After successful polling you can call the actual
>>> transfer function.
>>> 
>>> You should do something similar, copy ff_network_wait_fd_timeout, replace
>>> the file descriptor polling with zmq polling and you should be good to go.
>>
>> Thanks Marton, I've added these changes. I'll send the updated patch 
> shortly. 
>>
>> When working on this new version, I first tried to use the 
> AVIO_FLAG_NONBLOCK
>> flag to decide whether to go into a blocking or non-blocking mode.
>>
>> My intention was to return AVERROR(EAGAIN) when in the non-blocking mode if
>> read/write is not available. But avio exits with "Resource temporarily
>> unavailable" when AVIO_FLAG_NONBLOCK flag is set and AVERROR(EAGAIN) is
>> returned.  So probably I've misunderstood the meaning of 
> AVIO_FLAG_NONBLOCK.
>> The updated patch is not using this flag.
>
> AVIO_FLAG_NONBLOCK is not heavily used or tested. You can still add 
> support for it, (if the flag is set then you skip the code where you poll 
> and if the transfer function returns EAGAIN then you return that as a 
> special case). I am not sure how you can test it actually.
>
>>
>> Another point: the documentation in url.h says:
>>
>> " * In non-blocking mode, return AVERROR(EAGAIN) immediately.
>> * In blocking mode, wait for data/EOF/error with a short timeout (0.1s),
>> * and return AVERROR(EAGAIN) on timeout."
>>
>> But the function ff_network_wait_fd_timeout returns AVERROR(ETIMEDOUT) on
>> timeout (causing the code to exit) in blocking mode. It seems to me that 
> either
>> the documentation or the return value should be updated. Should I submit a
>> separate patch on this?
>
> Yeah, the docs seems inaccurate. I guess the idea was that common code in 
> avio.c:retry_transfer_wrapper should handle the retries but the common 
> code has to differentiate between EAGAIN which should be retried 
> immediately and EAGAIN which should be retried after sleeping a bit. Maybe 
> we should simply return EINTR for the immediate retry case, because that 
> already means an immediate retry in retry_transfer_wrapper.

Ok, this won't work out of the box, because for EINTR we will not 
check if rw_timeout is elapsed...

>
> So I guess the docs should be changed to that (yes, submit a separate 
> patch), and existing protocols should be fixed to return EINTR when the 
> poll() times out instead of EAGAIN. This way we can reduce the duplicated 
> code from the protocols. And of course this also means that the zmq 
> protocol can return EINTR on poll timeout and you can call ff_zmq_wait 
> instead of ff_zmq_wait_timeout.

Keep the zmq patch as is then, we can remove the extra waiter loop 
function once the API is cleaned up...

Thanks,
Marton
Andriy Gelman Aug. 31, 2019, 8:35 p.m. UTC | #5
On Sat, 31. Aug 03:20, Marton Balint wrote:
> 
> 
> On Sat, 31 Aug 2019, Marton Balint wrote:
> 
> > 
> > 
> > On Fri, 30 Aug 2019, Andriy Gelman wrote:
> > 
> > > On Thu, 29. Aug 21:01, Marton Balint wrote:
> > > > 
> > > > 
> > > > On Thu, 29 Aug 2019, Andriy Gelman wrote:
> > > > 
> > > > > Changes in v4:
> > > > >  - Use polling instead of non-blocking option for socket
> > > > >    read/write operations.
> > > > >  - Added pkt_size, timeout_send, timeout_recv options.
> > > > >    Updated documentation for new options.
> > > > 
> > > > No, timeout_send and timeout_recv is not needed. The URL context has a
> > > > timeout parameter.
> > > > 
> > > > Please see how unix.c or tcp.c does the polling: They both use a helper
> > > > funciton called ff_network_wait_fd_timeout (implemented in
> > > > network.c)
> > which
> > > > does polling in a loop with small timeouts in order to be able
> > > > to check
> > the
> > > > interrupt callback. After successful polling you can call the actual
> > > > transfer function.
> > > > 
> > > > You should do something similar, copy ff_network_wait_fd_timeout, replace
> > > > the file descriptor polling with zmq polling and you should be good to go.
> > > 
> > > Thanks Marton, I've added these changes. I'll send the updated patch
> > shortly.
> > > 
> > > When working on this new version, I first tried to use the
> > AVIO_FLAG_NONBLOCK
> > > flag to decide whether to go into a blocking or non-blocking mode.
> > > 
> > > My intention was to return AVERROR(EAGAIN) when in the non-blocking mode if
> > > read/write is not available. But avio exits with "Resource temporarily
> > > unavailable" when AVIO_FLAG_NONBLOCK flag is set and AVERROR(EAGAIN) is
> > > returned.  So probably I've misunderstood the meaning of
> > AVIO_FLAG_NONBLOCK.
> > > The updated patch is not using this flag.
> > 
> > AVIO_FLAG_NONBLOCK is not heavily used or tested. You can still add
> > support for it, (if the flag is set then you skip the code where you
> > poll and if the transfer function returns EAGAIN then you return that as
> > a special case). I am not sure how you can test it actually.
> > 
> > > 
> > > Another point: the documentation in url.h says:
> > > 
> > > " * In non-blocking mode, return AVERROR(EAGAIN) immediately.
> > > * In blocking mode, wait for data/EOF/error with a short timeout (0.1s),
> > > * and return AVERROR(EAGAIN) on timeout."
> > > 
> > > But the function ff_network_wait_fd_timeout returns AVERROR(ETIMEDOUT) on
> > > timeout (causing the code to exit) in blocking mode. It seems to me
> > > that
> > either
> > > the documentation or the return value should be updated. Should I submit a
> > > separate patch on this?
> > 
> > Yeah, the docs seems inaccurate. I guess the idea was that common code
> > in avio.c:retry_transfer_wrapper should handle the retries but the
> > common code has to differentiate between EAGAIN which should be retried
> > immediately and EAGAIN which should be retried after sleeping a bit.
> > Maybe we should simply return EINTR for the immediate retry case,
> > because that already means an immediate retry in retry_transfer_wrapper.
> 
> Ok, this won't work out of the box, because for EINTR we will not check if
> rw_timeout is elapsed...
> 
> > 
> > So I guess the docs should be changed to that (yes, submit a separate
> > patch), and existing protocols should be fixed to return EINTR when the
> > poll() times out instead of EAGAIN. This way we can reduce the
> > duplicated code from the protocols. And of course this also means that
> > the zmq protocol can return EINTR on poll timeout and you can call
> > ff_zmq_wait instead of ff_zmq_wait_timeout.
> 
> Keep the zmq patch as is then, we can remove the extra waiter loop function
> once the API is cleaned up...

ok, sounds good. I'll look over the code in retry_transfer_wrapper and try to
propose a solution. 

Thanks, 
Andriy

> 
> Thanks,
> Marton
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> 
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
diff mbox

Patch

diff --git a/Changelog b/Changelog
index f1cddc4274..1ccaca0b60 100644
--- a/Changelog
+++ b/Changelog
@@ -7,6 +7,7 @@  version <next>:
 - Intel QSV-accelerated VP9 decoding
 - support for TrueHD in mp4
 - Supoort AMD AMF encoder on Linux (via Vulkan)
+- ZeroMQ protocol
 
 
 version 4.2:
diff --git a/configure b/configure
index 691ae9bd4f..2cc6323534 100755
--- a/configure
+++ b/configure
@@ -3413,6 +3413,8 @@  libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
 
 # filters
 afftdn_filter_deps="avcodec"
@@ -6323,7 +6325,7 @@  enabled libxavs           && require libxavs "stdint.h xavs.h" xavs_encoder_enco
 enabled libxavs2          && require_pkg_config libxavs2 "xavs2 >= 1.3.0" "stdint.h xavs2.h" xavs2_api_get
 enabled libxvid           && require libxvid xvid.h xvid_global -lxvidcore
 enabled libzimg           && require_pkg_config libzimg "zimg >= 2.7.0" zimg.h zimg_get_api_version
-enabled libzmq            && require_pkg_config libzmq libzmq zmq.h zmq_ctx_new
+enabled libzmq            && require_pkg_config libzmq "libzmq >= 4.3.1" zmq.h zmq_ctx_new
 enabled libzvbi           && require_pkg_config libzvbi zvbi-0.2 libzvbi.h vbi_decoder_new &&
                              { test_cpp_condition libzvbi.h "VBI_VERSION_MAJOR > 0 || VBI_VERSION_MINOR > 2 || VBI_VERSION_MINOR == 2 && VBI_VERSION_MICRO >= 28" ||
                                enabled gpl || die "ERROR: libzvbi requires version 0.2.28 or --enable-gpl."; }
diff --git a/doc/general.texi b/doc/general.texi
index d0c3525e02..2744c238cf 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1339,6 +1339,7 @@  performance on systems without hardware floating point support).
 @item TCP          @tab X
 @item TLS          @tab X
 @item UDP          @tab X
+@item ZMQ          @tab E
 @end multitable
 
 @code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af3d4..4ce33d60da 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,52 @@  Timeout in ms.
 Create the Unix socket in listening mode.
 @end table
 
+@section libzmq
+
+ZeroMQ asynchronous messaging library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://ip-address:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect to the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+Streaming to multiple clients is implemented using a ZeroMQ Pub-Sub pattern.
+The server side binds to a port and publishes data. Clients connect to the
+server (via IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol.
+
+Options can be set on the @command{ffmpeg}/@command{ffplay} command
+line. The following options are supported:
+
+@table @option
+
+@item pkt_size
+Forces the maximum packet size for sending/receiving data. The default value is 32,768 bytes. On the server side, this sets the maximum size of sent packets via ZeroMQ. On the clients, it sets an internal buffer size for receiving packets. Note that pkt_size on the clients should be equal to or greater than pkt_size on the server. Otherwise the received message may be truncated causing decoding errors.
+
+@item timeout_send
+Polling timeout (in milliseconds) on the server side. The default value is 0 (non-blocking operation). A value of -1 means that the polling operation will block infinitly until the packet can be sent.
+
+@item timeout_recv
+Polling timeout (in milliseconds) on the client side. The default value is 0 (non-blocking operation). A value of -1 means that the polling operation will block infintly until a packet can be received.
+
+@end table
+
+
 @c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b005a4..efa3a112ae 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@  OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
 
 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000000..fb25ae4428
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,184 @@ 
+/*
+ * ZeroMQ Protocol
+ * Copyright (c) 2019 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+
+#define ZMQ_STRERROR  zmq_strerror(zmq_errno())
+
+typedef struct ZMQContext {
+    const AVClass *class;
+    void *context;
+    void *socket;
+    int   timeout_send;
+    int   timeout_recv;
+    int   pkt_size;
+    int   pkt_size_overflow; /*keep track of the largest packet during overflow*/
+} ZMQContext;
+
+#define OFFSET(x) offsetof(ZMQContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+static const AVOption options[] = {
+    { "timeout_send", "timeout (milliseconds) of socket send", OFFSET(timeout_send), AV_OPT_TYPE_INT, { .i64 = 0 },     -1, INT_MAX, .flags = E },
+    { "timeout_recv", "timeout (milliseconds) of socket read", OFFSET(timeout_recv), AV_OPT_TYPE_INT, { .i64 = 0 },     -1, INT_MAX, .flags = D },
+    { "pkt_size",     "Maximum send/read packet size",         OFFSET(pkt_size),     AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E },
+    { NULL }
+};
+
+static int ff_zmq_open(URLContext *h, const char *uri, int flags)
+{
+    int ret;
+    ZMQContext *s        = h->priv_data;
+    s->pkt_size_overflow = 0;
+    h->is_streamed       = 1;
+
+    if (s->pkt_size > 0)
+        h->max_packet_size = s->pkt_size;
+
+    s->context = zmq_ctx_new();
+    if (!s->context) {
+        /*errno not set on failure during zmq_ctx_new()*/
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    av_strstart(uri, "zmq:", &uri);
+
+    /*publish during write*/
+    if (h->flags & AVIO_FLAG_WRITE) {
+        s->socket = zmq_socket(s->context, ZMQ_PUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        ret = zmq_bind(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+
+    /*subscribe for read*/
+    if (h->flags & AVIO_FLAG_READ) {
+        s->socket = zmq_socket(s->context, ZMQ_SUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+        ret = zmq_connect(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+    return 0;
+}
+
+static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    zmq_pollitem_t items = { s->socket,  0, ZMQ_POLLOUT, 0 };
+
+    ret = zmq_poll(&items, 1, s->timeout_send);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    if (!(items.revents & ZMQ_POLLOUT))
+        return AVERROR(EAGAIN);
+
+    ret = zmq_send(s->socket, buf, size, 0);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    return ret; /*number of bytes sent*/
+}
+
+static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    zmq_pollitem_t items = { s->socket, 0, ZMQ_POLLIN, 0 };
+
+    ret = zmq_poll(&items, 1, s->timeout_recv);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    if (!(items.revents & ZMQ_POLLIN))
+        return AVERROR(EAGAIN);
+
+    ret = zmq_recv(s->socket, buf, size, 0);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+
+    if (ret > size) {
+        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
+        av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
+        ret = size;
+    }
+    return ret; /*number of bytes read*/
+}
+
+static int ff_zmq_close(URLContext *h)
+{
+    ZMQContext *s = h->priv_data;
+    zmq_close(s->socket);
+    zmq_ctx_destroy(s->context);
+    return 0;
+}
+
+static const AVClass zmq_context_class = {
+    .class_name = "zmq",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+    .name            = "zmq",
+    .url_close       = ff_zmq_close,
+    .url_open        = ff_zmq_open,
+    .url_read        = ff_zmq_read,
+    .url_write       = ff_zmq_write,
+    .priv_data_size  = sizeof(ZMQContext),
+    .priv_data_class = &zmq_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659795..face5b29b5 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@  extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
 
 #include "libavformat/protocol_list.c"
 
diff --git a/libavformat/version.h b/libavformat/version.h
index af0db1e970..edfa73fb97 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,7 +32,7 @@ 
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
 // Also please add any ticket numbers that you believe might be affected here
 #define LIBAVFORMAT_VERSION_MAJOR  58
-#define LIBAVFORMAT_VERSION_MINOR  31
+#define LIBAVFORMAT_VERSION_MINOR  32
 #define LIBAVFORMAT_VERSION_MICRO 104
 
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \