diff mbox series

[FFmpeg-devel] avformat: Add AMQP version 0-9-1 protocol support

Message ID 20200201190214.10457-1-andriy.gelman@gmail.com
State Superseded
Headers show
Series [FFmpeg-devel] avformat: Add AMQP version 0-9-1 protocol support
Related show

Checks

Context Check Description
andriy/ffmpeg-patchwork pending
andriy/ffmpeg-patchwork success Applied patch
andriy/ffmpeg-patchwork success Configure finished
andriy/ffmpeg-patchwork success Make finished
andriy/ffmpeg-patchwork success Make fate finished

Commit Message

Andriy Gelman Feb. 1, 2020, 7:02 p.m. UTC
From: Andriy Gelman <andriy.gelman@gmail.com>

Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
broker can redistribute content to other clients based on "exchange" and
"routing_key" fields.
---

Compilation notes:
- Requires librabbitmq-dev package (on ubuntu). 
- The pkg-config libprabbitmq.pc has a corrupt entry. 
  The line "Libs.private: rt; -lpthread" should be changed to 
  "Libs.private: -lrt -lpthread". I have made a bug report.
- Compile FFmpeg with --enable-librabbitmq 

To run an example: 
#
# Start the RabbitMQ broker (I use docker)
# The following starts the broker on localhost:5672. A webui is available on
# localhost:15672 (User/password is "guest" by default)
#
$ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management

#
# Stream to the RabbitMQ broker:
#
$ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672

#
# Connect any number of clients to fetch data from the broker:
# The clients are filtered by the routing_key and exchange.
#
$ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672


 Changelog               |   1 +
 configure               |   5 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  53 ++++++++
 libavformat/Makefile    |   1 +
 libavformat/libamqp.c   | 272 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 7 files changed, 334 insertions(+)
 create mode 100644 libavformat/libamqp.c

Comments

Marton Balint Feb. 9, 2020, 12:54 p.m. UTC | #1
On Sat, 1 Feb 2020, Andriy Gelman wrote:

> From: Andriy Gelman <andriy.gelman@gmail.com>
>
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> broker can redistribute content to other clients based on "exchange" and
> "routing_key" fields.
> ---
>
> Compilation notes:
> - Requires librabbitmq-dev package (on ubuntu). 
> - The pkg-config libprabbitmq.pc has a corrupt entry.
>  The line "Libs.private: rt; -lpthread" should be changed to
>  "Libs.private: -lrt -lpthread". I have made a bug report.
> - Compile FFmpeg with --enable-librabbitmq 
>
> To run an example: 
> #
> # Start the RabbitMQ broker (I use docker)
> # The following starts the broker on localhost:5672. A webui is available on
> # localhost:15672 (User/password is "guest" by default)
> #
> $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
>
> #
> # Stream to the RabbitMQ broker:
> #
> $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
>
> #
> # Connect any number of clients to fetch data from the broker:
> # The clients are filtered by the routing_key and exchange.
> #
> $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
>
>
> Changelog               |   1 +
> configure               |   5 +
> doc/general.texi        |   1 +
> doc/protocols.texi      |  53 ++++++++
> libavformat/Makefile    |   1 +
> libavformat/libamqp.c   | 272 ++++++++++++++++++++++++++++++++++++++++
> libavformat/protocols.c |   1 +
> 7 files changed, 334 insertions(+)
> create mode 100644 libavformat/libamqp.c
>
> diff --git a/Changelog b/Changelog
> index a4d20a94310..0d2c1dcc2d9 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -33,6 +33,7 @@ version <next>:
> - Argonaut Games ADPCM decoder
> - Argonaut Games ASF demuxer
> - xfade video filter
> +- AMQP protocol (RabbitMQ)
> 
> 
> version 4.2:
> diff --git a/configure b/configure
> index c02dbcc8b23..e421ecb5004 100755
> --- a/configure
> +++ b/configure
> @@ -254,6 +254,7 @@ External library support:
>   --enable-libopenmpt      enable decoding tracked files via libopenmpt [no]
>   --enable-libopus         enable Opus de/encoding via libopus [no]
>   --enable-libpulse        enable Pulseaudio input via libpulse [no]
> +  --enable-librabbitmq     enable rabbitmq library [no]
>   --enable-librav1e        enable AV1 encoding via rav1e [no]
>   --enable-librsvg         enable SVG rasterization via librsvg [no]
>   --enable-librubberband   enable rubberband needed for rubberband filter [no]
> @@ -1786,6 +1787,7 @@ EXTERNAL_LIBRARY_LIST="
>     libopenmpt
>     libopus
>     libpulse
> +    librabbitmq
>     librav1e
>     librsvg
>     librtmp
> @@ -3430,6 +3432,8 @@ unix_protocol_deps="sys_un_h"
> unix_protocol_select="network"
> 
> # external library protocols
> +libamqp_protocol_deps="librabbitmq"
> +libamqp_protocol_select="network"
> librtmp_protocol_deps="librtmp"
> librtmpe_protocol_deps="librtmp"
> librtmps_protocol_deps="librtmp"
> @@ -6305,6 +6309,7 @@ enabled libopus           && {
>     }
> }
> enabled libpulse          && require_pkg_config libpulse libpulse pulse/pulseaudio.h pa_context_new
> +enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >= 0.7.1" amqp.h amqp_new_connection
> enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0" rav1e.h rav1e_context_new
> enabled librsvg           && require_pkg_config librsvg librsvg-2.0 librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
> enabled librtmp           && require_pkg_config librtmp librtmp librtmp/rtmp.h RTMP_Socket
> diff --git a/doc/general.texi b/doc/general.texi
> index 85db50462c2..4057a07632d 100644
> --- a/doc/general.texi
> +++ b/doc/general.texi
> @@ -1326,6 +1326,7 @@ performance on systems without hardware floating point support).
> 
> @multitable @columnfractions .4 .1
> @item Name         @tab Support
> +@item AMQP         @tab X
> @item file         @tab X
> @item FTP          @tab X
> @item Gopher       @tab X
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index 5e8c97d1649..3d236291e77 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -51,6 +51,59 @@ in microseconds.
> 
> A description of the currently available protocols follows.
> 
> +@section amqp
> +
> +Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
> +publish-subscribe communication protocol.
> +
> +FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A separate
> +AMQP broker must also be run. An example open-source AMQP broker is RabbitMQ.
> +
> +When connecting to the broker, a client sets an "exchange" and a "routing key".
> +These keys are used to filter connections: A streaming client will only receive
> +the data that matches their "exchange" and "routing key".
> +
> +After starting the broker, an FFmpeg client may stream data to the broker using
> +the command:
> +
> +@example
> +ffmpeg -re -i input -f mpegts amqp://[user:password@@]hostname:port
> +@end example
> +
> +Where hostname and port is the location of the broker. The client may also
> +set a user/password for authentication. The defaults for both fields are
> +"guest".
> +
> +A separate instance can stream from the broker using the command:
> +@example
> +ffplay amqp://[user:password@@]hostname:port
> +@end example
> +
> +The protocol supports the following options:
> +
> +@table @option
> +
> +@item routing_key
> +Sets the routing key. The default value is "amqp". Clients can
> +only stream data which has the same key. Multiple clients may stream data to the
> +broker with different keys.
> +
> +@item exchange
> +Sets the exchange to use on the broker. The default value is "amqp.direct".  A
> +broker may have multiple exchanges which are configured on the broker side.
> +
> +@item pkt_size
> +Maximum size of each packet sent/received to the broker. Default is 131072.
> +Minimum is 4096 and max is any large value (representable by an int). When
> +receiving packets, this sets an internal buffer size in FFmpeg. It should be
> +equal to or greater than the size of the sent packets to the broker. Otherwise
> +the received message may be truncated causing decoding errors.
> +
> +@item connection_timeout
> +The timeout in milliseconds during the initial connection to the broker.
> +
> +@end table
> +
> @section async
> 
> Asynchronous data filling wrapper for input stream.
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index ba6ea8c4a62..8889f60cb92 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -627,6 +627,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
> OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
> 
> # external library protocols
> +OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
> OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
> OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
> OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
> diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
> new file mode 100644
> index 00000000000..2b45fdaf193
> --- /dev/null
> +++ b/libavformat/libamqp.c
> @@ -0,0 +1,272 @@
> +/*
> + * AMQP Protocol
> + * Copyright (c) 2020 Andriy Gelman
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <amqp.h>
> +#include <amqp_tcp_socket.h>
> +#include <sys/time.h>
> +#include "avformat.h"
> +#include "libavutil/avstring.h"
> +#include "libavutil/opt.h"
> +#include "libavutil/time.h"
> +#include "network.h"
> +#include "url.h"
> +
> +typedef struct AMQPContext {
> +    const AVClass *class;
> +    amqp_connection_state_t conn;
> +    amqp_socket_t *socket;
> +    const char *routing_key;
> +    const char *exchange;
> +    int pkt_size;
> +    int connection_timeout;

int64_t

> +    int pkt_size_overflow;
> +} AMQPContext;
> +
> +#define ARRAY_LEN           1024

I'd rather call this STR_LEN or something similar, as this is always used 
as a char array...

> +#define DEFAULT_CHANNEL     1
> +
> +#define OFFSET(x) offsetof(AMQPContext, x)
> +#define D AV_OPT_FLAG_DECODING_PARAM
> +#define E AV_OPT_FLAG_ENCODING_PARAM
> +static const AVOption options[] = {
> +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },

This is larger than libzmq. I think for the sake of consistency either 
this should be reduced or libzmq pkt_size should be increased in a 
spearate patch so in the end both messaging protocols have the same 
default.

> +    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> +    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> +    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},

AV_OPT_TYPE_DURATION

> +    { NULL }
> +};
> +
> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret, server_msg;
> +    char hostname[ARRAY_LEN], credentials[ARRAY_LEN];
> +    int port;
> +    const char *user, *password;
> +    char *end;
> +    struct timeval tval = { 0 };

Maybe move the timeval declaration to the connection_timeout block.

> +
> +    amqp_rpc_reply_t broker_reply;
> +
> +    AMQPContext *s = h->priv_data;
> +
> +    h->is_streamed     = 1;
> +    h->max_packet_size = s->pkt_size;
> +
> +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> +                 hostname, sizeof(hostname), &port, NULL, 0, uri);

if (port < 0)
     port = 5672;

> +
> +    if (hostname[0] == '\0' || port < 0 || port > 65535 ) {

port <= 0

> +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> +        return AVERROR(EIO);

AVERROR(EINVAL)

> +    }
> +
> +    user = av_strtok(credentials, ":", &end);
> +    if (!user)
> +        user = "guest";
> +
> +    password = av_strtok(NULL, ":", &end);
> +    if (!password)
> +        password = "guest";

I don't think strtok is really fit for this splitting, it splits leading 
(and trailing) separators, etc. I'd vote for a good old strchr here. Also 
consider using urldecode for parsing username and password, as the 
password might contain special characters...

> +
> +    s->conn = amqp_new_connection();
> +    if (!s->conn) {
> +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    s->socket = amqp_tcp_socket_new(s->conn);
> +    if (!s->socket) {
> +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> +        goto destroy_connection;
> +    }
> +

For the initial connection maybe a 5 second default timeout is better then
infinity. At least the TCP protocol seems to follow this path. The URL 
context RW timeout might also be considerered, so I suggest something like 
this:

if (s->connection_timeout < 0)
     s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);

> +    if (s->connection_timeout > 0) {
> +        tval.tv_sec  = s->connection_timeout / 1000000;
> +        tval.tv_usec = s->connection_timeout % 1000000;
> +        ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> +    }
> +    else
> +        ret = amqp_socket_open_noblock(s->socket, hostname, port, NULL);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
> +        goto destroy_connection;
> +    }
> +
> +    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
> +                              AMQP_SASL_METHOD_PLAIN, user, password);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +        av_log(h, AV_LOG_ERROR, "Error login\n");
> +        server_msg = AMQP_ACCESS_REFUSED;
> +        goto close_connection;
> +    }
> +
> +    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
> +    broker_reply = amqp_get_rpc_reply(s->conn);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +        av_log(h, AV_LOG_ERROR, "Error set channel\n");
> +        server_msg = AMQP_CHANNEL_ERROR;
> +        goto close_connection;
> +    }
> +
> +    if (h->flags & AVIO_FLAG_READ) {
> +        amqp_bytes_t queuename;
> +        char queuename_buff[ARRAY_LEN];
> +        amqp_queue_declare_ok_t *r;
> +
> +        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
> +                               0, 0, 0, 1, amqp_empty_table);
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
> +            server_msg = AMQP_RESOURCE_ERROR;
> +            goto close_channel;
> +        }
> +
> +        /* backup queuename */

/* store queuename */

> +        queuename.bytes = queuename_buff;
> +        queuename.len = FFMIN(r->queue.len, ARRAY_LEN);
> +        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
> +
> +        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, amqp_cstring_bytes(s->exchange),
> +                        amqp_cstring_bytes(s->routing_key), amqp_empty_table);
> +
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
> +            server_msg = AMQP_INTERNAL_ERROR;
> +            goto close_channel;
> +        }
> +
> +        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
> +                           0, 1, 0, amqp_empty_table);
> +
> +        broker_reply = amqp_get_rpc_reply(s->conn);
> +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> +            av_log(h, AV_LOG_ERROR, "Set consume error\n");
> +            server_msg = AMQP_INTERNAL_ERROR;
> +            goto close_channel;
> +        }
> +    }
> +
> +    return 0;
> +
> +close_channel:
> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
> +close_connection:
> +    amqp_connection_close(s->conn, server_msg);
> +destroy_connection:
> +    amqp_destroy_connection(s->conn);
> +
> +    return AVERROR_EXTERNAL;
> +}
> +
> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
> +{
> +    int ret;
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +
> +    amqp_bytes_t message = { size, (void *)buf };
> +    amqp_basic_properties_t props;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
> +    props.content_type = amqp_cstring_bytes("octet/stream");
> +    props.delivery_mode = 2; /* persistent delivery mode */
> +
> +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
> +                             amqp_cstring_bytes(s->routing_key), 0, 0,
> +                             &props, message);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error publish\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    return size;
> +}
> +
> +static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
> +{
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +    int ret;
> +
> +    amqp_rpc_reply_t broker_reply;
> +    amqp_envelope_t envelope;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    amqp_maybe_release_buffers(s->conn);
> +    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
> +
> +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
> +        return AVERROR_EXTERNAL;
> +
> +    if (envelope.message.body.len > size) {
> +        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
> +        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
> +                                  "Message will be truncated. Setting -pkt_size %d "
> +                                  "may resolve the issue.\n", s->pkt_size_overflow);
> +        envelope.message.body.len = size;
> +    }
> +
> +    memcpy(buf, envelope.message.body.bytes, envelope.message.body.len);
> +    amqp_destroy_envelope(&envelope);
> +
> +    return envelope.message.body.len;

Instead of overriding envelope.message.body.len maybe simply do a
size = FFMIN(size, envelope.message.body.len);
and then use size everywhere. It is a bit ugly to overwrite fields in the 
evenlope struct...

> +}
> +
> +static int amqp_proto_close(URLContext *h)
> +{
> +    AMQPContext *s = h->priv_data;
> +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
> +    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
> +    amqp_destroy_connection(s->conn);
> +
> +    return 0;
> +}
> +
> +static const AVClass amqp_context_class = {
> +    .class_name = "amqp",
> +    .item_name  = av_default_item_name,
> +    .option     = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +const URLProtocol ff_libamqp_protocol = {
> +    .name            = "amqp",
> +    .url_close       = amqp_proto_close,
> +    .url_open        = amqp_proto_open,
> +    .url_read        = amqp_proto_read,
> +    .url_write       = amqp_proto_write,
> +    .priv_data_size  = sizeof(AMQPContext),
> +    .priv_data_class = &amqp_context_class,
> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> +};
> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> index 29fb99e7fa3..f1b8eab0fd6 100644
> --- a/libavformat/protocols.c
> +++ b/libavformat/protocols.c
> @@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol;
> extern const URLProtocol ff_udp_protocol;
> extern const URLProtocol ff_udplite_protocol;
> extern const URLProtocol ff_unix_protocol;
> +extern const URLProtocol ff_libamqp_protocol;
> extern const URLProtocol ff_librtmp_protocol;
> extern const URLProtocol ff_librtmpe_protocol;
> extern const URLProtocol ff_librtmps_protocol;
> --

Regards,
Marton
Timo Rothenpieler Feb. 9, 2020, 1:32 p.m. UTC | #2
On 01.02.2020 20:02, Andriy Gelman wrote:
> From: Andriy Gelman <andriy.gelman@gmail.com>
> 
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> broker can redistribute content to other clients based on "exchange" and
> "routing_key" fields.
> ---
> 
> Compilation notes:
> - Requires librabbitmq-dev package (on ubuntu).
> - The pkg-config libprabbitmq.pc has a corrupt entry.
>    The line "Libs.private: rt; -lpthread" should be changed to
>    "Libs.private: -lrt -lpthread". I have made a bug report.
> - Compile FFmpeg with --enable-librabbitmq
> 
> To run an example:
> #
> # Start the RabbitMQ broker (I use docker)
> # The following starts the broker on localhost:5672. A webui is available on
> # localhost:15672 (User/password is "guest" by default)
> #
> $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> 
> #
> # Stream to the RabbitMQ broker:
> #
> $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> 
> #
> # Connect any number of clients to fetch data from the broker:
> # The clients are filtered by the routing_key and exchange.
> #
> $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> 

Isn't RabbitMQ, and any message broker like it, more designed to pipe 
short messages around?
I'd imagine it's really not designed to shovel huge amounts of data, 
like a full on video stream, around.
Jean-Baptiste Kempf Feb. 9, 2020, 9:58 p.m. UTC | #3
On Sun, Feb 9, 2020, at 14:32, Timo Rothenpieler wrote:
> On 01.02.2020 20:02, Andriy Gelman wrote:
> > From: Andriy Gelman <andriy.gelman@gmail.com>
> > 
> > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> > broker can redistribute content to other clients based on "exchange" and
> > "routing_key" fields.
> > ---
> > 
> > Compilation notes:
> > - Requires librabbitmq-dev package (on ubuntu).
> > - The pkg-config libprabbitmq.pc has a corrupt entry.
> >    The line "Libs.private: rt; -lpthread" should be changed to
> >    "Libs.private: -lrt -lpthread". I have made a bug report.
> > - Compile FFmpeg with --enable-librabbitmq
> > 
> > To run an example:
> > #
> > # Start the RabbitMQ broker (I use docker)
> > # The following starts the broker on localhost:5672. A webui is available on
> > # localhost:15672 (User/password is "guest" by default)
> > #
> > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> > 
> > #
> > # Stream to the RabbitMQ broker:
> > #
> > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 
> > #
> > # Connect any number of clients to fetch data from the broker:
> > # The clients are filtered by the routing_key and exchange.
> > #
> > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 
> 
> Isn't RabbitMQ, and any message broker like it, more designed to pipe 
> short messages around?
> I'd imagine it's really not designed to shovel huge amounts of data, 
> like a full on video stream, around.

Tbh, it's not a standard protocol for multimedia. It's not a format either.

I have a hard time seeing what this is doing in libavformat.
Andriy Gelman Feb. 9, 2020, 10:30 p.m. UTC | #4
On Sun, 09. Feb 14:32, Timo Rothenpieler wrote:
> On 01.02.2020 20:02, Andriy Gelman wrote:
> > From: Andriy Gelman <andriy.gelman@gmail.com>
> > 
> > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> > broker can redistribute content to other clients based on "exchange" and
> > "routing_key" fields.
> > ---
> > 
> > Compilation notes:
> > - Requires librabbitmq-dev package (on ubuntu).
> > - The pkg-config libprabbitmq.pc has a corrupt entry.
> >    The line "Libs.private: rt; -lpthread" should be changed to
> >    "Libs.private: -lrt -lpthread". I have made a bug report.
> > - Compile FFmpeg with --enable-librabbitmq
> > 
> > To run an example:
> > #
> > # Start the RabbitMQ broker (I use docker)
> > # The following starts the broker on localhost:5672. A webui is available on
> > # localhost:15672 (User/password is "guest" by default)
> > #
> > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> > 
> > #
> > # Stream to the RabbitMQ broker:
> > #
> > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 
> > #
> > # Connect any number of clients to fetch data from the broker:
> > # The clients are filtered by the routing_key and exchange.
> > #
> > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 

> 
> Isn't RabbitMQ, and any message broker like it, more designed to pipe short
> messages around?
> I'd imagine it's really not designed to shovel huge amounts of data, like a
> full on video stream, around.

I didn't have any problems streaming to/from the broker, but I have to evaluate
at which point it breaks. I'll share these numbers with ffmpeg-devel.

Overall the idea is that you have a central broker that deals with these short
messages but also the high rate streams. You don't have to run a separate video
server and everything is logged at one place.
Andriy Gelman Feb. 9, 2020, 10:43 p.m. UTC | #5
On Sun, 09. Feb 22:58, Jean-Baptiste Kempf wrote:
> 
> 
> On Sun, Feb 9, 2020, at 14:32, Timo Rothenpieler wrote:
> > On 01.02.2020 20:02, Andriy Gelman wrote:
> > > From: Andriy Gelman <andriy.gelman@gmail.com>
> > > 
> > > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> > > broker can redistribute content to other clients based on "exchange" and
> > > "routing_key" fields.
> > > ---
> > > 
> > > Compilation notes:
> > > - Requires librabbitmq-dev package (on ubuntu).
> > > - The pkg-config libprabbitmq.pc has a corrupt entry.
> > >    The line "Libs.private: rt; -lpthread" should be changed to
> > >    "Libs.private: -lrt -lpthread". I have made a bug report.
> > > - Compile FFmpeg with --enable-librabbitmq
> > > 
> > > To run an example:
> > > #
> > > # Start the RabbitMQ broker (I use docker)
> > > # The following starts the broker on localhost:5672. A webui is available on
> > > # localhost:15672 (User/password is "guest" by default)
> > > #
> > > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> > > 
> > > #
> > > # Stream to the RabbitMQ broker:
> > > #
> > > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > > 
> > > #
> > > # Connect any number of clients to fetch data from the broker:
> > > # The clients are filtered by the routing_key and exchange.
> > > #
> > > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > > 
> > 
> > Isn't RabbitMQ, and any message broker like it, more designed to pipe 
> > short messages around?
> > I'd imagine it's really not designed to shovel huge amounts of data, 
> > like a full on video stream, around.

> 
> Tbh, it's not a standard protocol for multimedia. It's not a format either.
> 

Sure, I understand it's quite niche. 

> I have a hard time seeing what this is doing in libavformat.
> 

ok np
Andriy Gelman Feb. 9, 2020, 10:50 p.m. UTC | #6
On Sun, 09. Feb 13:54, Marton Balint wrote:
> 
> 
> On Sat, 1 Feb 2020, Andriy Gelman wrote:
> 
> > From: Andriy Gelman <andriy.gelman@gmail.com>
> > 
> > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> > broker can redistribute content to other clients based on "exchange" and
> > "routing_key" fields.
> > ---
> > 
> > Compilation notes:
> > - Requires librabbitmq-dev package (on ubuntu). - The pkg-config
> > libprabbitmq.pc has a corrupt entry.
> >  The line "Libs.private: rt; -lpthread" should be changed to
> >  "Libs.private: -lrt -lpthread". I have made a bug report.
> > - Compile FFmpeg with --enable-librabbitmq
> > 
> > To run an example: #
> > # Start the RabbitMQ broker (I use docker)
> > # The following starts the broker on localhost:5672. A webui is available on
> > # localhost:15672 (User/password is "guest" by default)
> > #
> > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> > 
> > #
> > # Stream to the RabbitMQ broker:
> > #
> > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 
> > #
> > # Connect any number of clients to fetch data from the broker:
> > # The clients are filtered by the routing_key and exchange.
> > #
> > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > 
> > 
> > Changelog               |   1 +
> > configure               |   5 +
> > doc/general.texi        |   1 +
> > doc/protocols.texi      |  53 ++++++++
> > libavformat/Makefile    |   1 +
> > libavformat/libamqp.c   | 272 ++++++++++++++++++++++++++++++++++++++++
> > libavformat/protocols.c |   1 +
> > 7 files changed, 334 insertions(+)
> > create mode 100644 libavformat/libamqp.c
> > 
> > diff --git a/Changelog b/Changelog
> > index a4d20a94310..0d2c1dcc2d9 100644
> > --- a/Changelog
> > +++ b/Changelog
> > @@ -33,6 +33,7 @@ version <next>:
> > - Argonaut Games ADPCM decoder
> > - Argonaut Games ASF demuxer
> > - xfade video filter
> > +- AMQP protocol (RabbitMQ)
> > 
> > 
> > version 4.2:
> > diff --git a/configure b/configure
> > index c02dbcc8b23..e421ecb5004 100755
> > --- a/configure
> > +++ b/configure
> > @@ -254,6 +254,7 @@ External library support:
> >   --enable-libopenmpt      enable decoding tracked files via libopenmpt [no]
> >   --enable-libopus         enable Opus de/encoding via libopus [no]
> >   --enable-libpulse        enable Pulseaudio input via libpulse [no]
> > +  --enable-librabbitmq     enable rabbitmq library [no]
> >   --enable-librav1e        enable AV1 encoding via rav1e [no]
> >   --enable-librsvg         enable SVG rasterization via librsvg [no]
> >   --enable-librubberband   enable rubberband needed for rubberband filter [no]
> > @@ -1786,6 +1787,7 @@ EXTERNAL_LIBRARY_LIST="
> >     libopenmpt
> >     libopus
> >     libpulse
> > +    librabbitmq
> >     librav1e
> >     librsvg
> >     librtmp
> > @@ -3430,6 +3432,8 @@ unix_protocol_deps="sys_un_h"
> > unix_protocol_select="network"
> > 
> > # external library protocols
> > +libamqp_protocol_deps="librabbitmq"
> > +libamqp_protocol_select="network"
> > librtmp_protocol_deps="librtmp"
> > librtmpe_protocol_deps="librtmp"
> > librtmps_protocol_deps="librtmp"
> > @@ -6305,6 +6309,7 @@ enabled libopus           && {
> >     }
> > }
> > enabled libpulse          && require_pkg_config libpulse libpulse pulse/pulseaudio.h pa_context_new
> > +enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >= 0.7.1" amqp.h amqp_new_connection
> > enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0" rav1e.h rav1e_context_new
> > enabled librsvg           && require_pkg_config librsvg librsvg-2.0 librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
> > enabled librtmp           && require_pkg_config librtmp librtmp librtmp/rtmp.h RTMP_Socket
> > diff --git a/doc/general.texi b/doc/general.texi
> > index 85db50462c2..4057a07632d 100644
> > --- a/doc/general.texi
> > +++ b/doc/general.texi
> > @@ -1326,6 +1326,7 @@ performance on systems without hardware floating point support).
> > 
> > @multitable @columnfractions .4 .1
> > @item Name         @tab Support
> > +@item AMQP         @tab X
> > @item file         @tab X
> > @item FTP          @tab X
> > @item Gopher       @tab X
> > diff --git a/doc/protocols.texi b/doc/protocols.texi
> > index 5e8c97d1649..3d236291e77 100644
> > --- a/doc/protocols.texi
> > +++ b/doc/protocols.texi
> > @@ -51,6 +51,59 @@ in microseconds.
> > 
> > A description of the currently available protocols follows.
> > 
> > +@section amqp
> > +
> > +Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
> > +publish-subscribe communication protocol.
> > +
> > +FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A separate
> > +AMQP broker must also be run. An example open-source AMQP broker is RabbitMQ.
> > +
> > +When connecting to the broker, a client sets an "exchange" and a "routing key".
> > +These keys are used to filter connections: A streaming client will only receive
> > +the data that matches their "exchange" and "routing key".
> > +
> > +After starting the broker, an FFmpeg client may stream data to the broker using
> > +the command:
> > +
> > +@example
> > +ffmpeg -re -i input -f mpegts amqp://[user:password@@]hostname:port
> > +@end example
> > +
> > +Where hostname and port is the location of the broker. The client may also
> > +set a user/password for authentication. The defaults for both fields are
> > +"guest".
> > +
> > +A separate instance can stream from the broker using the command:
> > +@example
> > +ffplay amqp://[user:password@@]hostname:port
> > +@end example
> > +
> > +The protocol supports the following options:
> > +
> > +@table @option
> > +
> > +@item routing_key
> > +Sets the routing key. The default value is "amqp". Clients can
> > +only stream data which has the same key. Multiple clients may stream data to the
> > +broker with different keys.
> > +
> > +@item exchange
> > +Sets the exchange to use on the broker. The default value is "amqp.direct".  A
> > +broker may have multiple exchanges which are configured on the broker side.
> > +
> > +@item pkt_size
> > +Maximum size of each packet sent/received to the broker. Default is 131072.
> > +Minimum is 4096 and max is any large value (representable by an int). When
> > +receiving packets, this sets an internal buffer size in FFmpeg. It should be
> > +equal to or greater than the size of the sent packets to the broker. Otherwise
> > +the received message may be truncated causing decoding errors.
> > +
> > +@item connection_timeout
> > +The timeout in milliseconds during the initial connection to the broker.
> > +
> > +@end table
> > +
> > @section async
> > 
> > Asynchronous data filling wrapper for input stream.
> > diff --git a/libavformat/Makefile b/libavformat/Makefile
> > index ba6ea8c4a62..8889f60cb92 100644
> > --- a/libavformat/Makefile
> > +++ b/libavformat/Makefile
> > @@ -627,6 +627,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
> > OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
> > 
> > # external library protocols
> > +OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
> > OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
> > OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
> > OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
> > diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
> > new file mode 100644
> > index 00000000000..2b45fdaf193
> > --- /dev/null
> > +++ b/libavformat/libamqp.c
> > @@ -0,0 +1,272 @@
> > +/*
> > + * AMQP Protocol
> > + * Copyright (c) 2020 Andriy Gelman
> > + *
> > + * This file is part of FFmpeg.
> > + *
> > + * FFmpeg is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU Lesser General Public
> > + * License as published by the Free Software Foundation; either
> > + * version 2.1 of the License, or (at your option) any later version.
> > + *
> > + * FFmpeg is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > + * Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public
> > + * License along with FFmpeg; if not, write to the Free Software
> > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > + */
> > +
> > +#include <amqp.h>
> > +#include <amqp_tcp_socket.h>
> > +#include <sys/time.h>
> > +#include "avformat.h"
> > +#include "libavutil/avstring.h"
> > +#include "libavutil/opt.h"
> > +#include "libavutil/time.h"
> > +#include "network.h"
> > +#include "url.h"
> > +
> > +typedef struct AMQPContext {
> > +    const AVClass *class;
> > +    amqp_connection_state_t conn;
> > +    amqp_socket_t *socket;
> > +    const char *routing_key;
> > +    const char *exchange;
> > +    int pkt_size;
> > +    int connection_timeout;
> 
> int64_t
> 
> > +    int pkt_size_overflow;
> > +} AMQPContext;
> > +
> > +#define ARRAY_LEN           1024
> 
> I'd rather call this STR_LEN or something similar, as this is always used as
> a char array...
> 
> > +#define DEFAULT_CHANNEL     1
> > +
> > +#define OFFSET(x) offsetof(AMQPContext, x)
> > +#define D AV_OPT_FLAG_DECODING_PARAM
> > +#define E AV_OPT_FLAG_ENCODING_PARAM
> > +static const AVOption options[] = {
> > +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> 
> This is larger than libzmq. I think for the sake of consistency either this
> should be reduced or libzmq pkt_size should be increased in a spearate patch
> so in the end both messaging protocols have the same default.
> 
> > +    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> > +    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> > +    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
> 
> AV_OPT_TYPE_DURATION
> 
> > +    { NULL }
> > +};
> > +
> > +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> > +{
> > +    int ret, server_msg;
> > +    char hostname[ARRAY_LEN], credentials[ARRAY_LEN];
> > +    int port;
> > +    const char *user, *password;
> > +    char *end;
> > +    struct timeval tval = { 0 };
> 
> Maybe move the timeval declaration to the connection_timeout block.
> 
> > +
> > +    amqp_rpc_reply_t broker_reply;
> > +
> > +    AMQPContext *s = h->priv_data;
> > +
> > +    h->is_streamed     = 1;
> > +    h->max_packet_size = s->pkt_size;
> > +
> > +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> > +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
> 
> if (port < 0)
>     port = 5672;
> 
> > +
> > +    if (hostname[0] == '\0' || port < 0 || port > 65535 ) {
> 
> port <= 0
> 
> > +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> > +        return AVERROR(EIO);
> 
> AVERROR(EINVAL)
> 
> > +    }
> > +
> > +    user = av_strtok(credentials, ":", &end);
> > +    if (!user)
> > +        user = "guest";
> > +
> > +    password = av_strtok(NULL, ":", &end);
> > +    if (!password)
> > +        password = "guest";
> 
> I don't think strtok is really fit for this splitting, it splits leading
> (and trailing) separators, etc. I'd vote for a good old strchr here. Also
> consider using urldecode for parsing username and password, as the password
> might contain special characters...
> 
> > +
> > +    s->conn = amqp_new_connection();
> > +    if (!s->conn) {
> > +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> > +        return AVERROR_EXTERNAL;
> > +    }
> > +
> > +    s->socket = amqp_tcp_socket_new(s->conn);
> > +    if (!s->socket) {
> > +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> > +        goto destroy_connection;
> > +    }
> > +
> 
> For the initial connection maybe a 5 second default timeout is better then
> infinity. At least the TCP protocol seems to follow this path. The URL
> context RW timeout might also be considerered, so I suggest something like
> this:
> 
> if (s->connection_timeout < 0)
>     s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
> 
> > +    if (s->connection_timeout > 0) {
> > +        tval.tv_sec  = s->connection_timeout / 1000000;
> > +        tval.tv_usec = s->connection_timeout % 1000000;
> > +        ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> > +    }
> > +    else
> > +        ret = amqp_socket_open_noblock(s->socket, hostname, port, NULL);
> > +
> > +    if (ret) {
> > +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
> > +        goto destroy_connection;
> > +    }
> > +
> > +    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
> > +                              AMQP_SASL_METHOD_PLAIN, user, password);
> > +
> > +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> > +        av_log(h, AV_LOG_ERROR, "Error login\n");
> > +        server_msg = AMQP_ACCESS_REFUSED;
> > +        goto close_connection;
> > +    }
> > +
> > +    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
> > +    broker_reply = amqp_get_rpc_reply(s->conn);
> > +
> > +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> > +        av_log(h, AV_LOG_ERROR, "Error set channel\n");
> > +        server_msg = AMQP_CHANNEL_ERROR;
> > +        goto close_connection;
> > +    }
> > +
> > +    if (h->flags & AVIO_FLAG_READ) {
> > +        amqp_bytes_t queuename;
> > +        char queuename_buff[ARRAY_LEN];
> > +        amqp_queue_declare_ok_t *r;
> > +
> > +        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
> > +                               0, 0, 0, 1, amqp_empty_table);
> > +        broker_reply = amqp_get_rpc_reply(s->conn);
> > +        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> > +            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
> > +            server_msg = AMQP_RESOURCE_ERROR;
> > +            goto close_channel;
> > +        }
> > +
> > +        /* backup queuename */
> 
> /* store queuename */
> 
> > +        queuename.bytes = queuename_buff;
> > +        queuename.len = FFMIN(r->queue.len, ARRAY_LEN);
> > +        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
> > +
> > +        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, amqp_cstring_bytes(s->exchange),
> > +                        amqp_cstring_bytes(s->routing_key), amqp_empty_table);
> > +
> > +        broker_reply = amqp_get_rpc_reply(s->conn);
> > +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> > +            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
> > +            server_msg = AMQP_INTERNAL_ERROR;
> > +            goto close_channel;
> > +        }
> > +
> > +        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
> > +                           0, 1, 0, amqp_empty_table);
> > +
> > +        broker_reply = amqp_get_rpc_reply(s->conn);
> > +        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
> > +            av_log(h, AV_LOG_ERROR, "Set consume error\n");
> > +            server_msg = AMQP_INTERNAL_ERROR;
> > +            goto close_channel;
> > +        }
> > +    }
> > +
> > +    return 0;
> > +
> > +close_channel:
> > +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
> > +close_connection:
> > +    amqp_connection_close(s->conn, server_msg);
> > +destroy_connection:
> > +    amqp_destroy_connection(s->conn);
> > +
> > +    return AVERROR_EXTERNAL;
> > +}
> > +
> > +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
> > +{
> > +    int ret;
> > +    AMQPContext *s = h->priv_data;
> > +    int fd = amqp_socket_get_sockfd(s->socket);
> > +
> > +    amqp_bytes_t message = { size, (void *)buf };
> > +    amqp_basic_properties_t props;
> > +
> > +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
> > +    if (ret)
> > +        return ret;
> > +
> > +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
> > +    props.content_type = amqp_cstring_bytes("octet/stream");
> > +    props.delivery_mode = 2; /* persistent delivery mode */
> > +
> > +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
> > +                             amqp_cstring_bytes(s->routing_key), 0, 0,
> > +                             &props, message);
> > +
> > +    if (ret) {
> > +        av_log(h, AV_LOG_ERROR, "Error publish\n");
> > +        return AVERROR_EXTERNAL;
> > +    }
> > +
> > +    return size;
> > +}
> > +
> > +static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
> > +{
> > +    AMQPContext *s = h->priv_data;
> > +    int fd = amqp_socket_get_sockfd(s->socket);
> > +    int ret;
> > +
> > +    amqp_rpc_reply_t broker_reply;
> > +    amqp_envelope_t envelope;
> > +
> > +    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback);
> > +    if (ret)
> > +        return ret;
> > +
> > +    amqp_maybe_release_buffers(s->conn);
> > +    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
> > +
> > +    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
> > +        return AVERROR_EXTERNAL;
> > +
> > +    if (envelope.message.body.len > size) {
> > +        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
> > +        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
> > +                                  "Message will be truncated. Setting -pkt_size %d "
> > +                                  "may resolve the issue.\n", s->pkt_size_overflow);
> > +        envelope.message.body.len = size;
> > +    }
> > +
> > +    memcpy(buf, envelope.message.body.bytes, envelope.message.body.len);
> > +    amqp_destroy_envelope(&envelope);
> > +
> > +    return envelope.message.body.len;
> 
> Instead of overriding envelope.message.body.len maybe simply do a
> size = FFMIN(size, envelope.message.body.len);
> and then use size everywhere. It is a bit ugly to overwrite fields in the
> evenlope struct...
> 
> > +}
> > +
> > +static int amqp_proto_close(URLContext *h)
> > +{
> > +    AMQPContext *s = h->priv_data;
> > +    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
> > +    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
> > +    amqp_destroy_connection(s->conn);
> > +
> > +    return 0;
> > +}
> > +
> > +static const AVClass amqp_context_class = {
> > +    .class_name = "amqp",
> > +    .item_name  = av_default_item_name,
> > +    .option     = options,
> > +    .version    = LIBAVUTIL_VERSION_INT,
> > +};
> > +
> > +const URLProtocol ff_libamqp_protocol = {
> > +    .name            = "amqp",
> > +    .url_close       = amqp_proto_close,
> > +    .url_open        = amqp_proto_open,
> > +    .url_read        = amqp_proto_read,
> > +    .url_write       = amqp_proto_write,
> > +    .priv_data_size  = sizeof(AMQPContext),
> > +    .priv_data_class = &amqp_context_class,
> > +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> > +};
> > diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> > index 29fb99e7fa3..f1b8eab0fd6 100644
> > --- a/libavformat/protocols.c
> > +++ b/libavformat/protocols.c
> > @@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol;
> > extern const URLProtocol ff_udp_protocol;
> > extern const URLProtocol ff_udplite_protocol;
> > extern const URLProtocol ff_unix_protocol;
> > +extern const URLProtocol ff_libamqp_protocol;
> > extern const URLProtocol ff_librtmp_protocol;
> > extern const URLProtocol ff_librtmpe_protocol;
> > extern const URLProtocol ff_librtmps_protocol;
> > --
> 
> Regards,
> Marton

Thanks for reviewing. I agree with your points. 

I'll send a v2 with the metrics in case someone wants to use the patch in the
future.
Marton Balint Feb. 10, 2020, 12:47 a.m. UTC | #7
On Sun, 9 Feb 2020, Andriy Gelman wrote:

> On Sun, 09. Feb 22:58, Jean-Baptiste Kempf wrote:
>> 
>> 
>> On Sun, Feb 9, 2020, at 14:32, Timo Rothenpieler wrote:
>> > On 01.02.2020 20:02, Andriy Gelman wrote:
>> > > From: Andriy Gelman <andriy.gelman@gmail.com>
>> > > 
>> > > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
>> > > broker can redistribute content to other clients based on "exchange" and
>> > > "routing_key" fields.
>> > > ---
>> > > 
>> > > Compilation notes:
>> > > - Requires librabbitmq-dev package (on ubuntu).
>> > > - The pkg-config libprabbitmq.pc has a corrupt entry.
>> > >    The line "Libs.private: rt; -lpthread" should be changed to
>> > >    "Libs.private: -lrt -lpthread". I have made a bug report.
>> > > - Compile FFmpeg with --enable-librabbitmq
>> > > 
>> > > To run an example:
>> > > #
>> > > # Start the RabbitMQ broker (I use docker)
>> > > # The following starts the broker on localhost:5672. A webui is available on
>> > > # localhost:15672 (User/password is "guest" by default)
>> > > #
>> > > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
>> > > 
>> > > #
>> > > # Stream to the RabbitMQ broker:
>> > > #
>> > > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
>> > > 
>> > > #
>> > > # Connect any number of clients to fetch data from the broker:
>> > > # The clients are filtered by the routing_key and exchange.
>> > > #
>> > > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
>> > > 
>> > 
>> > Isn't RabbitMQ, and any message broker like it, more designed to pipe 
>> > short messages around?
>> > I'd imagine it's really not designed to shovel huge amounts of data, 
>> > like a full on video stream, around.
>
>> 
>> Tbh, it's not a standard protocol for multimedia. It's not a format either.
>> 
>
> Sure, I understand it's quite niche. 
>
>> I have a hard time seeing what this is doing in libavformat.
>>

Just because it is not common, I don't think it is stupid to use message 
brokers for distributing video/audio content. Message brokers are general 
purpose.

If you want to publish images every second or so with low latency then 
using message broker seems a lot cleaner then long-polling/websocket and 
HTTP publish.

Regards,
Marton
Jean-Baptiste Kempf Feb. 10, 2020, 5:43 a.m. UTC | #8
On Mon, Feb 10, 2020, at 01:47, Marton Balint wrote:
> 
> On Sun, 9 Feb 2020, Andriy Gelman wrote:
> 
> > On Sun, 09. Feb 22:58, Jean-Baptiste Kempf wrote:
> >> 
> >> 
> >> On Sun, Feb 9, 2020, at 14:32, Timo Rothenpieler wrote:
> >> > On 01.02.2020 20:02, Andriy Gelman wrote:
> >> > > From: Andriy Gelman <andriy.gelman@gmail.com>
> >> > > 
> >> > > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> >> > > broker can redistribute content to other clients based on "exchange" and
> >> > > "routing_key" fields.
> >> > > ---
> >> > > 
> >> > > Compilation notes:
> >> > > - Requires librabbitmq-dev package (on ubuntu).
> >> > > - The pkg-config libprabbitmq.pc has a corrupt entry.
> >> > >    The line "Libs.private: rt; -lpthread" should be changed to
> >> > >    "Libs.private: -lrt -lpthread". I have made a bug report.
> >> > > - Compile FFmpeg with --enable-librabbitmq
> >> > > 
> >> > > To run an example:
> >> > > #
> >> > > # Start the RabbitMQ broker (I use docker)
> >> > > # The following starts the broker on localhost:5672. A webui is available on
> >> > > # localhost:15672 (User/password is "guest" by default)
> >> > > #
> >> > > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> >> > > 
> >> > > #
> >> > > # Stream to the RabbitMQ broker:
> >> > > #
> >> > > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> >> > > 
> >> > > #
> >> > > # Connect any number of clients to fetch data from the broker:
> >> > > # The clients are filtered by the routing_key and exchange.
> >> > > #
> >> > > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> >> > > 
> >> > 
> >> > Isn't RabbitMQ, and any message broker like it, more designed to pipe 
> >> > short messages around?
> >> > I'd imagine it's really not designed to shovel huge amounts of data, 
> >> > like a full on video stream, around.
> >
> >> 
> >> Tbh, it's not a standard protocol for multimedia. It's not a format either.
> >> 
> >
> > Sure, I understand it's quite niche. 
> >
> >> I have a hard time seeing what this is doing in libavformat.
> >>
> 
> Just because it is not common, I don't think it is stupid to use message 
> brokers for distributing video/audio content. Message brokers are general 
> purpose.

Oh, I very well understand the reason, and nowhere did I say it was stupid.
First, because it is not stupid, and then, because that would be a blatant violation of the conducts I'd like to see in this project.

I just am not sure the place to write this code is in libavformat and not in either the tools, or in the client app.

And again, that's just my opinion.
Andriy Gelman Feb. 23, 2020, 3:58 p.m. UTC | #9
On Sun, 09. Feb 17:30, Andriy Gelman wrote:
> On Sun, 09. Feb 14:32, Timo Rothenpieler wrote:
> > On 01.02.2020 20:02, Andriy Gelman wrote:
> > > From: Andriy Gelman <andriy.gelman@gmail.com>
> > > 
> > > Supports connecting to a RabbitMQ broker via AMQP version 0-9-1. The
> > > broker can redistribute content to other clients based on "exchange" and
> > > "routing_key" fields.
> > > ---
> > > 
> > > Compilation notes:
> > > - Requires librabbitmq-dev package (on ubuntu).
> > > - The pkg-config libprabbitmq.pc has a corrupt entry.
> > >    The line "Libs.private: rt; -lpthread" should be changed to
> > >    "Libs.private: -lrt -lpthread". I have made a bug report.
> > > - Compile FFmpeg with --enable-librabbitmq
> > > 
> > > To run an example:
> > > #
> > > # Start the RabbitMQ broker (I use docker)
> > > # The following starts the broker on localhost:5672. A webui is available on
> > > # localhost:15672 (User/password is "guest" by default)
> > > #
> > > $ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management
> > > 
> > > #
> > > # Stream to the RabbitMQ broker:
> > > #
> > > $ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > > 
> > > #
> > > # Connect any number of clients to fetch data from the broker:
> > > # The clients are filtered by the routing_key and exchange.
> > > #
> > > $ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost:5672
> > > 
> 
> > 
> > Isn't RabbitMQ, and any message broker like it, more designed to pipe short
> > messages around?
> > I'd imagine it's really not designed to shovel huge amounts of data, like a
> > full on video stream, around.
> 
> I didn't have any problems streaming to/from the broker, but I have to evaluate
> at which point it breaks. I'll share these numbers with ffmpeg-devel.
> 

