From patchwork Fri Aug 30 17:37:22 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Andriy Gelman X-Patchwork-Id: 14814 Return-Path: X-Original-To: patchwork@ffaux-bg.ffmpeg.org Delivered-To: patchwork@ffaux-bg.ffmpeg.org Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by ffaux.localdomain (Postfix) with ESMTP id D614A44A076 for ; Fri, 30 Aug 2019 20:37:26 +0300 (EEST) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id BDFD5687FAC; Fri, 30 Aug 2019 20:37:26 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-qk1-f176.google.com (mail-qk1-f176.google.com [209.85.222.176]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 05B16687FA2 for ; Fri, 30 Aug 2019 20:37:24 +0300 (EEST) Received: by mail-qk1-f176.google.com with SMTP id m2so6800084qki.12 for ; Fri, 30 Aug 2019 10:37:24 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=date:from:to:subject:message-id:mime-version:content-disposition :user-agent; bh=TV5gNjYrpNUP9NDIHp7gc4Gy4DfYYm1gVBP45rcgtSE=; b=EVuSD1qAvViTKQT75UYAu77MxvvyFXb5021ST8QU/nJnrhYf51T5G2IUrLVZCFaCL0 ZdoxWWCMNORsNiOg/rRJFM0uUjUslcrfVCwJEOXMGif/wQmdP26BNrrQwUXGKL3+dMu9 hKK6O4T45hjMLU/hqnYTPwYMD3IGt/qszSQbXXxDEKeA6Fc+l3PsVrEsWddrUy/BxWNW CgXRqefewB9KvlcrPFKXymZynnkHtewC7OYM4myL+VAmX4TX85SdYeFk0I1/ZMqaHTT+ R6n7oiR5e93vSvER8kmvzy2wedyjuyINhd8tqTC9Z1ErjN1GhdtMg9eY/k0uZ78OG2r3 /9SA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:date:from:to:subject:message-id:mime-version :content-disposition:user-agent; bh=TV5gNjYrpNUP9NDIHp7gc4Gy4DfYYm1gVBP45rcgtSE=; b=Tgxws0jOYbw+Es5iaCfpf7zoppCWnMQlTcXnBSv63zbizbjb0r5asIy9k2Sl2Y1AAK qQAzZHVD//0eW+3KF72TyTCCH98ntGROU5U58EzZT8ykPBHQ7OlN8TEgAGyZHctBgONz 58fWM813k0gU+pSenDP6BTPm5QTiatg9x9elwuBhkqGjQsVg+INBL5Gge6TMueQ2Xz1k xvHiYub4mb1lO7y7bh062V7BP2pgfuyAA9HsjovDcteszUXahWo+FyEVrdJIQG5NHuLr V11iph2n9tUwSnarXmFgKx8HxyihwWZeWLzT8clwK+TCazhDP9hRi4emhc1p1EBXtc06 +u2g== X-Gm-Message-State: APjAAAX7ohceCdBbNfD25pmEkDouh89xZDroiP4OGEpMcqI5xKPlnWJY WdSMYVtYbtUAMnFMW+F1eD9bBGRHAHc= X-Google-Smtp-Source: APXvYqyX7LbK9Kw/7YtlscG0AmkOpZfF16k/1I7SvrAo7daXHMe2rs4Fhv/zebbGp1gZ5Y31rHeQiQ== X-Received: by 2002:ae9:eb03:: with SMTP id b3mr11892276qkg.207.1567186643220; Fri, 30 Aug 2019 10:37:23 -0700 (PDT) Received: from manj (ip-184-209-191-104.spfdma.spcsdns.net. [184.209.191.104]) by smtp.gmail.com with ESMTPSA id x3sm2885190qkl.71.2019.08.30.10.37.21 for (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Fri, 30 Aug 2019 10:37:22 -0700 (PDT) Date: Fri, 30 Aug 2019 13:37:22 -0400 From: Andriy Gelman To: ffmpeg-devel@ffmpeg.org Message-ID: <20190830173722.i4n7ymxax6pcvcix@manj> MIME-Version: 1.0 Content-Disposition: inline User-Agent: NeoMutt/20180716-1344-11488f-dirty Subject: [FFmpeg-devel] [PATCH v5] Add ZeroMQ as protocol option X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Changes in v5: - Use polling in a loop to check interrupt callback. - Remove timeout_send/timeout_recv options and update documentation. Thanks, Andriy From a0c642f59c314c7ae53917f2b0d02bc6779009c2 Mon Sep 17 00:00:00 2001 From: Andriy Gelman Date: Tue, 30 Jul 2019 14:39:32 -0400 Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option Currently multiple clients are only supported by using a multicast destination address. An alternative is to stream to a server which re-distributes the content. This commit adds ZeroMQ as a protocol option, which allows multiple clients to connect to a single ffmpeg instance. --- Changelog | 1 + configure | 4 +- doc/general.texi | 1 + doc/protocols.texi | 42 +++++++++ libavformat/Makefile | 1 + libavformat/libzmq.c | 199 ++++++++++++++++++++++++++++++++++++++++ libavformat/protocols.c | 1 + libavformat/version.h | 2 +- 8 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 libavformat/libzmq.c diff --git a/Changelog b/Changelog index 20e42964da..4b29e015a0 100644 --- a/Changelog +++ b/Changelog @@ -8,6 +8,7 @@ version : - support for TrueHD in mp4 - Supoort AMD AMF encoder on Linux (via Vulkan) - IMM5 video decoder +- ZeroMQ protocol version 4.2: diff --git a/configure b/configure index 08a54a7e9f..bde2bcc004 100755 --- a/configure +++ b/configure @@ -3414,6 +3414,8 @@ libsrt_protocol_deps="libsrt" libsrt_protocol_select="network" libssh_protocol_deps="libssh" libtls_conflict="openssl gnutls mbedtls" +libzmq_protocol_deps="libzmq" +libzmq_protocol_select="network" # filters afftdn_filter_deps="avcodec" @@ -6324,7 +6326,7 @@ enabled libxavs && require libxavs "stdint.h xavs.h" xavs_encoder_enco enabled libxavs2 && require_pkg_config libxavs2 "xavs2 >= 1.3.0" "stdint.h xavs2.h" xavs2_api_get enabled libxvid && require libxvid xvid.h xvid_global -lxvidcore enabled libzimg && require_pkg_config libzimg "zimg >= 2.7.0" zimg.h zimg_get_api_version -enabled libzmq && require_pkg_config libzmq libzmq zmq.h zmq_ctx_new +enabled libzmq && require_pkg_config libzmq "libzmq >= 4.3.1" zmq.h zmq_ctx_new enabled libzvbi && require_pkg_config libzvbi zvbi-0.2 libzvbi.h vbi_decoder_new && { test_cpp_condition libzvbi.h "VBI_VERSION_MAJOR > 0 || VBI_VERSION_MINOR > 2 || VBI_VERSION_MINOR == 2 && VBI_VERSION_MICRO >= 28" || enabled gpl || die "ERROR: libzvbi requires version 0.2.28 or --enable-gpl."; } diff --git a/doc/general.texi b/doc/general.texi index d0c3525e02..2744c238cf 100644 --- a/doc/general.texi +++ b/doc/general.texi @@ -1339,6 +1339,7 @@ performance on systems without hardware floating point support). @item TCP @tab X @item TLS @tab X @item UDP @tab X +@item ZMQ @tab E @end multitable @code{X} means that the protocol is supported. diff --git a/doc/protocols.texi b/doc/protocols.texi index 3e4e7af3d4..32a89c2fe5 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -1728,4 +1728,46 @@ Timeout in ms. Create the Unix socket in listening mode. @end table +@section libzmq + +ZeroMQ asynchronous messaging library. + +This library supports unicast streaming to multiple clients without relying on +an external server. + +The required syntax for streaming or connecting to a stream is: +@example +zmq:tcp://ip-address:port +@end example + +Example: +Create a localhost stream on port 5555: +@example +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555 +@end example + +Multiple clients may connect to the stream using: +@example +ffplay zmq:tcp://127.0.0.1:5555 +@end example + +Streaming to multiple clients is implemented using a ZeroMQ Pub-Sub pattern. +The server side binds to a port and publishes data. Clients connect to the +server (via IP address/port) and subscribe to the stream. The order in which +the server and client start generally does not matter. + +ffmpeg must be compiled with the --enable-libzmq option to support +this protocol. + +Options can be set on the @command{ffmpeg}/@command{ffplay} command +line. The following options are supported: + +@table @option + +@item pkt_size +Forces the maximum packet size for sending/receiving data. The default value is 32,768 bytes. On the server side, this sets the maximum size of sent packets via ZeroMQ. On the clients, it sets an internal buffer size for receiving packets. Note that pkt_size on the clients should be equal to or greater than pkt_size on the server. Otherwise the received message may be truncated causing decoding errors. + +@end table + + @c man end PROTOCOLS diff --git a/libavformat/Makefile b/libavformat/Makefile index a434b005a4..efa3a112ae 100644 --- a/libavformat/Makefile +++ b/libavformat/Makefile @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o +OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o # libavdevice dependencies OBJS-$(CONFIG_IEC61883_INDEV) += dv.o diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c new file mode 100644 index 0000000000..9d8ffaaf4c --- /dev/null +++ b/libavformat/libzmq.c @@ -0,0 +1,199 @@ +/* + * ZeroMQ Protocol + * Copyright (c) 2019 Andriy Gelman + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include "url.h" +#include "network.h" +#include "libavutil/avstring.h" +#include "libavutil/opt.h" +#include "libavutil/time.h" + +#define ZMQ_STRERROR zmq_strerror(zmq_errno()) + +typedef struct ZMQContext { + const AVClass *class; + void *context; + void *socket; + int pkt_size; + int pkt_size_overflow; /*keep track of the largest packet during overflow*/ +} ZMQContext; + +#define OFFSET(x) offsetof(ZMQContext, x) +#define D AV_OPT_FLAG_DECODING_PARAM +#define E AV_OPT_FLAG_ENCODING_PARAM +static const AVOption options[] = { + { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E }, + { NULL } +}; + +static int ff_zmq_wait(URLContext *h, void *socket, int write) +{ + int ret; + int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN; + zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 }; + ret = zmq_poll(&items, 1, POLLING_TIME); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR); + return AVERROR_EXTERNAL; + } + return items.revents & ev ? 0 : AVERROR(EAGAIN); +} + +static int ff_zmq_wait_timeout(URLContext *h, void *socket , int write, int64_t timeout, AVIOInterruptCB *int_cb) +{ + int ret; + int64_t wait_start = 0; + + while (1) { + if (ff_check_interrupt(int_cb)) + return AVERROR_EXIT; + ret = ff_zmq_wait(h, socket, write); + if (ret != AVERROR(EAGAIN)) + return ret; + if (timeout > 0) { + if (!wait_start) + wait_start = av_gettime_relative(); + else if (av_gettime_relative() - wait_start > timeout) + return AVERROR(ETIMEDOUT); + } + } +} + +static int ff_zmq_open(URLContext *h, const char *uri, int flags) +{ + int ret; + ZMQContext *s = h->priv_data; + s->pkt_size_overflow = 0; + h->is_streamed = 1; + + if (s->pkt_size > 0) + h->max_packet_size = s->pkt_size; + + s->context = zmq_ctx_new(); + if (!s->context) { + /*errno not set on failure during zmq_ctx_new()*/ + av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n"); + return AVERROR_EXTERNAL; + } + + av_strstart(uri, "zmq:", &uri); + + /*publish during write*/ + if (h->flags & AVIO_FLAG_WRITE) { + s->socket = zmq_socket(s->context, ZMQ_PUB); + if (!s->socket) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + + ret = zmq_bind(s->socket, uri); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR); + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + } + + /*subscribe for read*/ + if (h->flags & AVIO_FLAG_READ) { + s->socket = zmq_socket(s->context, ZMQ_SUB); + if (!s->socket) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + + zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0); + ret = zmq_connect(s->socket, uri); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR); + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return AVERROR_EXTERNAL; + } + } + return 0; +} + +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size) +{ + int ret; + ZMQContext *s = h->priv_data; + + ret = ff_zmq_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback); + if (ret) + return ret; + ret = zmq_send(s->socket, buf, size, 0); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR); + return AVERROR_EXTERNAL; + } + return ret; /*number of bytes sent*/ +} + +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size) +{ + int ret; + ZMQContext *s = h->priv_data; + + ret = ff_zmq_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback); + if (ret) + return ret; + ret = zmq_recv(s->socket, buf, size, 0); + if (ret == -1) { + av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR); + return AVERROR_EXTERNAL; + } + if (ret > size) { + s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret); + av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow); + ret = size; + } + return ret; /*number of bytes read*/ +} + +static int ff_zmq_close(URLContext *h) +{ + ZMQContext *s = h->priv_data; + zmq_close(s->socket); + zmq_ctx_destroy(s->context); + return 0; +} + +static const AVClass zmq_context_class = { + .class_name = "zmq", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; + +const URLProtocol ff_libzmq_protocol = { + .name = "zmq", + .url_close = ff_zmq_close, + .url_open = ff_zmq_open, + .url_read = ff_zmq_read, + .url_write = ff_zmq_write, + .priv_data_size = sizeof(ZMQContext), + .priv_data_class = &zmq_context_class, + .flags = URL_PROTOCOL_FLAG_NETWORK, +}; diff --git a/libavformat/protocols.c b/libavformat/protocols.c index ad95659795..face5b29b5 100644 --- a/libavformat/protocols.c +++ b/libavformat/protocols.c @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol; extern const URLProtocol ff_libsrt_protocol; extern const URLProtocol ff_libssh_protocol; extern const URLProtocol ff_libsmbclient_protocol; +extern const URLProtocol ff_libzmq_protocol; #include "libavformat/protocol_list.c" diff --git a/libavformat/version.h b/libavformat/version.h index af0db1e970..edfa73fb97 100644 --- a/libavformat/version.h +++ b/libavformat/version.h @@ -32,7 +32,7 @@ // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium) // Also please add any ticket numbers that you believe might be affected here #define LIBAVFORMAT_VERSION_MAJOR 58 -#define LIBAVFORMAT_VERSION_MINOR 31 +#define LIBAVFORMAT_VERSION_MINOR 32 #define LIBAVFORMAT_VERSION_MICRO 104 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \