diff mbox series

[FFmpeg-devel,v2,1/2] avformat: Add AMQP version 0-9-1 protocol support

Message ID 20200228205020.31418-1-andriy.gelman@gmail.com
State Superseded
Headers show
Series [FFmpeg-devel,v2,1/2] avformat: Add AMQP version 0-9-1 protocol support | expand

Checks

Context Check Description
andriy/ffmpeg-patchwork success Make fate finished

Commit Message

Andriy Gelman Feb. 28, 2020, 8:50 p.m. UTC
From: Andriy Gelman <andriy.gelman@gmail.com>

Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.

Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
---

Changes in v2: 
    - Addressed comments from Marton
    - Updated documentation

Compilation notes:
    - Requires librabbitmq-dev package (on ubuntu).
    - The pkg-config librabbitmq.pc has a corrupt entry.
      **update: fixed on the github master branch**
      The line "Libs.private: rt; -lpthread" should be changed to
      "Libs.private: -lrt -pthread".
    - Compile FFmpeg with --enable-librabbitmq

To run an example:
#
# Start the RabbitMQ broker (I use docker)
# The following starts the broker on localhost:5672. A webui is available on
# localhost:15672 (User/password is "guest" by default)
#
$ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management

#
# Stream to the RabbitMQ broker:
#
$ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost

#
# Connect any number of clients to fetch data from the broker:
# The clients are filtered by the routing_key and exchange.
#
$ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost

 Changelog               |   1 +
 configure               |   5 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  60 +++++++++
 libavformat/Makefile    |   1 +
 libavformat/libamqp.c   | 286 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 libavformat/version.h   |   4 +-
 8 files changed, 357 insertions(+), 2 deletions(-)
 create mode 100644 libavformat/libamqp.c

Comments

Paul B Mahol Feb. 29, 2020, 10:21 a.m. UTC | #1
I think this was already rejected?

On 2/28/20, Andriy Gelman <andriy.gelman@gmail.com> wrote:
> From: Andriy Gelman <andriy.gelman@gmail.com>
>
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
>
> Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
> ---
>
> Changes in v2:
>     - Addressed comments from Marton
>     - Updated documentation
>
> Compilation notes:
>     - Requires librabbitmq-dev package (on ubuntu).
>     - The pkg-config librabbitmq.pc has a corrupt entry.
>       **update: fixed on the github master branch**
>       The line "Libs.private: rt; -lpthread" should be changed to
>       "Libs.private: -lrt -pthread".
>     - Compile FFmpeg with --enable-librabbitmq
>
> To run an example:
> #
> # Start the RabbitMQ broker (I use docker)
> # The following starts the broker on localhost:5672. A webui is available on
> # localhost:15672 (User/password is "guest" by default)
> #
> $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p
> 127.0.0.1:15672:15672 rabbitmq:3-management
>
> #
> # Stream to the RabbitMQ broker:
> #
> $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts
> -routing_key "amqp" -exchange "amq.direct" amqp://localhost
>
> #
> # Connect any number of clients to fetch data from the broker:
> # The clients are filtered by the routing_key and exchange.
> #
> $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost
>
>  Changelog               |   1 +
>  configure               |   5 +
>  doc/general.texi        |   1 +
>  doc/protocols.texi      |  60 +++++++++
>  libavformat/Makefile    |   1 +
>  libavformat/libamqp.c   | 286 ++++++++++++++++++++++++++++++++++++++++
>  libavformat/protocols.c |   1 +
>  libavformat/version.h   |   4 +-
>  8 files changed, 357 insertions(+), 2 deletions(-)
>  create mode 100644 libavformat/libamqp.c
>
> diff --git a/Changelog b/Changelog
> index cb310a3abc2..ab30d670a15 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -43,6 +43,7 @@ version <next>:
>  - Rayman 2 ADPCM decoder
>  - Rayman 2 APM demuxer
>  - cas video filter
> +- AMQP 0-9-1 protocol (RabbitMQ)
>
>
>  version 4.2:
> diff --git a/configure b/configure
> index 06e3a7b2a88..8b171349440 100755
> --- a/configure
> +++ b/configure
> @@ -255,6 +255,7 @@ External library support:
>    --enable-libopenmpt      enable decoding tracked files via libopenmpt
> [no]
>    --enable-libopus         enable Opus de/encoding via libopus [no]
>    --enable-libpulse        enable Pulseaudio input via libpulse [no]
> +  --enable-librabbitmq     enable RabbitMQ library [no]
>    --enable-librav1e        enable AV1 encoding via rav1e [no]
>    --enable-librsvg         enable SVG rasterization via librsvg [no]
>    --enable-librubberband   enable rubberband needed for rubberband filter
> [no]
> @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
>      libopenmpt
>      libopus
>      libpulse
> +    librabbitmq
>      librav1e
>      librsvg
>      librtmp
> @@ -3434,6 +3436,8 @@ unix_protocol_deps="sys_un_h"
>  unix_protocol_select="network"
>
>  # external library protocols
> +libamqp_protocol_deps="librabbitmq"
> +libamqp_protocol_select="network"
>  librtmp_protocol_deps="librtmp"
>  librtmpe_protocol_deps="librtmp"
>  librtmps_protocol_deps="librtmp"
> @@ -6317,6 +6321,7 @@ enabled libopus           && {
>      }
>  }
>  enabled libpulse          && require_pkg_config libpulse libpulse
> pulse/pulseaudio.h pa_context_new
> +enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >=
> 0.7.1" amqp.h amqp_new_connection
>  enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0"
> rav1e.h rav1e_context_new
>  enabled librsvg           && require_pkg_config librsvg librsvg-2.0
> librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
>  enabled librtmp           && require_pkg_config librtmp librtmp
> librtmp/rtmp.h RTMP_Socket
> diff --git a/doc/general.texi b/doc/general.texi
> index dbdc3485982..623566dabea 100644
> --- a/doc/general.texi
> +++ b/doc/general.texi
> @@ -1330,6 +1330,7 @@ performance on systems without hardware floating point
> support).
>
>  @multitable @columnfractions .4 .1
>  @item Name         @tab Support
> +@item AMQP         @tab X
>  @item file         @tab X
>  @item FTP          @tab X
>  @item Gopher       @tab X
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index 54a287f488b..dc5f49ba8cc 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -51,6 +51,66 @@ in microseconds.
>
>  A description of the currently available protocols follows.
>
> +@section amqp
> +
> +Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
> +publish-subscribe communication protocol.
> +
> +FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A
> separate
> +AMQP broker must also be run. An example open-source AMQP broker is
> RabbitMQ.
> +
> +After starting the broker, an FFmpeg client may stream data to the broker
> using
> +the command:
> +
> +@example
> +ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port]
> +@end example
> +
> +Where hostname and port (default is 5672) is the address of the broker. The
> +client may also set a user/password for authentication. The default for
> both
> +fields is "guest".
> +
> +Multiple subscribers may stream from the broker using the command:
> +@example
> +ffplay amqp://[[user]:[password]@@]hostname[:port]
> +@end example
> +
> +In RabbitMQ all data published to the broker flows through a specific
> exchange,
> +and each subscribing client has an assigned queue/buffer. When a packet
> arrives
> +at an exchange, it may be copied to a client's queue depending on the
> exchange
> +and routing_key fields.
> +
> +The following options are supported:
> +
> +@table @option
> +
> +@item exchange
> +Sets the exchange to use on the broker. RabbitMQ has several predefined
> +exchanges: "amq.direct" is the default exchange, where the publisher and
> +subscriber must have a matching routing_key; "amq.fanout" is the same as a
> +broadcast operation (i.e. the data is forwarded to all queues on the fanout
> +exchange independent of the routing_key); and "amq.topic" is similar to
> +"amq.direct", but allows for more complex pattern matching (refer to the
> RabbitMQ
> +documentation).
> +
> +@item routing_key
> +Sets the routing key. The default value is "amqp". The routing key is used
> on
> +the "amq.direct" and "amq.topic" exchanges to decide whether packets are
> written
> +to the queue of a subscriber.
> +
> +@item pkt_size
> +Maximum size of each packet sent/received to the broker. Default is 131072.
> +Minimum is 4096 and max is any large value (representable by an int). When
> +receiving packets, this sets an internal buffer size in FFmpeg. It should
> be
> +equal to or greater than the size of the published packets to the broker.
> Otherwise
> +the received message may be truncated causing decoding errors.
> +
> +@item connection_timeout
> +The timeout in microseconds during the initial connection to the broker.
> The
> +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not
> set.
> +
> +@end table
> +
>  @section async
>
>  Asynchronous data filling wrapper for input stream.
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index e0681058a29..52b44289fee 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -629,6 +629,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
>  OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
>
>  # external library protocols
> +OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
>  OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
>  OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
>  OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
> diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
> new file mode 100644
> index 00000000000..baa42306bb0
> --- /dev/null
> +++ b/libavformat/libamqp.c
> @@ -0,0 +1,286 @@
> +/*
> + * Advanced Message Queuing Protocol (AMQP) 0-9-1
> + * Copyright (c) 2020 Andriy Gelman
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
> USA
> + */
> +
> +#include <amqp.h>
> +#include <amqp_tcp_socket.h>
> +#include <sys/time.h>
> +#include "avformat.h"
> +#include "libavformat/urldecode.h"
> +#include "libavutil/avstring.h"
> +#include "libavutil/opt.h"
> +#include "libavutil/time.h"
> +#include "network.h"
> +#include "url.h"
> +
> +typedef struct AMQPContext {
> +    const AVClass *class;
> +    amqp_connection_state_t conn;
> +    amqp_socket_t *socket;
> +    const char *exchange;
> +    const char *routing_key;
> +    int pkt_size;
> +    int64_t connection_timeout;
> +    int pkt_size_overflow;
> +} AMQPContext;
> +
> +#define STR_LEN           1024
> +#define DEFAULT_CHANNEL   1
> +
> +#define OFFSET(x) offsetof(AMQPContext, x)
> +#define D AV_OPT_FLAG_DECODING_PARAM
> +#define E AV_OPT_FLAG_ENCODING_PARAM
> +static const AVOption options[] = {
> +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size),
> AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> +    { "exchange", "Exchange to send/read packets", OFFSET(exchange),
> AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> +    { "routing_key", "Key to filter streams", OFFSET(routing_key),
> AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> +    { "connection_timeout", "Initial connection timeout",
> OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1,
> INT_MAX, .flags = D | E},
> +    { NULL }
> +};
> +
> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret, server_msg;
> +    char hostname[STR_LEN], credentials[STR_LEN];
> +    char *credentials_decoded;
> +    int port;
> +    const char *user, *password = NULL;
> +    char *p;
> +    amqp_rpc_reply_t broker_reply;
> +    struct timeval tval = { 0 };
> +
> +    AMQPContext *s = h->priv_data;
> +
> +    h->is_streamed     = 1;
> +    h->max_packet_size = s->pkt_size;
> +
> +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
> +
> +    if (port < 0)
> +        port = 5672;
> +
> +    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
> +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> +        return AVERROR(EINVAL);
> +    }
> +
> +    credentials_decoded = ff_urldecode(credentials, 0);
> +    if (!credentials_decoded)
> +        return AVERROR(ENOMEM);
> +
> +    p = strchr(credentials_decoded, ':');
> +    if (p) {
> +        *p = '\0';
> +        password = p + 1;
> +    }
> +
> +    if (!password || *password == '\0')
> +        password = "guest";
> +
> +    user = credentials_decoded;
> +    if (*user == '\0')
> +        user = "guest";
> +
> +    s->conn = amqp_new_connection();
> +    if (!s->conn) {
> +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    s->socket = amqp_tcp_socket_new(s->conn);
> +    if (!s->socket) {
> +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> +        goto destroy_connection;
> +    }
> +
> +    if (s->connection_timeout < 0)
> +        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout :
> 5000000);
> +
> +    tval.tv_sec  = s->connection_timeout / 1000000;
> +    tval.tv_usec = s->connection_timeout % 1000000;
> +    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
> +        goto destroy_connection;
> +    }
> +
> +    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
> +                              AMQP_SASL_METHOD_PLAIN, user, password);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +        av_log(h, AV_LOG_ERROR, "Error login\n");
> +        server_msg = AMQP_ACCESS_REFUSED;
> +        goto close_connection;
> +    }
> +
> +    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
> +    broker_reply = amqp_get_rpc_reply(s->conn);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +        av_log(h, AV_LOG_ERROR, "Error set channel\n");
> +        server_msg = AMQP_CHANNEL_ERROR;
> +        goto close_connection;
> +    }
> +
> +    if (h->flags & AVIO_FLAG_READ) {
> +        amqp_bytes_t queuename;
> +        char queuename_buff[STR_LEN];
> +        amqp_queue_declare_ok_t *r;
> +
> +        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
> +                               0, 0, 0, 1, amqp_empty_table);
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
> +            server_msg = AMQP_RESOURCE_ERROR;
> +            goto close_channel;
> +        }
> +
> +        /* store queuename */
> +        queuename.bytes = queuename_buff;
> +        queuename.len = FFMIN(r->queue.len, STR_LEN);
> +        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
> +
> +        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
> amqp_cstring_bytes(s->exchange),
> +                        amqp_cstring_bytes(s->routing_key),
> amqp_empty_table);
> +
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
> +            server_msg = AMQP_INTERNAL_ERROR;
> +            goto close_channel;
> +        }
> +
> +        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename,
> amqp_empty_bytes,
> +                           0, 1, 0, amqp_empty_table);
> +
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Set consume error\n");
> +            server_msg = AMQP_INTERNAL_ERROR;
> +            goto close_channel;
> +        }
> +    }
> +
> +    av_freep(&credentials_decoded);
> +    return 0;
> +
> +close_channel:
> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
> +close_connection:
> +    amqp_connection_close(s->conn, server_msg);
> +destroy_connection:
> +    amqp_destroy_connection(s->conn);
> +
> +    av_freep(&credentials_decoded);
> +    return AVERROR_EXTERNAL;
> +}
> +
> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int
> size)
> +{
> +    int ret;
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +
> +    amqp_bytes_t message = { size, (void *)buf };
> +    amqp_basic_properties_t props;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout,
> &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
> AMQP_BASIC_DELIVERY_MODE_FLAG;
> +    props.content_type = amqp_cstring_bytes("octet/stream");
> +    props.delivery_mode = 2; /* persistent delivery mode */
> +
> +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL,
> amqp_cstring_bytes(s->exchange),
> +                             amqp_cstring_bytes(s->routing_key), 0, 0,
> +                             &props, message);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error publish\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    return size;
> +}
> +
> +static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
> +{
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +    int ret;
> +
> +    amqp_rpc_reply_t broker_reply;
> +    amqp_envelope_t envelope;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout,
> &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    amqp_maybe_release_buffers(s->conn);
> +    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
> +        return AVERROR_EXTERNAL;
> +
> +    if (envelope.message.body.len > size) {
> +        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow,
> envelope.message.body.len);
> +        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
> +                                  "Message will be truncated. Setting
> -pkt_size %d "
> +                                  "may resolve this issue.\n",
> s->pkt_size_overflow);
> +    }
> +    size = FFMIN(size, envelope.message.body.len);
> +
> +    memcpy(buf, envelope.message.body.bytes, size);
> +    amqp_destroy_envelope(&envelope);
> +
> +    return size;
> +}
> +
> +static int amqp_proto_close(URLContext *h)
> +{
> +    AMQPContext *s = h->priv_data;
> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
> +    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
> +    amqp_destroy_connection(s->conn);
> +
> +    return 0;
> +}
> +
> +static const AVClass amqp_context_class = {
> +    .class_name = "amqp",
> +    .item_name  = av_default_item_name,
> +    .option     = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +const URLProtocol ff_libamqp_protocol = {
> +    .name            = "amqp",
> +    .url_close       = amqp_proto_close,
> +    .url_open        = amqp_proto_open,
> +    .url_read        = amqp_proto_read,
> +    .url_write       = amqp_proto_write,
> +    .priv_data_size  = sizeof(AMQPContext),
> +    .priv_data_class = &amqp_context_class,
> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> +};
> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> index 29fb99e7fa3..f1b8eab0fd6 100644
> --- a/libavformat/protocols.c
> +++ b/libavformat/protocols.c
> @@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol;
>  extern const URLProtocol ff_udp_protocol;
>  extern const URLProtocol ff_udplite_protocol;
>  extern const URLProtocol ff_unix_protocol;
> +extern const URLProtocol ff_libamqp_protocol;
>  extern const URLProtocol ff_librtmp_protocol;
>  extern const URLProtocol ff_librtmpe_protocol;
>  extern const URLProtocol ff_librtmps_protocol;
> diff --git a/libavformat/version.h b/libavformat/version.h
> index 4724269b3c4..a233b673512 100644
> --- a/libavformat/version.h
> +++ b/libavformat/version.h
> @@ -32,8 +32,8 @@
>  // Major bumping may affect Ticket5467, 5421, 5451(compatibility with
> Chromium)
>  // Also please add any ticket numbers that you believe might be affected
> here
>  #define LIBAVFORMAT_VERSION_MAJOR  58
> -#define LIBAVFORMAT_VERSION_MINOR  39
> -#define LIBAVFORMAT_VERSION_MICRO 101
> +#define LIBAVFORMAT_VERSION_MINOR  40
> +#define LIBAVFORMAT_VERSION_MICRO 100
>
>  #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
>                                                 LIBAVFORMAT_VERSION_MINOR, \
> --
> 2.25.0
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Carl Eugen Hoyos Feb. 29, 2020, 11:32 a.m. UTC | #2
Am Fr., 28. Feb. 2020 um 21:57 Uhr schrieb Andriy Gelman
<andriy.gelman@gmail.com>:

> @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
>      libopenmpt
>      libopus
>      libpulse
> +    librabbitmq
>      librav1e
>      librsvg
>      librtmp

If the patch is acceptable, I believe this belongs in EXTERNAL_LIBRARY_GPL_LIST

Carl Eugen
Andriy Gelman Feb. 29, 2020, 3:43 p.m. UTC | #3
On Sat, 29. Feb 11:21, Paul B Mahol wrote:
> I think this was already rejected?

Thilo had a question on broker performance, which I believe I have addressed:
http://ffmpeg.org/pipermail/ffmpeg-devel/2020-February/257483.html

j-b had concerns about how it fits into libavformat:
http://ffmpeg.org/pipermail/ffmpeg-devel/2020-February/256977.html
As I see it, this is just another general purpose protocol.
Paul B Mahol Feb. 29, 2020, 4 p.m. UTC | #4
On 2/29/20, Andriy Gelman <andriy.gelman@gmail.com> wrote:
> On Sat, 29. Feb 11:21, Paul B Mahol wrote:
>> I think this was already rejected?
>
> Thilo had a question on broker performance, which I believe addressed:
> http://ffmpeg.org/pipermail/ffmpeg-devel/2020-February/257483.html
>
> j-b had concerns about how it fits into libavformat:
> http://ffmpeg.org/pipermail/ffmpeg-devel/2020-February/256977.html
> As I see it, this is just another general purpose protocol.

Well, I still do not care myself, but I do care if others are against it.

>
> --
> Andriy
>
Marton Balint Feb. 29, 2020, 4:09 p.m. UTC | #5
On Sat, 29 Feb 2020, Paul B Mahol wrote:

> I think this was already rejected?

jb questioned if this belongs to libavformat, and timo asked how well the 
message brokers handle high bitrates/big message sizes, no hard rejects 
were made as far as I remember.

Andriy provided numbers for scaling, I have not answered the concerns 
regarding libavformat integration, because I am not sure I understand the 
concern. AMQP is a general purpose protocol for message transfer, it even 
has an official URL scheme, so when we integrate it into libavformat as a 
*protocol* I don't really see why it would not fit into the framework or 
what can be gained if it is implemented separately.

If people still have hard feelings against merging this, please speak up, 
but I honestly don't see a problem with it.

Thanks,
Marton

>
> On 2/28/20, Andriy Gelman <andriy.gelman@gmail.com> wrote:
>> From: Andriy Gelman <andriy.gelman@gmail.com>
>>
>> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
>>
>> Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
>> ---
>>
>> Changes in v2:
>>     - Addressed comments from Marton
>>     - Updated documentation
>>
>> Compilation notes:
>>     - Requires librabbitmq-dev package (on ubuntu).
>>     - The pkg-config librabbitmq.pc has a corrupt entry.
>>       **update: fixed on the github master branch**
>>       The line "Libs.private: rt; -lpthread" should be changed to
>>       "Libs.private: -lrt -pthread".
>>     - Compile FFmpeg with --enable-librabbitmq
>>
>> To run an example:
>> #
>> # Start the RabbitMQ broker (I use docker)
>> # The following starts the broker on localhost:5672. A webui is available on
>> # localhost:15672 (User/password is "guest" by default)
>> #
>> $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p
>> 127.0.0.1:15672:15672 rabbitmq:3-management
>>
>> #
>> # Stream to the RabbitMQ broker:
>> #
>> $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts
>> -routing_key "amqp" -exchange "amq.direct" amqp://localhost
>>
>> #
>> # Connect any number of clients to fetch data from the broker:
>> # The clients are filtered by the routing_key and exchange.
>> #
>> $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost
>>
>>  Changelog               |   1 +
>>  configure               |   5 +
>>  doc/general.texi        |   1 +
>>  doc/protocols.texi      |  60 +++++++++
>>  libavformat/Makefile    |   1 +
>>  libavformat/libamqp.c   | 286 ++++++++++++++++++++++++++++++++++++++++
>>  libavformat/protocols.c |   1 +
>>  libavformat/version.h   |   4 +-
>>  8 files changed, 357 insertions(+), 2 deletions(-)
>>  create mode 100644 libavformat/libamqp.c
>>
>> diff --git a/Changelog b/Changelog
>> index cb310a3abc2..ab30d670a15 100644
>> --- a/Changelog
>> +++ b/Changelog
>> @@ -43,6 +43,7 @@ version <next>:
>>  - Rayman 2 ADPCM decoder
>>  - Rayman 2 APM demuxer
>>  - cas video filter
>> +- AMQP 0-9-1 protocol (RabbitMQ)
>>
>>
>>  version 4.2:
>> diff --git a/configure b/configure
>> index 06e3a7b2a88..8b171349440 100755
>> --- a/configure
>> +++ b/configure
>> @@ -255,6 +255,7 @@ External library support:
>>    --enable-libopenmpt      enable decoding tracked files via libopenmpt
>> [no]
>>    --enable-libopus         enable Opus de/encoding via libopus [no]
>>    --enable-libpulse        enable Pulseaudio input via libpulse [no]
>> +  --enable-librabbitmq     enable RabbitMQ library [no]
>>    --enable-librav1e        enable AV1 encoding via rav1e [no]
>>    --enable-librsvg         enable SVG rasterization via librsvg [no]
>>    --enable-librubberband   enable rubberband needed for rubberband filter
>> [no]
>> @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
>>      libopenmpt
>>      libopus
>>      libpulse
>> +    librabbitmq
>>      librav1e
>>      librsvg
>>      librtmp
>> @@ -3434,6 +3436,8 @@ unix_protocol_deps="sys_un_h"
>>  unix_protocol_select="network"
>>
>>  # external library protocols
>> +libamqp_protocol_deps="librabbitmq"
>> +libamqp_protocol_select="network"
>>  librtmp_protocol_deps="librtmp"
>>  librtmpe_protocol_deps="librtmp"
>>  librtmps_protocol_deps="librtmp"
>> @@ -6317,6 +6321,7 @@ enabled libopus           && {
>>      }
>>  }
>>  enabled libpulse          && require_pkg_config libpulse libpulse
>> pulse/pulseaudio.h pa_context_new
>> +enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >=
>> 0.7.1" amqp.h amqp_new_connection
>>  enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0"
>> rav1e.h rav1e_context_new
>>  enabled librsvg           && require_pkg_config librsvg librsvg-2.0
>> librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
>>  enabled librtmp           && require_pkg_config librtmp librtmp
>> librtmp/rtmp.h RTMP_Socket
>> diff --git a/doc/general.texi b/doc/general.texi
>> index dbdc3485982..623566dabea 100644
>> --- a/doc/general.texi
>> +++ b/doc/general.texi
>> @@ -1330,6 +1330,7 @@ performance on systems without hardware floating point
>> support).
>>
>>  @multitable @columnfractions .4 .1
>>  @item Name         @tab Support
>> +@item AMQP         @tab X
>>  @item file         @tab X
>>  @item FTP          @tab X
>>  @item Gopher       @tab X
>> diff --git a/doc/protocols.texi b/doc/protocols.texi
>> index 54a287f488b..dc5f49ba8cc 100644
>> --- a/doc/protocols.texi
>> +++ b/doc/protocols.texi
>> @@ -51,6 +51,66 @@ in microseconds.
>>
>>  A description of the currently available protocols follows.
>>
>> +@section amqp
>> +
>> +Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
>> +publish-subscribe communication protocol.
>> +
>> +FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A
>> separate
>> +AMQP broker must also be run. An example open-source AMQP broker is
>> RabbitMQ.
>> +
>> +After starting the broker, an FFmpeg client may stream data to the broker
>> using
>> +the command:
>> +
>> +@example
>> +ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port]
>> +@end example
>> +
>> +Where hostname and port (default is 5672) is the address of the broker. The
>> +client may also set a user/password for authentication. The default for
>> both
>> +fields is "guest".
>> +
>> +Multiple subscribers may stream from the broker using the command:
>> +@example
>> +ffplay amqp://[[user]:[password]@@]hostname[:port]
>> +@end example
>> +
>> +In RabbitMQ all data published to the broker flows through a specific
>> exchange,
>> +and each subscribing client has an assigned queue/buffer. When a packet
>> arrives
>> +at an exchange, it may be copied to a client's queue depending on the
>> exchange
>> +and routing_key fields.
>> +
>> +The following options are supported:
>> +
>> +@table @option
>> +
>> +@item exchange
>> +Sets the exchange to use on the broker. RabbitMQ has several predefined
>> +exchanges: "amq.direct" is the default exchange, where the publisher and
>> +subscriber must have a matching routing_key; "amq.fanout" is the same as a
>> +broadcast operation (i.e. the data is forwarded to all queues on the fanout
>> +exchange independent of the routing_key); and "amq.topic" is similar to
>> +"amq.direct", but allows for more complex pattern matching (refer to the
>> RabbitMQ
>> +documentation).
>> +
>> +@item routing_key
>> +Sets the routing key. The default value is "amqp". The routing key is used
>> on
>> +the "amq.direct" and "amq.topic" exchanges to decide whether packets are
>> written
>> +to the queue of a subscriber.
>> +
>> +@item pkt_size
>> +Maximum size of each packet sent/received to the broker. Default is 131072.
>> +Minimum is 4096 and max is any large value (representable by an int). When
>> +receiving packets, this sets an internal buffer size in FFmpeg. It should
>> be
>> +equal to or greater than the size of the published packets to the broker.
>> Otherwise
>> +the received message may be truncated causing decoding errors.
>> +
>> +@item connection_timeout
>> +The timeout in microseconds during the initial connection to the broker.
>> The
>> +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not
>> set.
>> +
>> +@end table
>> +
>>  @section async
>>
>>  Asynchronous data filling wrapper for input stream.
>> diff --git a/libavformat/Makefile b/libavformat/Makefile
>> index e0681058a29..52b44289fee 100644
>> --- a/libavformat/Makefile
>> +++ b/libavformat/Makefile
>> @@ -629,6 +629,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
>>  OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
>>
>>  # external library protocols
>> +OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
>>  OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
>>  OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
>>  OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
>> diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
>> new file mode 100644
>> index 00000000000..baa42306bb0
>> --- /dev/null
>> +++ b/libavformat/libamqp.c
>> @@ -0,0 +1,286 @@
>> +/*
>> + * Advanced Message Queuing Protocol (AMQP) 0-9-1
>> + * Copyright (c) 2020 Andriy Gelman
>> + *
>> + * This file is part of FFmpeg.
>> + *
>> + * FFmpeg is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU Lesser General Public
>> + * License as published by the Free Software Foundation; either
>> + * version 2.1 of the License, or (at your option) any later version.
>> + *
>> + * FFmpeg is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>> + * Lesser General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU Lesser General Public
>> + * License along with FFmpeg; if not, write to the Free Software
>> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
>> USA
>> + */
>> +
>> +#include <amqp.h>
>> +#include <amqp_tcp_socket.h>
>> +#include <sys/time.h>
>> +#include "avformat.h"
>> +#include "libavformat/urldecode.h"
>> +#include "libavutil/avstring.h"
>> +#include "libavutil/opt.h"
>> +#include "libavutil/time.h"
>> +#include "network.h"
>> +#include "url.h"
>> +
>> +typedef struct AMQPContext {
>> +    const AVClass *class;
>> +    amqp_connection_state_t conn;
>> +    amqp_socket_t *socket;
>> +    const char *exchange;
>> +    const char *routing_key;
>> +    int pkt_size;
>> +    int64_t connection_timeout;
>> +    int pkt_size_overflow;
>> +} AMQPContext;
>> +
>> +#define STR_LEN           1024
>> +#define DEFAULT_CHANNEL   1
>> +
>> +#define OFFSET(x) offsetof(AMQPContext, x)
>> +#define D AV_OPT_FLAG_DECODING_PARAM
>> +#define E AV_OPT_FLAG_ENCODING_PARAM
>> +static const AVOption options[] = {
>> +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size),
>> AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
>> +    { "exchange", "Exchange to send/read packets", OFFSET(exchange),
>> AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
>> +    { "routing_key", "Key to filter streams", OFFSET(routing_key),
>> AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
>> +    { "connection_timeout", "Initial connection timeout",
>> OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1,
>> INT_MAX, .flags = D | E},
>> +    { NULL }
>> +};
>> +
>> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
>> +{
>> +    int ret, server_msg;
>> +    char hostname[STR_LEN], credentials[STR_LEN];
>> +    char *credentials_decoded;
>> +    int port;
>> +    const char *user, *password = NULL;
>> +    char *p;
>> +    amqp_rpc_reply_t broker_reply;
>> +    struct timeval tval = { 0 };
>> +
>> +    AMQPContext *s = h->priv_data;
>> +
>> +    h->is_streamed     = 1;
>> +    h->max_packet_size = s->pkt_size;
>> +
>> +    av_url_split(NULL, 0, credentials, sizeof(credentials),
>> +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
>> +
>> +    if (port < 0)
>> +        port = 5672;
>> +
>> +    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
>> +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
>> +        return AVERROR(EINVAL);
>> +    }
>> +
>> +    credentials_decoded = ff_urldecode(credentials, 0);
>> +    if (!credentials_decoded)
>> +        return AVERROR(ENOMEM);
>> +
>> +    p = strchr(credentials_decoded, ':');
>> +    if (p) {
>> +        *p = '\0';
>> +        password = p + 1;
>> +    }
>> +
>> +    if (!password || *password == '\0')
>> +        password = "guest";
>> +
>> +    user = credentials_decoded;
>> +    if (*user == '\0')
>> +        user = "guest";
>> +
>> +    s->conn = amqp_new_connection();
>> +    if (!s->conn) {
>> +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
>> +        return AVERROR_EXTERNAL;
>> +    }
>> +
>> +    s->socket = amqp_tcp_socket_new(s->conn);
>> +    if (!s->socket) {
>> +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
>> +        goto destroy_connection;
>> +    }
>> +
>> +    if (s->connection_timeout < 0)
>> +        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout :
>> 5000000);
>> +
>> +    tval.tv_sec  = s->connection_timeout / 1000000;
>> +    tval.tv_usec = s->connection_timeout % 1000000;
>> +    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
>> +
>> +    if (ret) {
>> +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
>> +        goto destroy_connection;
>> +    }
>> +
>> +    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
>> +                              AMQP_SASL_METHOD_PLAIN, user, password);
>> +
>> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
>> +        av_log(h, AV_LOG_ERROR, "Error login\n");
>> +        server_msg = AMQP_ACCESS_REFUSED;
>> +        goto close_connection;
>> +    }
>> +
>> +    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
>> +    broker_reply = amqp_get_rpc_reply(s->conn);
>> +
>> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
>> +        av_log(h, AV_LOG_ERROR, "Error set channel\n");
>> +        server_msg = AMQP_CHANNEL_ERROR;
>> +        goto close_connection;
>> +    }
>> +
>> +    if (h->flags & AVIO_FLAG_READ) {
>> +        amqp_bytes_t queuename;
>> +        char queuename_buff[STR_LEN];
>> +        amqp_queue_declare_ok_t *r;
>> +
>> +        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
>> +                               0, 0, 0, 1, amqp_empty_table);
>> +        broker_reply = amqp_get_rpc_reply(s->conn);
>> +        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
>> +            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
>> +            server_msg = AMQP_RESOURCE_ERROR;
>> +            goto close_channel;
>> +        }
>> +
>> +        /* store queuename */
>> +        queuename.bytes = queuename_buff;
>> +        queuename.len = FFMIN(r->queue.len, STR_LEN);
>> +        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
>> +
>> +        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
>> amqp_cstring_bytes(s->exchange),
>> +                        amqp_cstring_bytes(s->routing_key),
>> amqp_empty_table);
>> +
>> +        broker_reply = amqp_get_rpc_reply(s->conn);
>> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
>> +            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
>> +            server_msg = AMQP_INTERNAL_ERROR;
>> +            goto close_channel;
>> +        }
>> +
>> +        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename,
>> amqp_empty_bytes,
>> +                           0, 1, 0, amqp_empty_table);
>> +
>> +        broker_reply = amqp_get_rpc_reply(s->conn);
>> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
>> +            av_log(h, AV_LOG_ERROR, "Set consume error\n");
>> +            server_msg = AMQP_INTERNAL_ERROR;
>> +            goto close_channel;
>> +        }
>> +    }
>> +
>> +    av_freep(&credentials_decoded);
>> +    return 0;
>> +
>> +close_channel:
>> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
>> +close_connection:
>> +    amqp_connection_close(s->conn, server_msg);
>> +destroy_connection:
>> +    amqp_destroy_connection(s->conn);
>> +
>> +    av_freep(&credentials_decoded);
>> +    return AVERROR_EXTERNAL;
>> +}
>> +
>> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int
>> size)
>> +{
>> +    int ret;
>> +    AMQPContext *s = h->priv_data;
>> +    int fd = amqp_socket_get_sockfd(s->socket);
>> +
>> +    amqp_bytes_t message = { size, (void *)buf };
>> +    amqp_basic_properties_t props;
>> +
>> +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout,
>> &h->interrupt_callback);
>> +    if (ret)
>> +        return ret;
>> +
>> +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
>> AMQP_BASIC_DELIVERY_MODE_FLAG;
>> +    props.content_type = amqp_cstring_bytes("octet/stream");
>> +    props.delivery_mode = 2; /* persistent delivery mode */
>> +
>> +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL,
>> amqp_cstring_bytes(s->exchange),
>> +                             amqp_cstring_bytes(s->routing_key), 0, 0,
>> +                             &props, message);
>> +
>> +    if (ret) {
>> +        av_log(h, AV_LOG_ERROR, "Error publish\n");
>> +        return AVERROR_EXTERNAL;
>> +    }
>> +
>> +    return size;
>> +}
>> +
>> +static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
>> +{
>> +    AMQPContext *s = h->priv_data;
>> +    int fd = amqp_socket_get_sockfd(s->socket);
>> +    int ret;
>> +
>> +    amqp_rpc_reply_t broker_reply;
>> +    amqp_envelope_t envelope;
>> +
>> +    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout,
>> &h->interrupt_callback);
>> +    if (ret)
>> +        return ret;
>> +
>> +    amqp_maybe_release_buffers(s->conn);
>> +    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
>> +
>> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
>> +        return AVERROR_EXTERNAL;
>> +
>> +    if (envelope.message.body.len > size) {
>> +        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow,
>> envelope.message.body.len);
>> +        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
>> +                                  "Message will be truncated. Setting
>> -pkt_size %d "
>> +                                  "may resolve this issue.\n",
>> s->pkt_size_overflow);
>> +    }
>> +    size = FFMIN(size, envelope.message.body.len);
>> +
>> +    memcpy(buf, envelope.message.body.bytes, size);
>> +    amqp_destroy_envelope(&envelope);
>> +
>> +    return size;
>> +}
>> +
>> +static int amqp_proto_close(URLContext *h)
>> +{
>> +    AMQPContext *s = h->priv_data;
>> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
>> +    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
>> +    amqp_destroy_connection(s->conn);
>> +
>> +    return 0;
>> +}
>> +
>> +static const AVClass amqp_context_class = {
>> +    .class_name = "amqp",
>> +    .item_name  = av_default_item_name,
>> +    .option     = options,
>> +    .version    = LIBAVUTIL_VERSION_INT,
>> +};
>> +
>> +const URLProtocol ff_libamqp_protocol = {
>> +    .name            = "amqp",
>> +    .url_close       = amqp_proto_close,
>> +    .url_open        = amqp_proto_open,
>> +    .url_read        = amqp_proto_read,
>> +    .url_write       = amqp_proto_write,
>> +    .priv_data_size  = sizeof(AMQPContext),
>> +    .priv_data_class = &amqp_context_class,
>> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
>> +};
>> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
>> index 29fb99e7fa3..f1b8eab0fd6 100644
>> --- a/libavformat/protocols.c
>> +++ b/libavformat/protocols.c
>> @@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol;
>>  extern const URLProtocol ff_udp_protocol;
>>  extern const URLProtocol ff_udplite_protocol;
>>  extern const URLProtocol ff_unix_protocol;
>> +extern const URLProtocol ff_libamqp_protocol;
>>  extern const URLProtocol ff_librtmp_protocol;
>>  extern const URLProtocol ff_librtmpe_protocol;
>>  extern const URLProtocol ff_librtmps_protocol;
>> diff --git a/libavformat/version.h b/libavformat/version.h
>> index 4724269b3c4..a233b673512 100644
>> --- a/libavformat/version.h
>> +++ b/libavformat/version.h
>> @@ -32,8 +32,8 @@
>>  // Major bumping may affect Ticket5467, 5421, 5451(compatibility with
>> Chromium)
>>  // Also please add any ticket numbers that you believe might be affected
>> here
>>  #define LIBAVFORMAT_VERSION_MAJOR  58
>> -#define LIBAVFORMAT_VERSION_MINOR  39
>> -#define LIBAVFORMAT_VERSION_MICRO 101
>> +#define LIBAVFORMAT_VERSION_MINOR  40
>> +#define LIBAVFORMAT_VERSION_MICRO 100
>>
>>  #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
>>                                                 LIBAVFORMAT_VERSION_MINOR, \
>> --
>> 2.25.0
>>
>> _______________________________________________
>> ffmpeg-devel mailing list
>> ffmpeg-devel@ffmpeg.org
>> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>>
>> To unsubscribe, visit link above, or email
>> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Marton Balint Feb. 29, 2020, 4:19 p.m. UTC | #6
On Sat, 29 Feb 2020, Carl Eugen Hoyos wrote:

> Am Fr., 28. Feb. 2020 um 21:57 Uhr schrieb Andriy Gelman
> <andriy.gelman@gmail.com>:
>
>> @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
>>      libopenmpt
>>      libopus
>>      libpulse
>> +    librabbitmq
>>      librav1e
>>      librsvg
>>      librtmp
>
> If the patch is acceptable, I believe this belongs in EXTERNAL_LIBRARY_GPL_LIST

librabbitmq license is MIT, so are you sure about this?

https://github.com/alanxz/rabbitmq-c/blob/master/LICENSE-MIT

Thanks,
Marton
Carl Eugen Hoyos Feb. 29, 2020, 6:38 p.m. UTC | #7
> Am 29.02.2020 um 17:19 schrieb Marton Balint <cus@passwd.hu>:
> 
> 
> 
>> On Sat, 29 Feb 2020, Carl Eugen Hoyos wrote:
>> 
>> Am Fr., 28. Feb. 2020 um 21:57 Uhr schrieb Andriy Gelman
>> <andriy.gelman@gmail.com>:
>> 
>>> @@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
>>>     libopenmpt
>>>     libopus
>>>     libpulse
>>> +    librabbitmq
>>>     librav1e
>>>     librsvg
>>>     librtmp
>> 
>> If the patch is acceptable, I believe this belongs in EXTERNAL_LIBRARY_GPL_LIST
> 
> librabbitmq license is MIT, so are you sure about this?
> 
> https://github.com/alanxz/rabbitmq-c/blob/master/LICENSE-MIT

Yes, definitely.

Sorry for the noise, Carl Eugen
Anton Khirnov March 2, 2020, 10:41 a.m. UTC | #8
Quoting Marton Balint (2020-02-29 17:09:58)
> 
> 
> On Sat, 29 Feb 2020, Paul B Mahol wrote:
> 
> > I think this was already rejected?
> 
> jb questioned if this belongs to libavformat, and timo asked how well the 
> message brokers handle high bitrates/big message sizes, no hard rejects 
> were made as far as I remember.
> 
> Andriy provided numbers for scaling, I have not answered the concerns 
> regarding libavformat integration, because I am not sure I understand the 
> concern. AMQP is a general purpose protocol for message transfer, it even 
> has an official URL scheme, so when we integrate it into libavformat as a 
> *protocol* I don't really see why it would not fit into the framework or 
> what can be gained if it is implemented separately.
> 
> If people still have hard feelings against merging this, please speak up, 
> but I honestly don't see a problem with it.

I would say the question is what is it for? Is anyone using (or
intending to use) this for multimedia transfer? What are the advantages
of doing that over other existing protocols? "why not" does not seem
like a sufficient argument to me.
Carl Eugen Hoyos March 2, 2020, 10:44 a.m. UTC | #9
Am Mo., 2. März 2020 um 11:41 Uhr schrieb Anton Khirnov <anton@khirnov.net>:
>
> Quoting Marton Balint (2020-02-29 17:09:58)
> >
> >
> > On Sat, 29 Feb 2020, Paul B Mahol wrote:
> >
> > > I think this was already rejected?
> >
> > jb questioned if this belongs to libavformat, and timo asked how well the
> > message brokers handle high bitrates/big message sizes, no hard rejects
> > were made as far as I remember.
> >
> > Andriy provided numbers for scaling, I have not answered the concerns
> > regarding libavformat integration, because I am not sure I understand the
> > concern. AMQP is a general purpose protocol for message transfer, it even
> > has an official URL scheme, so when we integrate it into libavformat as a
> > *protocol* I don't really see why it would not fit into the framework or
> > what can be gained if it is implemented separately.
> >
> > If people still have hard feelings against merging this, please speak up,
> > but I honestly don't see a problem with it.
>
> I would say the question is what is it for? Is anyone using (or
> intending to use) this for multimedia transfer? What are the advantages
> of doing that over other existing protocols? "why not" does not seem
> like a sufficient argument to me.

I can only say that I (heavily) object to your argumentation, it is
basically against everything FFmpeg stands for (imo).

Carl Eugen
Andriy Gelman March 2, 2020, 4:32 p.m. UTC | #10
On Mon, 02. Mar 11:41, Anton Khirnov wrote:
> Quoting Marton Balint (2020-02-29 17:09:58)
> > 
> > 
> > On Sat, 29 Feb 2020, Paul B Mahol wrote:
> > 
> > > I think this was already rejected?
> > 
> > jb questioned if this belongs to libavformat, and timo asked how well the 
> > message brokers handle high bitrates/big message sizes, no hard rejects 
> > were made as far as I remember.
> > 
> > Andriy provided numbers for scaling, I have not answered the concerns 
> > regarding libavformat integration, because I am not sure I understand the 
> > concern. AMQP is a general purpose protocol for message transfer, it even 
> > has an official URL scheme, so when we integrate it into libavformat as a 
> > *protocol* I don't really see why it would not fit into the framework or 
> > what can be gained if it is implemented separately.
> > 
> > If people still have hard feelings against merging this, please speak up, 
> > but I honestly don't see a problem with it.

> 
> I would say the question is what is it for? Is anyone using (or
> intending to use) this for multimedia transfer? What are the advantages
> of doing that over other existing protocols? "why not" does not seem
> like a sufficient argument to me.

We have 5-10 nodes that are already talking to each other via a RabbitMQ broker.
A few of these nodes may be publishing low bitrate multimedia content, and new
ones may join the network.

The idea is to re-use the RabbitMQ broker instead of using a separate video
server, where we can view all the logs (queue sizes, etc.) on the same webui.

Also Marton mentioned a use case here:
http://ffmpeg.org/pipermail/ffmpeg-devel/2020-February/256976.html
Marton Balint March 8, 2020, 11:25 a.m. UTC | #11
> Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support
> 
> From: Andriy Gelman <andriy.gelman@gmail.com>
> 
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
> 
> Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
> ---
> 
> Changes in v2:
>     - Addressed comments from Marton
>     - Updated documentation
> 
> Compilation notes:
>     - Requires librabbitmq-dev package (on ubuntu).
>     - The pkg-config libprabbitmq.pc has a corrupt entry.
>       **update: fixed on the github master branch**
>       The line "Libs.private: rt; -lpthread" should be changed to
>       "Libs.private: -lrt -pthread".
>     - Compile FFmpeg with --enable-librabbitmq
>

[...]

> +@item connection_timeout
> +The timeout in microseconds during the initial connection to the broker. The

In *seconds* (because it is an AV_OPT_TYPE_DURATION)

> +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.

5 seconds

[...]

> +static const AVOption options[] = {
> +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> +    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> +    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> +    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},

INT64_MAX can be the maximum.

> +    { NULL }
> +};
> +
> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret, server_msg;
> +    char hostname[STR_LEN], credentials[STR_LEN];
> +    char *credentials_decoded;
> +    int port;
> +    const char *user, *password = NULL;
> +    char *p;
> +    amqp_rpc_reply_t broker_reply;
> +    struct timeval tval = { 0 };
> +
> +    AMQPContext *s = h->priv_data;
> +
> +    h->is_streamed     = 1;
> +    h->max_packet_size = s->pkt_size;
> +
> +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
> +
> +    if (port < 0)
> +        port = 5672;
> +
> +    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
> +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> +        return AVERROR(EINVAL);
> +    }
> +
> +    credentials_decoded = ff_urldecode(credentials, 0);

This is not entirely correct, because the username may contain ':'
characters... So you should split first and urldecode the split
components...

> +    if (!credentials_decoded)
> +        return AVERROR(ENOMEM);
> +
> +    p = strchr(credentials_decoded, ':');
> +    if (p) {
> +        *p = '\0';
> +        password = p + 1;
> +    }
> +
> +    if (!password || *password == '\0')
> +        password = "guest";
> +
> +    user = credentials_decoded;
> +    if (*user == '\0')
> +        user = "guest";
> +
> +    s->conn = amqp_new_connection();
> +    if (!s->conn) {
> +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    s->socket = amqp_tcp_socket_new(s->conn);
> +    if (!s->socket) {
> +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> +        goto destroy_connection;
> +    }
> +
> +    if (s->connection_timeout < 0)
> +        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
> +
> +    tval.tv_sec  = s->connection_timeout / 1000000;
> +    tval.tv_usec = s->connection_timeout % 1000000;
> +    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");

This should log the useful error, e.g:
         av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", amqp_error_string2(ret));

[...]

> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
> +{
> +    int ret;
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +
> +    amqp_bytes_t message = { size, (void *)buf };
> +    amqp_basic_properties_t props;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
> +    props.content_type = amqp_cstring_bytes("octet/stream");
> +    props.delivery_mode = 2; /* persistent delivery mode */
> +
> +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
> +                             amqp_cstring_bytes(s->routing_key), 0, 0,
> +                             &props, message);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error publish\n");

Same here

> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    return size;
> +}
> +

[...]

Thanks,
Marton
Andriy Gelman March 8, 2020, 8:41 p.m. UTC | #12
On Sun, 08. Mar 12:25, Marton Balint wrote:
> 
> > Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support
> > 
> > From: Andriy Gelman <andriy.gelman@gmail.com>
> > 
> > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
> > 
> > Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
> > ---
> > 
> > Changes in v2:
> >     - Addressed comments from Marton
> >     - Updated documentation
> > 
> > Compilation notes:
> >     - Requires librabbitmq-dev package (on ubuntu).
> >     - The pkg-config libprabbitmq.pc has a corrupt entry.
> >       **update: fixed on the github master branch**
> >       The line "Libs.private: rt; -lpthread" should be changed to
> >       "Libs.private: -lrt -pthread".
> >     - Compile FFmpeg with --enable-librabbitmq
> > 
> 
> [...]
> 
> > +@item connection_timeout
> > +The timeout in microseconds during the initial connection to the broker. The
> 
> In *seconds* (because it is an AV_OPT_TYPE_DURATION)
> 
> > +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.
> 
> 5 seconds
> 
> [...]
> 
> > +static const AVOption options[] = {
> > +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> > +    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> > +    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> > +    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
> 
> INT64_MAX can be the maximum.
> 
> > +    { NULL }
> > +};
> > +
> > +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> > +{
> > +    int ret, server_msg;
> > +    char hostname[STR_LEN], credentials[STR_LEN];
> > +    char *credentials_decoded;
> > +    int port;
> > +    const char *user, *password = NULL;
> > +    char *p;
> > +    amqp_rpc_reply_t broker_reply;
> > +    struct timeval tval = { 0 };
> > +
> > +    AMQPContext *s = h->priv_data;
> > +
> > +    h->is_streamed     = 1;
> > +    h->max_packet_size = s->pkt_size;
> > +
> > +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> > +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
> > +
> > +    if (port < 0)
> > +        port = 5672;
> > +
> > +    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
> > +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> > +        return AVERROR(EINVAL);
> > +    }
> > +
> > +    credentials_decoded = ff_urldecode(credentials, 0);

> 
> This is not entirely correct, becase the username may contain ':'
> characters... So you should split first and urldecode the splitted
> components...
> 

Thanks for reviewing and catching these two.
diff mbox series

Patch

diff --git a/Changelog b/Changelog
index cb310a3abc2..ab30d670a15 100644
--- a/Changelog
+++ b/Changelog
@@ -43,6 +43,7 @@  version <next>:
 - Rayman 2 ADPCM decoder
 - Rayman 2 APM demuxer
 - cas video filter
+- AMQP 0-9-1 protocol (RabbitMQ)
 
 
 version 4.2:
diff --git a/configure b/configure
index 06e3a7b2a88..8b171349440 100755
--- a/configure
+++ b/configure
@@ -255,6 +255,7 @@  External library support:
   --enable-libopenmpt      enable decoding tracked files via libopenmpt [no]
   --enable-libopus         enable Opus de/encoding via libopus [no]
   --enable-libpulse        enable Pulseaudio input via libpulse [no]
+  --enable-librabbitmq     enable RabbitMQ library [no]
   --enable-librav1e        enable AV1 encoding via rav1e [no]
   --enable-librsvg         enable SVG rasterization via librsvg [no]
   --enable-librubberband   enable rubberband needed for rubberband filter [no]
@@ -1789,6 +1790,7 @@  EXTERNAL_LIBRARY_LIST="
     libopenmpt
     libopus
     libpulse
+    librabbitmq
     librav1e
     librsvg
     librtmp
@@ -3434,6 +3436,8 @@  unix_protocol_deps="sys_un_h"
 unix_protocol_select="network"
 
 # external library protocols
+libamqp_protocol_deps="librabbitmq"
+libamqp_protocol_select="network"
 librtmp_protocol_deps="librtmp"
 librtmpe_protocol_deps="librtmp"
 librtmps_protocol_deps="librtmp"
@@ -6317,6 +6321,7 @@  enabled libopus           && {
     }
 }
 enabled libpulse          && require_pkg_config libpulse libpulse pulse/pulseaudio.h pa_context_new
+enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >= 0.7.1" amqp.h amqp_new_connection
 enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0" rav1e.h rav1e_context_new
 enabled librsvg           && require_pkg_config librsvg librsvg-2.0 librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
 enabled librtmp           && require_pkg_config librtmp librtmp librtmp/rtmp.h RTMP_Socket
diff --git a/doc/general.texi b/doc/general.texi
index dbdc3485982..623566dabea 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1330,6 +1330,7 @@  performance on systems without hardware floating point support).
 
 @multitable @columnfractions .4 .1
 @item Name         @tab Support
+@item AMQP         @tab X
 @item file         @tab X
 @item FTP          @tab X
 @item Gopher       @tab X
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 54a287f488b..dc5f49ba8cc 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -51,6 +51,66 @@  in microseconds.
 
 A description of the currently available protocols follows.
 
+@section amqp
+
+Advanced Message Queuing Protocol (AMQP) version 0-9-1 is a broker based
+publish-subscribe communication protocol.
+
+FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A separate
+AMQP broker must also be run. An example open-source AMQP broker is RabbitMQ.
+
+After starting the broker, an FFmpeg client may stream data to the broker using
+the command:
+
+@example
+ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port]
+@end example
+
+Where hostname and port (default is 5672) is the address of the broker. The
+client may also set a user/password for authentication. The default for both
+fields is "guest".
+
+Multiple subscribers may stream from the broker using the command:
+@example
+ffplay amqp://[[user]:[password]@@]hostname[:port]
+@end example
+
+In RabbitMQ all data published to the broker flows through a specific exchange,
+and each subscribing client has an assigned queue/buffer. When a packet arrives
+at an exchange, it may be copied to a client's queue depending on the exchange
+and routing_key fields.
+
+The following options are supported:
+
+@table @option
+
+@item exchange
+Sets the exchange to use on the broker. RabbitMQ has several predefined
+exchanges: "amq.direct" is the default exchange, where the publisher and
+subscriber must have a matching routing_key; "amq.fanout" is the same as a
+broadcast operation (i.e. the data is forwarded to all queues on the fanout
+exchange independent of the routing_key); and "amq.topic" is similar to
+"amq.direct", but allows for more complex pattern matching (refer to the RabbitMQ
+documentation).
+
+@item routing_key
+Sets the routing key. The default value is "amqp". The routing key is used on
+the "amq.direct" and "amq.topic" exchanges to decide whether packets are written
+to the queue of a subscriber.
+
+@item pkt_size
+Maximum size of each packet sent/received to the broker. Default is 131072.
+Minimum is 4096 and max is any large value (representable by an int). When
+receiving packets, this sets an internal buffer size in FFmpeg. It should be
+equal to or greater than the size of the published packets to the broker. Otherwise
+the received message may be truncated causing decoding errors.
+
+@item connection_timeout
+The timeout in microseconds during the initial connection to the broker. The
+default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.
+
+@end table
+
 @section async
 
 Asynchronous data filling wrapper for input stream.
diff --git a/libavformat/Makefile b/libavformat/Makefile
index e0681058a29..52b44289fee 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -629,6 +629,7 @@  OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
 OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
 
 # external library protocols
+OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
 OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
 OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
 OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