I did a test on my intel i7-6600U CPU @ 2.6GHz with 16GB ram   

- One RabbitMQ broker started with: 
$ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 rabbitmq:3-management

- 1 client streaming to the broker (1280x720, 30fps, bitrate: 1236kb/s) 
$ ./ffmpeg -re input.mp4 -codec:v copy -an -f mpegts amqp://localhost

- 300 clients connecting to the broker and writing the output to null:
$ ./ffmpeg -i amqp://localhost -codec:v copy -f null -

- 2 clients playing the stream with ffplay
$ ./ffplay -i amqp://localhost

All the connections were visible on the broker monitor, and streams were running
at speed ~1.0x
At this point all my cores were close to 100%.
diff mbox series

Patch

diff --git a/Changelog b/Changelog
index a4d20a94310..0d2c1dcc2d9 100644
--- a/Changelog
+++ b/Changelog
@@ -33,6 +33,7 @@  version <next>:
 - Argonaut Games ADPCM decoder
 - Argonaut Games ASF demuxer
 - xfade video filter
+- AMQP protocol (RabbitMQ)
 
 
 version 4.2:
diff --git a/configure b/configure
index c02dbcc8b23..e421ecb5004 100755
--- a/configure
+++ b/configure
@@ -254,6 +254,7 @@  External library support:
   --enable-libopenmpt      enable decoding tracked files via libopenmpt [no]
   --enable-libopus         enable Opus de/encoding via libopus [no]
   --enable-libpulse        enable Pulseaudio input via libpulse [no]
+  --enable-librabbitmq     enable rabbitmq library [no]
   --enable-librav1e        enable AV1 encoding via rav1e [no]
   --enable-librsvg         enable SVG rasterization via librsvg [no]
   --enable-librubberband   enable rubberband needed for rubberband filter [no]
@@ -1786,6 +1787,7 @@  EXTERNAL_LIBRARY_LIST="
     libopenmpt
     libopus
     libpulse
+    librabbitmq
     librav1e
     librsvg
     librtmp
@@ -3430,6 +3432,8 @@  unix_protocol_deps="sys_un_h"
 unix_protocol_select="network"
 
 # external library protocols
+libamqp_protocol_deps="librabbitmq"
+libamqp_protocol_select="network"
 librtmp_protocol_deps="librtmp"
 librtmpe_protocol_deps="librtmp"
 librtmps_protocol_deps="librtmp"
@@ -6305,6 +6309,7 @@  enabled libopus           && {
     }
 }
 enabled libpulse          && require_pkg_config libpulse libpulse pulse/pulseaudio.h pa_context_new
+enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >= 0.7.1" amqp.h amqp_new_connection
 enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0" rav1e.h rav1e_context_new
 enabled librsvg           && require_pkg_config librsvg librsvg-2.0 librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
 enabled librtmp           && require_pkg_config librtmp librtmp librtmp/rtmp.h RTMP_Socket
diff --git a/doc/general.texi b/doc/general.texi
index 85db50462c2..4057a07632d 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1326,6 +1326,7 @@  performance on systems without hardware floating point support).
 
 @multitable @columnfractions .4 .1
 @item Name         @tab Support
+@item AMQP         @tab X
 @item file         @tab X
 @item FTP          @tab X
 @item Gopher       @tab X
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 5e8c97d1649..3d236291e77 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -51,6 +51,59 @@  in microseconds.
 
 A description of the currently available protocols follows.
 
