From patchwork Sun Mar 8 20:33:28 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Andriy Gelman X-Patchwork-Id: 18093 Return-Path: X-Original-To: patchwork@ffaux-bg.ffmpeg.org Delivered-To: patchwork@ffaux-bg.ffmpeg.org Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by ffaux.localdomain (Postfix) with ESMTP id 32AF844B2CD for ; Sun, 8 Mar 2020 22:33:52 +0200 (EET) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 0B7F16881E9; Sun, 8 Mar 2020 22:33:52 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-qt1-f195.google.com (mail-qt1-f195.google.com [209.85.160.195]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id E41F26880DA for ; Sun, 8 Mar 2020 22:33:44 +0200 (EET) Received: by mail-qt1-f195.google.com with SMTP id d22so5650735qtn.0 for ; Sun, 08 Mar 2020 13:33:44 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:mime-version :content-transfer-encoding; bh=Bd5KX6YHJdpw4V4SyhqBj8s75voTT0W/9XjLkiRwOrM=; b=URD+8FAOMY2uGHU0E7PfrGVnzW9VIn3WsSBwnHi8pCXkeUg4Gl7+b2P5Zjgzqy/YDv CqdRVbBtxvgxoXR7dKg5jwytN842COfLK0emk/6ucOa4d3ZnwUBNwRWRL3kanp+jwW0o zbD+JMI74wxLpAjOjCP51H27du51nr6ZckjJNkpOQLbePBwOjOc39C4s/A1JCoFdZzvK X4WVqwoiayjxmftoRYCesRsZspseka6OGniyadNs8MS+ZCpuxBRiWvqK9725y1n3y55U ZkpAy71rXxLsxc2rNLS2ORU035/WVBZ+Tp8LQy7O8UNfk74UOBfssmL/7sCyRG49qa5R ByHg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:mime-version :content-transfer-encoding; bh=Bd5KX6YHJdpw4V4SyhqBj8s75voTT0W/9XjLkiRwOrM=; b=nzkakkhoviXscvD56YM709SNoQuf72G/6zjxUp8THkMr0hJ2IvRPtNVa24BsrD3WwE 0G2xAgG2Xvy8ia94ekggYdg86CFeqzuKId9Dnv8dUTVAyvBBBgjoyEIUczjG98y4q+9A nCxvHLcbJ+49bKVEj59ojGDGuN/jqdhIwzAkwN7KLjpNvST658Ck/k6k9es4o8vfuOzZ Jb4msm1TJLRogR2Hr2yBQvM6JfsSsp+pbRWFT7owa1e3E7Iixm8KEX+MfDK5YJ/7aDcp CosWl9BTWTBODONrZee9CNaAe/C41HAJYyW/ncMn2YkuNRWUDbNyBd8EpViFUx02eI/c E7CQ== X-Gm-Message-State: ANhLgQ2ThmbB7maQ2q96e/Qp2sMJEAwHM97VPeudS+W24nJrlm9QaMdV sV13DPo4rhzVG+KX1kSynxCtTAqH X-Google-Smtp-Source: ADFU+vsA+BLX44kJ+3FdAqLDssGmoIwBZVSk6GlfQvxoDv1VBxSwjqvLFBMcC+kToh68/GQOkxHf0Q== X-Received: by 2002:ac8:4694:: with SMTP id g20mr12150468qto.13.1583699622886; Sun, 08 Mar 2020 13:33:42 -0700 (PDT) Received: from localhost.localdomain (c-71-232-27-28.hsd1.ma.comcast.net. [71.232.27.28]) by smtp.gmail.com with ESMTPSA id v1sm2991138qkd.74.2020.03.08.13.33.41 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Sun, 08 Mar 2020 13:33:42 -0700 (PDT) From: Andriy Gelman X-Google-Original-From: Andriy Gelman To: ffmpeg-devel@ffmpeg.org Date: Sun, 8 Mar 2020 16:33:28 -0400 Message-Id: <20200308203329.14369-1-andriy.gelman@gmail.com> X-Mailer: git-send-email 2.25.0 MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH v3 1/2] avformat: Add AMQP version 0-9-1 protocol support X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Andriy Gelman Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" From: Andriy Gelman Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. Signed-off-by: Andriy Gelman --- Changes in v3: - Fixed a bug when parsing username that contained a url encoded ":" - Fixed documentation error and improved logging Changes in v2: - Addressed comments from Marton - Updated documentation Compilation notes: - Requires librabbitmq-dev package (on ubuntu). - The pkg-config libprabbitmq.pc has a corrupt entry. **update: fixed on the github master branch** The line "Libs.private: rt; -lpthread" should be changed to "Libs.private: -lrt -pthread". - Compile FFmpeg with --enable-librabbitmq Changelog | 1 + configure | 5 + doc/general.texi | 1 + doc/protocols.texi | 60 ++++++++ libavformat/Makefile | 1 + libavformat/libamqp.c | 296 ++++++++++++++++++++++++++++++++++++++++ libavformat/protocols.c | 1 + libavformat/version.h | 4 +- 8 files changed, 367 insertions(+), 2 deletions(-) create mode 100644 libavformat/libamqp.c diff --git a/Changelog b/Changelog index cb310a3abc2..ab30d670a15 100644 --- a/Changelog +++ b/Changelog @@ -43,6 +43,7 @@ version : - Rayman 2 ADPCM decoder - Rayman 2 APM demuxer - cas video filter +- AMQP 0-9-1 protocol (RabbitMQ) version 4.2: diff --git a/configure b/configure index 06e3a7b2a88..8b171349440 100755 --- a/configure +++ b/configure @@ -255,6 +255,7 @@ External library support: --enable-libopenmpt enable decoding tracked files via libopenmpt [no] --enable-libopus enable Opus de/encoding via libopus [no] --enable-libpulse enable Pulseaudio input via libpulse [no] + --enable-librabbitmq enable RabbitMQ library [no] --enable-librav1e enable AV1 encoding via rav1e [no] --enable-librsvg enable SVG rasterization via librsvg [no] --enable-librubberband enable rubberband needed for rubberband filter [no] @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST=" libopenmpt libopus libpulse + librabbitmq librav1e librsvg librtmp @@ -3434,6 +3436,8 @@ unix_protocol_deps="sys_un_h" unix_protocol_select="network" # external library protocols +libamqp_protocol_deps="librabbitmq" +libamqp_protocol_select="network" librtmp_protocol_deps="librtmp" librtmpe_protocol_deps="librtmp" librtmps_protocol_deps="librtmp" @@ -6317,6 +6321,7 @@ enabled libopus && { } } enabled libpulse && require_pkg_config libpulse libpulse pulse/pulseaudio.h pa_context_new +enabled librabbitmq && require_pkg_config librabbitmq "librabbitmq >= 0.7.1" amqp.h amqp_new_connection enabled librav1e && require_pkg_config librav1e "rav1e >= 0.1.0" rav1e.h rav1e_context_new enabled librsvg && require_pkg_config librsvg librsvg-2.0 librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo enabled librtmp && require_pkg_config librtmp librtmp librtmp/rtmp.h RTMP_Socket diff --git a/doc/general.texi b/doc/general.texi index dbdc3485982..623566dabea 100644 --- a/doc/general.texi +++ b/doc/general.texi @@ -1330,6 +1330,7 @@ performance on systems without hardware floating point support). @multitable @columnfractions .4 .1 @item Name @tab Support +@item AMQP @tab X @item file @tab X @item FTP @tab X @item Gopher @tab X diff --git a/doc/protocols.texi b/doc/protocols.texi index 54a287f488b..27df4759b63 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -51,6 +51,66 @@ in microseconds. A description of the currently available protocols follows. +@section amqp + +Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based +publish-subscribe communication protocol. + +FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A separate +AMQP broker must also be run. An example open-source AMQP broker is RabbitMQ. + +After starting the broker, an FFmpeg client may stream data to the broker using +the command: + +@example +ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port] +@end example + +Where hostname and port (default is 5672) is the address of the broker. The +client may also set a user/password for authentication. The default for both +fields is "guest". + +Muliple subscribers may stream from the broker using the command: +@example +ffplay amqp://[[user]:[password]@@]hostname[:port] +@end example + +In RabbitMQ all data published to the broker flows through a specific exchange, +and each subscribing client has an assigned queue/buffer. When a packet arrives +at an exchange, it may be copied to a client's queue depending on the exchange +and routing_key fields. + +The following options are supported: + +@table @option + +@item exchange +Sets the exchange to use on the broker. RabbitMQ has several predefined +exchanges: "amq.direct" is the default exchange, where the publisher and +subscriber must have a matching routing_key; "amq.fanout" is the same as a +broadcast operation (i.e. the data is forwarded to all queues on the fanout +exchange independent of the routing_key); and "amq.topic" is similar to +"amq.direct", but allows for more complex pattern matching (refer to the RabbitMQ +documentation). + +@item routing_key +Sets the routing key. The default value is "amqp". The routing key is used on +the "amq.direct" and "amq.topic" exchanges to decide whether packets are written +to the queue of a subscriber. + +@item pkt_size +Maximum size of each packet sent/received to the broker. Default is 131072. +Minimum is 4096 and max is any large value (representable by an int). When +receiving packets, this sets an internal buffer size in FFmpeg. It should be +equal to or greater than the size of the published packets to the broker. Otherwise +the received message may be truncated causing decoding errors. + +@item connection_timeout +The timeout in seconds during the initial connection to the broker. The +default value is rw_timeout, or 5 seconds if rw_timeout is not set. + +@end table + @section async Asynchronous data filling wrapper for input stream. diff --git a/libavformat/Makefile b/libavformat/Makefile index e0681058a29..52b44289fee 100644 --- a/libavformat/Makefile +++ b/libavformat/Makefile @@ -629,6 +629,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL) += udp.o ip.o OBJS-$(CONFIG_UNIX_PROTOCOL) += unix.o # external library protocols +OBJS-$(CONFIG_LIBAMQP_PROTOCOL) += libamqp.o OBJS-$(CONFIG_LIBRTMP_PROTOCOL) += librtmp.o OBJS-$(CONFIG_LIBRTMPE_PROTOCOL) += librtmp.o OBJS-$(CONFIG_LIBRTMPS_PROTOCOL) += librtmp.o diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c new file mode 100644 index 00000000000..41ea110caed --- /dev/null +++ b/libavformat/libamqp.c @@ -0,0 +1,296 @@ +/* + * Advanced Message Queuing Protocol (AMQP) 0-9-1 + * Copyright (c) 2020 Andriy Gelman + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include "avformat.h" +#include "libavformat/urldecode.h" +#include "libavutil/avstring.h" +#include "libavutil/opt.h" +#include "libavutil/time.h" +#include "network.h" +#include "url.h" + +typedef struct AMQPContext { + const AVClass *class; + amqp_connection_state_t conn; + amqp_socket_t *socket; + const char *exchange; + const char *routing_key; + int pkt_size; + int64_t connection_timeout; + int pkt_size_overflow; +} AMQPContext; + +#define STR_LEN 1024 +#define DEFAULT_CHANNEL 1 + +#define OFFSET(x) offsetof(AMQPContext, x) +#define D AV_OPT_FLAG_DECODING_PARAM +#define E AV_OPT_FLAG_ENCODING_PARAM +static const AVOption options[] = { + { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E }, + { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E }, + { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E }, + { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E}, + { NULL } +}; + +static int amqp_proto_open(URLContext *h, const char *uri, int flags) +{ + int ret, server_msg; + char hostname[STR_LEN], credentials[STR_LEN]; + int port; + const char *user, *password = NULL; + const char *user_decoded, *password_decoded; + char *p; + amqp_rpc_reply_t broker_reply; + struct timeval tval = { 0 }; + + AMQPContext *s = h->priv_data; + + h->is_streamed = 1; + h->max_packet_size = s->pkt_size; + + av_url_split(NULL, 0, credentials, sizeof(credentials), + hostname, sizeof(hostname), &port, NULL, 0, uri); + + if (port < 0) + port = 5672; + + if (hostname[0] == '\0' || port <= 0 || port > 65535 ) { + av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n"); + return AVERROR(EINVAL); + } + + p = strchr(credentials, ':'); + if (p) { + *p = '\0'; + password = p + 1; + } + + if (!password || *password == '\0') + password = "guest"; + + password_decoded = ff_urldecode(password, 0); + if (!password_decoded) + return AVERROR(ENOMEM); + + user = credentials; + if (*user == '\0') + user = "guest"; + + user_decoded = ff_urldecode(user, 0); + if (!user_decoded) { + av_freep(&password_decoded); + return AVERROR(ENOMEM); + } + + s->conn = amqp_new_connection(); + if (!s->conn) { + av_log(h, AV_LOG_ERROR, "Error creating connection\n"); + return AVERROR_EXTERNAL; + } + + s->socket = amqp_tcp_socket_new(s->conn); + if (!s->socket) { + av_log(h, AV_LOG_ERROR, "Error creating socket\n"); + goto destroy_connection; + } + + if (s->connection_timeout < 0) + s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000); + + tval.tv_sec = s->connection_timeout / 1000000; + tval.tv_usec = s->connection_timeout % 1000000; + ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval); + + if (ret) { + av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", + amqp_error_string2(ret)); + goto destroy_connection; + } + + broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0, + AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded); + + if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { + av_log(h, AV_LOG_ERROR, "Error login\n"); + server_msg = AMQP_ACCESS_REFUSED; + goto close_connection; + } + + amqp_channel_open(s->conn, DEFAULT_CHANNEL); + broker_reply = amqp_get_rpc_reply(s->conn); + + if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { + av_log(h, AV_LOG_ERROR, "Error set channel\n"); + server_msg = AMQP_CHANNEL_ERROR; + goto close_connection; + } + + if (h->flags & AVIO_FLAG_READ) { + amqp_bytes_t queuename; + char queuename_buff[STR_LEN]; + amqp_queue_declare_ok_t *r; + + r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes, + 0, 0, 0, 1, amqp_empty_table); + broker_reply = amqp_get_rpc_reply(s->conn); + if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { + av_log(h, AV_LOG_ERROR, "Error declare queue\n"); + server_msg = AMQP_RESOURCE_ERROR; + goto close_channel; + } + + /* store queuename */ + queuename.bytes = queuename_buff; + queuename.len = FFMIN(r->queue.len, STR_LEN); + memcpy(queuename.bytes, r->queue.bytes, queuename.len); + + amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, + amqp_cstring_bytes(s->exchange), + amqp_cstring_bytes(s->routing_key), amqp_empty_table); + + broker_reply = amqp_get_rpc_reply(s->conn); + if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { + av_log(h, AV_LOG_ERROR, "Queue bind error\n"); + server_msg = AMQP_INTERNAL_ERROR; + goto close_channel; + } + + amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes, + 0, 1, 0, amqp_empty_table); + + broker_reply = amqp_get_rpc_reply(s->conn); + if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { + av_log(h, AV_LOG_ERROR, "Set consume error\n"); + server_msg = AMQP_INTERNAL_ERROR; + goto close_channel; + } + } + + av_freep(&user_decoded); + av_freep(&password_decoded); + return 0; + +close_channel: + amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg); +close_connection: + amqp_connection_close(s->conn, server_msg); +destroy_connection: + amqp_destroy_connection(s->conn); + + av_freep(&user_decoded); + av_freep(&password_decoded); + return AVERROR_EXTERNAL; +} + +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size) +{ + int ret; + AMQPContext *s = h->priv_data; + int fd = amqp_socket_get_sockfd(s->socket); + + amqp_bytes_t message = { size, (void *)buf }; + amqp_basic_properties_t props; + + ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback); + if (ret) + return ret; + + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("octet/stream"); + props.delivery_mode = 2; /* persistent delivery mode */ + + ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange), + amqp_cstring_bytes(s->routing_key), 0, 0, + &props, message); + + if (ret) { + av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret)); + return AVERROR_EXTERNAL; + } + + return size; +} + +static int amqp_proto_read(URLContext *h, unsigned char *buf, int size) +{ + AMQPContext *s = h->priv_data; + int fd = amqp_socket_get_sockfd(s->socket); + int ret; + + amqp_rpc_reply_t broker_reply; + amqp_envelope_t envelope; + + ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback); + if (ret) + return ret; + + amqp_maybe_release_buffers(s->conn); + broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0); + + if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) + return AVERROR_EXTERNAL; + + if (envelope.message.body.len > size) { + s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len); + av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. " + "Message will be truncated. Setting -pkt_size %d " + "may resolve this issue.\n", s->pkt_size_overflow); + } + size = FFMIN(size, envelope.message.body.len); + + memcpy(buf, envelope.message.body.bytes, size); + amqp_destroy_envelope(&envelope); + + return size; +} + +static int amqp_proto_close(URLContext *h) +{ + AMQPContext *s = h->priv_data; + amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS); + amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(s->conn); + + return 0; +} + +static const AVClass amqp_context_class = { + .class_name = "amqp", + .item_name = av_default_item_name, + .option = options, + .version = LIBAVUTIL_VERSION_INT, +}; + +const URLProtocol ff_libamqp_protocol = { + .name = "amqp", + .url_close = amqp_proto_close, + .url_open = amqp_proto_open, + .url_read = amqp_proto_read, + .url_write = amqp_proto_write, + .priv_data_size = sizeof(AMQPContext), + .priv_data_class = &amqp_context_class, + .flags = URL_PROTOCOL_FLAG_NETWORK, +}; diff --git a/libavformat/protocols.c b/libavformat/protocols.c index 29fb99e7fa3..f1b8eab0fd6 100644 --- a/libavformat/protocols.c +++ b/libavformat/protocols.c @@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol; extern const URLProtocol ff_udp_protocol; extern const URLProtocol ff_udplite_protocol; extern const URLProtocol ff_unix_protocol; +extern const URLProtocol ff_libamqp_protocol; extern const URLProtocol ff_librtmp_protocol; extern const URLProtocol ff_librtmpe_protocol; extern const URLProtocol ff_librtmps_protocol; diff --git a/libavformat/version.h b/libavformat/version.h index 4724269b3c4..a233b673512 100644 --- a/libavformat/version.h +++ b/libavformat/version.h @@ -32,8 +32,8 @@ // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium) // Also please add any ticket numbers that you believe might be affected here #define LIBAVFORMAT_VERSION_MAJOR 58 -#define LIBAVFORMAT_VERSION_MINOR 39 -#define LIBAVFORMAT_VERSION_MICRO 101 +#define LIBAVFORMAT_VERSION_MINOR 40 +#define LIBAVFORMAT_VERSION_MICRO 100 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \ LIBAVFORMAT_VERSION_MINOR, \ From patchwork Sun Mar 8 20:33:29 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Andriy Gelman X-Patchwork-Id: 18094 Return-Path: X-Original-To: patchwork@ffaux-bg.ffmpeg.org Delivered-To: patchwork@ffaux-bg.ffmpeg.org Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by ffaux.localdomain (Postfix) with ESMTP id 4D8CF44B2CD for ; Sun, 8 Mar 2020 22:33:55 +0200 (EET) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 2DFFE6881B5; Sun, 8 Mar 2020 22:33:55 +0200 (EET) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-qv1-f65.google.com (mail-qv1-f65.google.com [209.85.219.65]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 028696883C5 for ; Sun, 8 Mar 2020 22:33:47 +0200 (EET) Received: by mail-qv1-f65.google.com with SMTP id fc12so3468874qvb.6 for ; Sun, 08 Mar 2020 13:33:47 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references :mime-version:content-transfer-encoding; bh=xkpBWWvMawV+XN6Vl/55vBP0lcFDrjVm0uW1GG265FE=; b=nhm+nTqISTDPRj8IdhKSUtU69HRumaJT1IvIIAd+6Pmwx+p3mHlQI9HUCVsvgkisBk IrwREL9EpnQ4gBmXzB8el5pOgxztiktnwSGw8RG3fo45kw8DSXjxT910OJnvdVi8jE/7 gwzF3BqauKrdy/HrFzuFSMp023jPIVUgfVleKc+yyYSzAMRP8bmRrJ70K7kwj+CtXeyF Is9EQkjKtN5phFD6oGUDigvBhICZftqm+hTLdOsGHV7uWHEVeVD+OG4ddQGMs7y9K8KY 8zk5FLtsywaI+F1SMmyrFfpU5JWHB7jZDq2i5Y4gSbUchz22NDtNPkAemJx04HBWRBIg F64Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references:mime-version:content-transfer-encoding; bh=xkpBWWvMawV+XN6Vl/55vBP0lcFDrjVm0uW1GG265FE=; b=AGKf6X4fICnBh3hRwkUIAcR5XR05i6wsVz9VQm/sEug+xowMCovfRhHG18MHxxSeWJ RiVJcUxaJJD8Q1DvfFhBVaQTfbD48vbY06vA1MShLnu+acDN7a6/SQuSkZgMKud+QD3n I3nUJPSC9E8iQ1lguwv/qQO+CIorjUz3pw5qB6hAtqwQ0sQI+jFhJ75nLe+78QhnWTFJ H7xMdMFuA4vRmoj684+Ifi0PNZukpjQd6VGJ/nROQihi4Tbi8tmUvYK2uR6H+vlGwImV aHJiKEaPfe101DZ2aO82BedJNPZu85xJ61NbSWHwxOuPaQHVqqBIf+SZPrFPvBRpbLc9 zF5w== X-Gm-Message-State: ANhLgQ10VxAnSfyI430EmnAJuefnhZaa4kxY84RgO2TbHubwP0aqWLOL ldy3O8D1DUzUgtfDaDMwgeiRDsay X-Google-Smtp-Source: ADFU+vss75muuGHg0J2ywF4lTHsoeFyh7FDFomXECPk2i9vhe8g33ABLdKApKZA30krKl090+Cmvbw== X-Received: by 2002:a05:6214:1449:: with SMTP id b9mr8170353qvy.217.1583699626287; Sun, 08 Mar 2020 13:33:46 -0700 (PDT) Received: from localhost.localdomain (c-71-232-27-28.hsd1.ma.comcast.net. [71.232.27.28]) by smtp.gmail.com with ESMTPSA id v1sm2991138qkd.74.2020.03.08.13.33.45 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Sun, 08 Mar 2020 13:33:45 -0700 (PDT) From: Andriy Gelman X-Google-Original-From: Andriy Gelman To: ffmpeg-devel@ffmpeg.org Date: Sun, 8 Mar 2020 16:33:29 -0400 Message-Id: <20200308203329.14369-2-andriy.gelman@gmail.com> X-Mailer: git-send-email 2.25.0 In-Reply-To: <20200308203329.14369-1-andriy.gelman@gmail.com> References: <20200308203329.14369-1-andriy.gelman@gmail.com> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH v3 2/2] avformat/libzmq: Make default pkt_size value consistent with amqp X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Andriy Gelman Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" From: Andriy Gelman Signed-off-by: Andriy Gelman --- doc/protocols.texi | 2 +- libavformat/libzmq.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/protocols.texi b/doc/protocols.texi index 27df4759b63..e510019f2d8 100644 --- a/doc/protocols.texi +++ b/doc/protocols.texi @@ -1858,7 +1858,7 @@ line. The following options are supported: @item pkt_size Forces the maximum packet size for sending/receiving data. The default value is -32,768 bytes. On the server side, this sets the maximum size of sent packets +131,072 bytes. On the server side, this sets the maximum size of sent packets via ZeroMQ. On the clients, it sets an internal buffer size for receiving packets. Note that pkt_size on the clients should be equal to or greater than pkt_size on the server. Otherwise the received message may be truncated causing diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c index 8c8b294c921..1b0d8638dbf 100644 --- a/libavformat/libzmq.c +++ b/libavformat/libzmq.c @@ -40,7 +40,7 @@ typedef struct ZMQContext { #define D AV_OPT_FLAG_DECODING_PARAM #define E AV_OPT_FLAG_ENCODING_PARAM static const AVOption options[] = { - { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E }, + { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, -1, INT_MAX, .flags = D | E }, { NULL } };