new file mode 100644
index 00000000000..baa42306bb0
--- /dev/null
+++ b/libavformat/libamqp.c
@@ -0,0 +1,286 @@ 
+/*
+ * Advanced Message Queuing Protocol (AMQP) 0-9-1
+ * Copyright (c) 2020 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <sys/time.h>
+#include "avformat.h"
+#include "libavformat/urldecode.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "network.h"
+#include "url.h"
+
+/**
+ * Private data of the amqp protocol, reached through URLContext.priv_data.
+ */
+typedef struct AMQPContext {
+    const AVClass *class;           /* set up through amqp_context_class (logging/options) */
+
+    /* librabbitmq handles, created in amqp_proto_open() */
+    amqp_connection_state_t conn;
+    amqp_socket_t *socket;
+
+    /* values filled in from the options table */
+    const char *exchange;           /* "exchange" option */
+    const char *routing_key;        /* "routing_key" option */
+    int64_t connection_timeout;     /* microseconds; negative until resolved in amqp_proto_open() */
+    int pkt_size;                   /* "pkt_size" option */
+
+    /* largest oversized message body seen by amqp_proto_read(), used in its warning */
+    int pkt_size_overflow;
+} AMQPContext;
+
+#define STR_LEN           1024 /* size of the hostname, credentials and queue-name buffers */
+#define DEFAULT_CHANNEL   1 /* channel number passed to every channel-level librabbitmq call */
+
+/* Shorthands used only by the option table below. */
+#define OFFSET(x) offsetof(AMQPContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+
+/*
+ * Options of the amqp protocol. Every entry carries both the D and E flags.
+ * connection_timeout defaults to -1, which amqp_proto_open() replaces with
+ * rw_timeout or 5000000 microseconds.
+ */
+static const AVOption options[] = {
+    { "pkt_size", "Maximum send/read packet size",
+      OFFSET(pkt_size), AV_OPT_TYPE_INT,
+      { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
+    { "exchange", "Exchange to send/read packets",
+      OFFSET(exchange), AV_OPT_TYPE_STRING,
+      { .str = "amq.direct" }, 0, 0, .flags = D | E },
+    { "routing_key", "Key to filter streams",
+      OFFSET(routing_key), AV_OPT_TYPE_STRING,
+      { .str = "amqp" }, 0, 0, .flags = D | E },
+    { "connection_timeout", "Initial connection timeout",
+      OFFSET(connection_timeout), AV_OPT_TYPE_DURATION,
+      { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
+    { NULL }
+};
+
+/**
+ * Connect to an AMQP 0-9-1 broker and prepare the connection for streaming.
+ *
+ * The URI provides the optional "user:password" credentials, the hostname and
+ * the optional port (5672 when absent). A missing or empty user or password
+ * falls back to "guest". After logging in on vhost "/" and opening
+ * DEFAULT_CHANNEL, a context opened with AVIO_FLAG_READ additionally declares
+ * a queue, binds it to the configured exchange/routing_key and starts
+ * consuming from it.
+ *
+ * @param h     URL context; h->priv_data is the AMQPContext
+ * @param uri   location of the broker, including optional credentials and port
+ * @param flags not read by this function (h->flags is consulted instead)
+ * @return 0 on success, AVERROR(EINVAL) for an invalid hostname/port,
+ *         AVERROR(ENOMEM) if the credentials cannot be decoded,
+ *         AVERROR_EXTERNAL for any connection or broker failure
+ */
+static int amqp_proto_open(URLContext *h, const char *uri, int flags)
+{
+    int ret, server_msg;
+    char hostname[STR_LEN], credentials[STR_LEN];
+    char *user_decoded = NULL, *password_decoded = NULL;
+    int port;
+    const char *user, *password;
+    char *p;
+    amqp_rpc_reply_t broker_reply;
+    struct timeval tval = { 0 };
+
+    AMQPContext *s = h->priv_data;
+
+    h->is_streamed     = 1;
+    h->max_packet_size = s->pkt_size;
+
+    av_url_split(NULL, 0, credentials, sizeof(credentials),
+                 hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    if (port < 0)
+        port = 5672;
+
+    if (hostname[0] == '\0' || port <= 0 || port > 65535) {
+        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
+        return AVERROR(EINVAL);
+    }
+
+    /*
+     * Split the raw credentials at the first ':' BEFORE percent-decoding, so
+     * that an encoded ':' (%3A) inside the user name is not mistaken for the
+     * user/password separator.
+     */
+    p = strchr(credentials, ':');
+    if (p) {
+        *p = '\0';
+        password_decoded = ff_urldecode(p + 1, 0);
+        if (!password_decoded)
+            return AVERROR(ENOMEM);
+    }
+
+    user_decoded = ff_urldecode(credentials, 0);
+    if (!user_decoded) {
+        av_freep(&password_decoded);
+        return AVERROR(ENOMEM);
+    }
+
+    password = (password_decoded && *password_decoded != '\0') ? password_decoded : "guest";
+    user     = (*user_decoded != '\0') ? user_decoded : "guest";
+
+    s->conn = amqp_new_connection();
+    if (!s->conn) {
+        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
+        /* the decoded credentials must be released on this path as well */
+        goto free_credentials;
+    }
+
+    s->socket = amqp_tcp_socket_new(s->conn);
+    if (!s->socket) {
+        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
+        goto destroy_connection;
+    }
+
+    /* no explicit connection_timeout: use rw_timeout, else 5 seconds */
+    if (s->connection_timeout < 0)
+        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
+
+    tval.tv_sec  = s->connection_timeout / 1000000;
+    tval.tv_usec = s->connection_timeout % 1000000;
+    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
+        goto destroy_connection;
+    }
+
+    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
+                              AMQP_SASL_METHOD_PLAIN, user, password);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error login\n");
+        server_msg = AMQP_ACCESS_REFUSED;
+        goto close_connection;
+    }
+
+    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
+    broker_reply = amqp_get_rpc_reply(s->conn);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error set channel\n");
+        server_msg = AMQP_CHANNEL_ERROR;
+        goto close_connection;
+    }
+
+    if (h->flags & AVIO_FLAG_READ) {
+        amqp_bytes_t queuename;
+        char queuename_buff[STR_LEN];
+        amqp_queue_declare_ok_t *r;
+
+        /* an empty name is passed; the name to use comes back in r->queue */
+        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
+                               0, 0, 0, 1, amqp_empty_table);
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
+            server_msg = AMQP_RESOURCE_ERROR;
+            goto close_channel;
+        }
+
+        /* store queuename in a local buffer (at most STR_LEN bytes) */
+        queuename.bytes = queuename_buff;
+        queuename.len = FFMIN(r->queue.len, STR_LEN);
+        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
+
+        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, amqp_cstring_bytes(s->exchange),
+                        amqp_cstring_bytes(s->routing_key), amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+
+        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
+                           0, 1, 0, amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Set consume error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+    }
+
+    av_freep(&user_decoded);
+    av_freep(&password_decoded);
+    return 0;
+
+    /* error unwinding: each label undoes one more step, in reverse order */
+close_channel:
+    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
+close_connection:
+    amqp_connection_close(s->conn, server_msg);
+destroy_connection:
+    amqp_destroy_connection(s->conn);
+free_credentials:
+    av_freep(&user_decoded);
+    av_freep(&password_decoded);
+    return AVERROR_EXTERNAL;
+}
+
+/**
+ * Publish one buffer as a single message on the configured exchange and
+ * routing key.
+ *
+ * @param h    URL context; h->priv_data is the AMQPContext
+ * @param buf  payload to publish
+ * @param size number of bytes in buf
+ * @return size on success, the non-zero result of
+ *         ff_network_wait_fd_timeout(), or AVERROR_EXTERNAL if publishing fails
+ */
+static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+    AMQPContext *ctx = h->priv_data;
+    int sockfd = amqp_socket_get_sockfd(ctx->socket);
+    amqp_bytes_t payload = { .len = size, .bytes = (void *)buf };
+    amqp_basic_properties_t properties;
+    int status;
+
+    /* wait on the socket first; any non-zero result goes back to the caller as-is */
+    status = ff_network_wait_fd_timeout(sockfd, 1, h->rw_timeout, &h->interrupt_callback);
+    if (status != 0)
+        return status;
+
+    /* only the fields named in _flags are filled in */
+    properties._flags        = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+    properties.delivery_mode = 2; /* persistent delivery mode */
+    properties.content_type  = amqp_cstring_bytes("octet/stream");
+
+    status = amqp_basic_publish(ctx->conn, DEFAULT_CHANNEL,
+                                amqp_cstring_bytes(ctx->exchange),
+                                amqp_cstring_bytes(ctx->routing_key),
+                                0, 0, &properties, payload);
+    if (status != 0) {
+        av_log(h, AV_LOG_ERROR, "Error publish\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    return size;
+}
+
+/**
+ * Consume one message from the broker and copy its body into buf.
+ *
+ * A body longer than size is truncated to size bytes after a warning is
+ * logged; the largest such body length seen so far is kept in
+ * pkt_size_overflow and suggested as the -pkt_size value.
+ *
+ * @param h    URL context; h->priv_data is the AMQPContext
+ * @param buf  destination buffer
+ * @param size capacity of buf in bytes
+ * @return number of bytes copied, the non-zero result of
+ *         ff_network_wait_fd_timeout(), or AVERROR_EXTERNAL if the consume
+ *         reply is not AMQP_RESPONSE_NORMAL
+ */
+static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
+{
+    AMQPContext *ctx = h->priv_data;
+    int sockfd = amqp_socket_get_sockfd(ctx->socket);
+    amqp_envelope_t msg;
+    amqp_rpc_reply_t reply;
+    int status;
+
+    /* wait on the socket first; any non-zero result goes back to the caller as-is */
+    status = ff_network_wait_fd_timeout(sockfd, 0, h->rw_timeout, &h->interrupt_callback);
+    if (status != 0)
+        return status;
+
+    amqp_maybe_release_buffers(ctx->conn);
+    reply = amqp_consume_message(ctx->conn, &msg, NULL, 0);
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+        return AVERROR_EXTERNAL;
+
+    if (msg.message.body.len > size) {
+        /* body does not fit: warn and keep size as the number of bytes to copy */
+        ctx->pkt_size_overflow = FFMAX(ctx->pkt_size_overflow, msg.message.body.len);
+        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
+                                  "Message will be truncated. Setting -pkt_size %d "
+                                  "may resolve this issue.\n", ctx->pkt_size_overflow);
+    } else {
+        /* body fits: copy exactly the body length */
+        size = msg.message.body.len;
+    }
+
+    memcpy(buf, msg.message.body.bytes, size);
+    amqp_destroy_envelope(&msg);
+
+    return size;
+}
+
+/**
+ * Close DEFAULT_CHANNEL and the connection with AMQP_REPLY_SUCCESS, then
+ * destroy the connection object. The results of the close calls are not
+ * checked.
+ *
+ * @param h URL context; h->priv_data is the AMQPContext
+ * @return always 0
+ */
+static int amqp_proto_close(URLContext *h)
+{
+    AMQPContext *ctx = h->priv_data;
+    amqp_connection_state_t conn = ctx->conn;
+
+    amqp_channel_close(conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
+    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
+    amqp_destroy_connection(conn);
+
+    return 0;
+}
+
+/* AVClass of AMQPContext: names the context "amqp" and exposes the options table. */
+static const AVClass amqp_context_class = {
+    .version    = LIBAVUTIL_VERSION_INT,
+    .option     = options,
+    .item_name  = av_default_item_name,
+    .class_name = "amqp",
+};
+
+/* Protocol descriptor for "amqp": wires the callbacks above to AMQPContext. */
+const URLProtocol ff_libamqp_protocol = {
+    .name            = "amqp",
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+    .priv_data_class = &amqp_context_class,
+    .priv_data_size  = sizeof(AMQPContext),
+    .url_open        = amqp_proto_open,
+    .url_read        = amqp_proto_read,
+    .url_write       = amqp_proto_write,
+    .url_close       = amqp_proto_close,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index 29fb99e7fa3..f1b8eab0fd6 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -60,6 +60,7 @@  extern const URLProtocol ff_tls_protocol;
 extern const URLProtocol ff_udp_protocol;
 extern const URLProtocol ff_udplite_protocol;
 extern const URLProtocol ff_unix_protocol;
+extern const URLProtocol ff_libamqp_protocol;
 extern const URLProtocol ff_librtmp_protocol;
 extern const URLProtocol ff_librtmpe_protocol;
 extern const URLProtocol ff_librtmps_protocol;
diff --git a/libavformat/version.h b/libavformat/version.h
index 4724269b3c4..a233b673512 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,8 +32,8 @@ 
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
 // Also please add any ticket numbers that you believe might be affected here
 #define LIBAVFORMAT_VERSION_MAJOR  58
-#define LIBAVFORMAT_VERSION_MINOR  39
-#define LIBAVFORMAT_VERSION_MICRO 101
+#define LIBAVFORMAT_VERSION_MINOR  40
+#define LIBAVFORMAT_VERSION_MICRO 100
 
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
                                                LIBAVFORMAT_VERSION_MINOR, \