+@section amqp
+
+Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
+publish-subscribe communication protocol.
+
+FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A separate
+AMQP broker must also be run. An example open-source AMQP broker is RabbitMQ.
+
+When connecting to the broker, a client sets an "exchange" and a "routing key".
+These keys are used to filter connections: A streaming client will only receive
+the data that matches their "exchange" and "routing key".
+
+After starting the broker, an FFmpeg client may stream data to the broker using
+the command:
+
+@example
+ffmpeg -re -i input -f mpegts amqp://[user:password@@]hostname:port
+@end example
+
+Where hostname and port is the location of the broker. The client may also
+set a user/password for authentication. The defaults for both fields are
+"guest".
+
+A separate instance can stream from the broker using the command:
+@example
+ffplay amqp://[user:password@@]hostname:port
+@end example
+
+The protocol supports the following options:
+
+@table @option
+
+@item routing_key
+Sets the routing key. The default value is "amqp". Clients can
+only stream data which has the same key. Multiple clients may stream data to the
+broker with different keys.
+
+@item exchange
+Sets the exchange to use on the broker. The default value is "amqp.direct".  A
+broker may have multiple exchanges which are configured on the broker side.
+
+@item pkt_size
+Maximum size of each packet sent/received to the broker. Default is 131072.
+Minimum is 4096 and max is any large value (representable by an int). When
+receiving packets, this sets an internal buffer size in FFmpeg. It should be
+equal to or greater than the size of the sent packets to the broker. Otherwise
+the received message may be truncated causing decoding errors.
+
+@item connection_timeout
+The timeout in milliseconds during the initial connection to the broker.
+
+@end table
+
 @section async
 
 Asynchronous data filling wrapper for input stream.
diff --git a/libavformat/Makefile b/libavformat/Makefile
index ba6ea8c4a62..8889f60cb92 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -627,6 +627,7 @@  OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
 OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
 
 # external library protocols
+OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
 OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
 OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
 OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
new file mode 100644
index 00000000000..2b45fdaf193
--- /dev/null
+++ b/libavformat/libamqp.c
@@ -0,0 +1,272 @@ 
+/*
+ * AMQP Protocol
+ * Copyright (c) 2020 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <sys/time.h>
+#include "avformat.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "network.h"
+#include "url.h"
+
+typedef struct AMQPContext {
+    const AVClass *class;
+    amqp_connection_state_t conn;
+    amqp_socket_t *socket;
+    const char *routing_key;
+    const char *exchange;
+    int pkt_size;
+    int connection_timeout;
+    int pkt_size_overflow;
+} AMQPContext;
+
+#define ARRAY_LEN           1024
+#define DEFAULT_CHANNEL     1
+
+#define OFFSET(x) offsetof(AMQPContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+static const AVOption options[] = {
+    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
+    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
+    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
+    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
+    { NULL }
+};
+
+static int amqp_proto_open(URLContext *h, const char *uri, int flags)
+{
+    int ret, server_msg;
+    char hostname[ARRAY_LEN], credentials[ARRAY_LEN];
+    int port;
+    const char *user, *password;
+    char *end;
+    struct timeval tval = { 0 };
+
+    amqp_rpc_reply_t broker_reply;
+
+    AMQPContext *s = h->priv_data;
+
+    h->is_streamed     = 1;
+    h->max_packet_size = s->pkt_size;
+
+    av_url_split(NULL, 0, credentials, sizeof(credentials),
+                 hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    if (hostname[0] == '\0' || port < 0 || port > 65535 ) {
+        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
+        return AVERROR(EIO);
+    }
+
+    user = av_strtok(credentials, ":", &end);
+    if (!user)
+        user = "guest";
+
+    password = av_strtok(NULL, ":", &end);
+    if (!password)
+        password = "guest";
+
+    s->conn = amqp_new_connection();
+    if (!s->conn) {
+        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    s->socket = amqp_tcp_socket_new(s->conn);
+    if (!s->socket) {
+        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
+        goto destroy_connection;
+    }
+
+    if (s->connection_timeout > 0) {
+        tval.tv_sec  = s->connection_timeout / 1000000;
+        tval.tv_usec = s->connection_timeout % 1000000;
+        ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
+    }
+    else
+        ret = amqp_socket_open_noblock(s->socket, hostname, port, NULL);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
+        goto destroy_connection;
+    }
+
+    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
+                              AMQP_SASL_METHOD_PLAIN, user, password);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error login\n");
+        server_msg = AMQP_ACCESS_REFUSED;
+        goto close_connection;
+    }
+
+    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
+    broker_reply = amqp_get_rpc_reply(s->conn);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error set channel\n");
+        server_msg = AMQP_CHANNEL_ERROR;
+        goto close_connection;
+    }
+
+    if (h->flags & AVIO_FLAG_READ) {
+        amqp_bytes_t queuename;
+        char queuename_buff[ARRAY_LEN];
+        amqp_queue_declare_ok_t *r;
+
+        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
+                               0, 0, 0, 1, amqp_empty_table);
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
+            server_msg = AMQP_RESOURCE_ERROR;
+            goto close_channel;
+        }
+
+        /* backup queuename */
+        queuename.bytes = queuename_buff;
+        queuename.len = FFMIN(r->queue.len, ARRAY_LEN);
+        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
+
+        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, amqp_cstring_bytes(s->exchange),
+                        amqp_cstring_bytes(s->routing_key), amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+
+        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
+                           0, 1, 0, amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Set consume error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+    }
+
+    return 0;
+
+close_channel:
+    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
+close_connection:
+    amqp_connection_close(s->conn, server_msg);
+destroy_connection:
+    amqp_destroy_connection(s->conn);
+
+    return AVERROR_EXTERNAL;
+}
+
+static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    AMQPContext *s = h->priv_data;
+    int fd = amqp_socket_get_sockfd(s->socket);
+
+    amqp_bytes_t message = { size, (void *)buf };
+    amqp_basic_properties_t props;
+
+    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
+    if (ret)
+        return ret;
+
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+    props.content_type = amqp_cstring_bytes("octet/stream");
+    props.delivery_mode = 2; /* persistent delivery mode */
+
+    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
+                             amqp_cstring_bytes(s->routing_key), 0, 0,
+                             &props, message);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error publish\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    return size;
+}
+
+static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
+{
+    AMQPContext *s = h->priv_data;
+    int fd = amqp_socket_get_sockfd(s->socket);
+    int ret;
+
+    amqp_rpc_reply_t broker_reply;
+    amqp_envelope_t envelope;
+
+    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback);
+    if (ret)
+        return ret;
+
+    amqp_maybe_release_buffers(s->conn);
+    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
+        return AVERROR_EXTERNAL;
+
+    if (envelope.message.body.len > size) {
+        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
+        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
+                                  "Message will be truncated. Setting -pkt_size %d "
+                                  "may resolve the issue.\n", s->pkt_size_overflow);
+        envelope.message.body.len = size;
+    }
+
+    memcpy(buf, envelope.message.body.bytes, envelope.message.body.len);
+    amqp_destroy_envelope(&envelope);
+
+    return envelope.message.body.len;
+}
+
+static int amqp_proto_close(URLContext *h)
+{
+    AMQPContext *s = h->priv_data;
+    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
+    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
+    amqp_destroy_connection(s->conn);
+
+    return 0;
+}
+
+static const AVClass amqp_context_class = {
+    .class_name = "amqp",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libamqp_protocol = {
+    .name            = "amqp",
+    .url_close       = amqp_proto_close,
+    .url_open        = amqp_proto_open,
+    .url_read        = amqp_proto_read,
+    .url_write       = amqp_proto_write,
+    .priv_data_size  = sizeof(AMQPContext),
+    .priv_data_class = &amqp_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index 29fb99e7fa3..f1b8eab0fd6 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -60,6 +60,7 @@  extern const URLProtocol ff_tls_protocol;
 extern const URLProtocol ff_udp_protocol;
 extern const URLProtocol ff_udplite_protocol;
 extern const URLProtocol ff_unix_protocol;
+extern const URLProtocol ff_libamqp_protocol;
 extern const URLProtocol ff_librtmp_protocol;
 extern const URLProtocol ff_librtmpe_protocol;
 extern const URLProtocol ff_librtmps_protocol;