diff mbox

[FFmpeg-devel,2/4] libavformat/tcp: Added an option to reuse sockets

Message ID 1509697623-2745-2-git-send-email-kjeyapal@akamai.com
State New
Headers show

Commit Message

Jeyapal, Karthick Nov. 3, 2017, 8:27 a.m. UTC
---
 doc/protocols.texi  |   4 ++
 libavformat/tcp.c   | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/tcp.h   |  27 ++++++++++
 libavformat/utils.c |   2 +
 4 files changed, 183 insertions(+)
 create mode 100644 libavformat/tcp.h

Comments

Hendrik Leppkes Nov. 3, 2017, 8:38 a.m. UTC | #1
On Fri, Nov 3, 2017 at 9:27 AM, Karthick J <kjeyapal@akamai.com> wrote:
> ---
>  doc/protocols.texi  |   4 ++
>  libavformat/tcp.c   | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  libavformat/tcp.h   |  27 ++++++++++
>  libavformat/utils.c |   2 +
>  4 files changed, 183 insertions(+)
>  create mode 100644 libavformat/tcp.h
>
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index a7968ff..62d317d 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -1242,6 +1242,10 @@ Set receive buffer size, expressed bytes.
>
>  @item send_buffer_size=@var{bytes}
>  Set send buffer size, expressed bytes.
> +
> +@item reuse_sockets=@var{1|0}
> +Reuse sockets instead of opening a new socket each time.
> +Default value is 0.
>  @end table
>
>  The following example shows how to setup a listening TCP connection
> diff --git a/libavformat/tcp.c b/libavformat/tcp.c
> index 06368ff..8bca628 100644
> --- a/libavformat/tcp.c
> +++ b/libavformat/tcp.c
> @@ -1,6 +1,7 @@
>  /*
>   * TCP protocol
>   * Copyright (c) 2002 Fabrice Bellard
> + * Copyright (c) 2017 Akamai Technologies, Inc
>   *
>   * This file is part of FFmpeg.
>   *
> @@ -19,6 +20,8 @@
>   * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>   */
>  #include "avformat.h"
> +#include "tcp.h"
> +#include "libavcodec/internal.h"
>  #include "libavutil/avassert.h"
>  #include "libavutil/parseutils.h"
>  #include "libavutil/opt.h"
> @@ -38,6 +41,7 @@ typedef struct TCPOptions {
>      int listen_timeout;
>      int recv_buffer_size;
>      int send_buffer_size;
> +    int reuse_sockets;
>  } TCPOptions;
>
>  typedef struct TCPContext {
> @@ -47,6 +51,16 @@ typedef struct TCPContext {
>      TCPOptions options;
>  } TCPContext;
>
> +typedef struct TCPSocket {
> +    char *hostname;
> +    int  port;
> +    int in_use;
> +    int64_t last_close_time;
> +    int  fd;
> +    TCPOptions options;
> +    struct TCPSocket *next;
> +} TCPSocket;
> +
>  #define OFFSET(x) (offsetof(TCPContext, options) + offsetof(TCPOptions, x))
>  #define D AV_OPT_FLAG_DECODING_PARAM
>  #define E AV_OPT_FLAG_ENCODING_PARAM
> @@ -56,6 +70,7 @@ static const AVOption options[] = {
>      { "listen_timeout",  "Connection awaiting timeout (in milliseconds)",      OFFSET(listen_timeout), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
>      { "send_buffer_size", "Socket send buffer size (in bytes)",                OFFSET(send_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
>      { "recv_buffer_size", "Socket receive buffer size (in bytes)",             OFFSET(recv_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
> +    { "reuse_sockets", "Reuse sockets instead of opening a new socket each time", OFFSET(reuse_sockets), AV_OPT_TYPE_BOOL, { .i64 = 0 },         0, 1, .flags = D|E },
>      { NULL }
>  };
>
> @@ -66,6 +81,116 @@ static const AVClass tcp_class = {
>      .version    = LIBAVUTIL_VERSION_INT,
>  };
>
> +static TCPSocket *first_socket = NULL;

We're not very fond of global state, especially in new code. A better
way would be to have a socket pool structure that a caller can manage,
instead of having it globally.
(Additionally, we're also trying to really get rid of the global
avformat lock, which this is using).
Nicolas George Nov. 3, 2017, 8:48 a.m. UTC | #2
Le tridi 13 brumaire, an CCXXVI, Karthick J a écrit :
> +static TCPSocket *first_socket = NULL;

<snip>

> +    avpriv_lock_avformat();
> +    socket->next = first_socket;
> +    first_socket = socket;
> +    avpriv_unlock_avformat();

I strongly oppose this: we do not want new global state.

Also, I am very doubtful about the feature itself. It is public, and as
such should be usable for many uses, but I do not see it useful for
anything but patches 3 and 4. Furthermore, it is a break of abstraction,
sockets lose their simple and clear API, they are not really closed when
they are closed, etc.

If you are trying to implement keepalive HTTP for HLS, then most of the
code should be in http.c and hls*.c, not tcp.c.

Regards,
Jeyapal, Karthick Nov. 3, 2017, 11:10 a.m. UTC | #3
>On 11/3/17, 2:18 PM, "Nicolas George" <george@nsup.org> wrote:


>I strongly oppose this: we do not want new global state.

I also agree. I am open to suggestions, and I would be happy re-implement this feature without new global state.

>Also, I am very doubtful about the feature itself. It is public, and as

>such should be usable for many uses, but I do not see it useful for

>anything but patches 3 and 4. 

Well beyond patch 3 and 4, this feature is usable for HLS player, DASH player and DASH encoder. 
Any other segmented streaming standard would also benefit from this feature.
With internet streaming having moved towards segmented streaming standards, I would think this feature is very useful.
Right now, the performance of hlsenc, dashenc etc., suffers terribly for http output with lower segment sizes.
Also, this feature could be extended for HTTPS/TLS connections as well.

>Furthermore, it is a break of abstraction,

>sockets lose their simple and clear API, they are not really closed when

>they are closed, etc.

Technically it doesn’t break any abstraction. Internally reusing socket connections,
could be thought of as an implementation level detail to improve performance. 
For example, android HTTP APIs reuse the sockets internally to improve performance
without affecting the abstraction.
https://developer.android.com/reference/java/net/HttpURLConnection.html
 
>If you are trying to implement keepalive HTTP for HLS, then most of the

>code should be in http.c and hls*.c, not tcp.c.

That’s a very good point. I had also thought in similar lines. Here is an 
approach that I could think of. 
- Open just one http connection from hlsenc.c
- Somehow call http_shutdown instead of http_close during end of segments/file
- For opening a new segment/file hlsenc calls http_write (with relevant http headers 
composed by hlsenc) instead of a fresh http_open
If ffmpeg maintainers agree to the above approach, I am willing to implement it 
that way. 

Or if anybody proposes a better approach, I am willing to consider that as well.

>Regards,

>

>-- 

>  Nicolas George
Aman Karmani Nov. 3, 2017, 4:06 p.m. UTC | #4
On Fri, Nov 3, 2017 at 4:10 AM, Jeyapal, Karthick <kjeyapal@akamai.com>
wrote:

> >On 11/3/17, 2:18 PM, "Nicolas George" <george@nsup.org> wrote:
>
> >I strongly oppose this: we do not want new global state.
> I also agree. I am open to suggestions, and I would be happy re-implement
> this feature without new global state.
>
> >Also, I am very doubtful about the feature itself. It is public, and as
> >such should be usable for many uses, but I do not see it useful for
> >anything but patches 3 and 4.
> Well beyond patch 3 and 4, this feature is usable for HLS player, DASH
> player and DASH encoder.
> Any other segmented streaming standard would also benefit from this
> feature.
> With internet streaming having moved towards segmented streaming
> standards, I would think this feature is very useful.
> Right now, the performance of hlsenc, dashenc etc., suffers terribly for
> http output with lower segment sizes.
> Also, this feature could be extended for HTTPS/TLS connections as well.
>
> >Furthermore, it is a break of abstraction,
> >sockets lose their simple and clear API, they are not really closed when
> >they are closed, etc.
> Technically it doesn’t break any abstraction. Internally reusing socket
> connections,
> could be thought of as an implementation level detail to improve
> performance.
> For example, android HTTP APIs reuse the sockets internally to improve
> performance
> without affecting the abstraction.
> https://developer.android.com/reference/java/net/HttpURLConnection.html
>
> >If you are trying to implement keepalive HTTP for HLS, then most of the
> >code should be in http.c and hls*.c, not tcp.c.
> That’s a very good point. I had also thought in similar lines. Here is an
> approach that I could think of.
> - Open just one http connection from hlsenc.c
> - Somehow call http_shutdown instead of http_close during end of
> segments/file
> - For opening a new segment/file hlsenc calls http_write (with relevant
> http headers
> composed by hlsenc) instead of a fresh http_open
> If ffmpeg maintainers agree to the above approach, I am willing to
> implement it
> that way.
>


I recently implemented keepalive support in the hls demuxer, and have been
using it with a lot of success. See my patchset:
http://ffmpeg.org/pipermail/ffmpeg-devel/2017-October/217353.html

Aman


>
> Or if anybody proposes a better approach, I am willing to consider that as
> well.
>
> >Regards,
> >
> >--
> >  Nicolas George
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
Jeyapal, Karthick Nov. 7, 2017, 5:14 a.m. UTC | #5
>On 11/3/17, 9:44 PM, "Aman Gupta" <ffmpeg@tmm1.net> wrote:

>

>I recently implemented keepalive support in the hls demuxer, and have been

>using it with a lot of success. See my patchset:

>http://ffmpeg.org/pipermail/ffmpeg-devel/2017-October/217353.html

>

>Aman


Thanks a lot for your excellent suggestion. 
I was not aware of the global function ff_http_do_new_request.
Now I will attempt to re-implement this feature along similar lines of your patch.

Regards,
Karthick
Jeyapal, Karthick Nov. 7, 2017, 10:40 a.m. UTC | #6
>On 11/7/17, 10:45 AM, "Jeyapal, Karthick" <kjeyapal@akamai.com> wrote:

>

>>On 11/3/17, 9:44 PM, "Aman Gupta" <ffmpeg@tmm1.net> wrote:

>>

>>I recently implemented keepalive support in the hls demuxer, and have been

>>using it with a lot of success. See my patchset:

>>http://ffmpeg.org/pipermail/ffmpeg-devel/2017-October/217353.html

>>

>>Aman

>

>Thanks a lot for your excellent suggestion. 

>I was not aware of the global function ff_http_do_new_request.

>Now I will attempt to re-implement this feature along similar lines of your patch.

>

>Regards,

>Karthick 


Well, usage of global function ff_http_do_new_request made it much simpler.
Have submitted a completely new patch series for the same feature.
http://ffmpeg.org/pipermail/ffmpeg-devel/2017-November/219301.html
Hope the new approach is acceptable :)

Regards,
Karthick
diff mbox

Patch

diff --git a/doc/protocols.texi b/doc/protocols.texi
index a7968ff..62d317d 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1242,6 +1242,10 @@  Set receive buffer size, expressed bytes.
 
 @item send_buffer_size=@var{bytes}
 Set send buffer size, expressed bytes.
+
+@item reuse_sockets=@var{1|0}
+Reuse sockets instead of opening a new socket each time.
+Default value is 0.
 @end table
 
 The following example shows how to setup a listening TCP connection
diff --git a/libavformat/tcp.c b/libavformat/tcp.c
index 06368ff..8bca628 100644
--- a/libavformat/tcp.c
+++ b/libavformat/tcp.c
@@ -1,6 +1,7 @@ 
 /*
  * TCP protocol
  * Copyright (c) 2002 Fabrice Bellard
+ * Copyright (c) 2017 Akamai Technologies, Inc
  *
  * This file is part of FFmpeg.
  *
@@ -19,6 +20,8 @@ 
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 #include "avformat.h"
+#include "tcp.h"
+#include "libavcodec/internal.h"
 #include "libavutil/avassert.h"
 #include "libavutil/parseutils.h"
 #include "libavutil/opt.h"
@@ -38,6 +41,7 @@  typedef struct TCPOptions {
     int listen_timeout;
     int recv_buffer_size;
     int send_buffer_size;
+    int reuse_sockets;
 } TCPOptions;
 
 typedef struct TCPContext {
@@ -47,6 +51,16 @@  typedef struct TCPContext {
     TCPOptions options;
 } TCPContext;
 
+typedef struct TCPSocket {
+    char *hostname;
+    int  port;
+    int in_use;
+    int64_t last_close_time;
+    int  fd;
+    TCPOptions options;
+    struct TCPSocket *next;
+} TCPSocket;
+
 #define OFFSET(x) (offsetof(TCPContext, options) + offsetof(TCPOptions, x))
 #define D AV_OPT_FLAG_DECODING_PARAM
 #define E AV_OPT_FLAG_ENCODING_PARAM
@@ -56,6 +70,7 @@  static const AVOption options[] = {
     { "listen_timeout",  "Connection awaiting timeout (in milliseconds)",      OFFSET(listen_timeout), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
     { "send_buffer_size", "Socket send buffer size (in bytes)",                OFFSET(send_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
     { "recv_buffer_size", "Socket receive buffer size (in bytes)",             OFFSET(recv_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 },         -1, INT_MAX, .flags = D|E },
+    { "reuse_sockets", "Reuse sockets instead of opening a new socket each time", OFFSET(reuse_sockets), AV_OPT_TYPE_BOOL, { .i64 = 0 },         0, 1, .flags = D|E },
     { NULL }
 };
 
@@ -66,6 +81,116 @@  static const AVClass tcp_class = {
     .version    = LIBAVUTIL_VERSION_INT,
 };
 
+static TCPSocket *first_socket = NULL;
+
+static TCPSocket* tcp_socket_create (TCPContext *s, char *name, int port)
+{
+    TCPSocket *p = NULL;
+
+    p = (TCPSocket *) calloc (1, sizeof(TCPSocket));
+    if (p == NULL) {
+        return NULL;
+    }
+
+    p->hostname = strdup(name);
+    p->port = port;
+    p->in_use = 1;
+    p->fd = s->fd;
+    p->options = s->options;
+    p->next = NULL;
+
+    return p;
+}
+
+static int tcp_socket_free (TCPSocket *socket)
+{
+    if (socket) {
+        closesocket(socket->fd);
+        free(socket->hostname);
+        free(socket);
+    }
+    return 0;
+}
+
+static void tcp_list_add(TCPContext *s, char *name, int port)
+{
+    TCPSocket *socket = tcp_socket_create(s, name, port);
+
+    if (!socket)
+        return;
+    avpriv_lock_avformat();
+    socket->next = first_socket;
+    first_socket = socket;
+    avpriv_unlock_avformat();
+
+}
+
+static void tcp_list_remove_next(TCPSocket *socket)
+{
+    TCPSocket *temp;
+    if (socket) {
+        temp = socket->next;
+        socket->next = socket->next->next;
+    } else {
+        temp = first_socket;
+        first_socket = first_socket->next;
+    }
+    tcp_socket_free(temp);
+
+}
+
+static int tcp_socket_find (TCPContext *s, char *name, int port)
+{
+    int     lfd = -1;
+    TCPSocket *p = first_socket;
+    TCPSocket *prev = NULL;
+    int64_t current_time = av_gettime();
+    avpriv_lock_avformat();
+    while(p) {
+        const int idle_timeout = 60 * 1000000;
+        // Remove idle connections
+        if (!p->in_use && (current_time - p->last_close_time) > idle_timeout) {
+            tcp_list_remove_next(prev);
+        } else {
+            // Reuse the connection if a correct match if found
+            if (!p->in_use && lfd == -1 &&
+                !strcmp(p->hostname, name) && (p->port == port) &&
+                !memcmp(&p->options, &s->options, sizeof(s->options))) {
+                p->in_use = 1;
+                lfd = p->fd;
+            }
+            prev = p;
+        }
+        p = p->next;
+
+    }
+    avpriv_unlock_avformat();
+    return lfd;
+}
+
+static void tcp_socket_release (int search_fd)
+{
+    TCPSocket *p = first_socket;
+
+    while(p) {
+        if (p->fd == search_fd) {
+            p->last_close_time = av_gettime();
+            p->in_use = 0;
+            break;
+        }
+        p = p->next;
+    }
+}
+
+void ff_tcp_deinit(void)
+{
+    avpriv_lock_avformat();
+    while(first_socket) {
+        tcp_list_remove_next(NULL);
+    }
+    avpriv_unlock_avformat();
+}
+
 /* return non zero if error */
 static int tcp_open(URLContext *h, const char *uri, int flags)
 {
@@ -107,6 +232,16 @@  static int tcp_open(URLContext *h, const char *uri, int flags)
         s->open_timeout =
         h->rw_timeout   = s->options.rw_timeout;
     }
+    /* Check for an existing connection */
+    if (s->options.reuse_sockets) {
+        int reuse_fd = tcp_socket_find(s, hostname, port);
+        if (-1 != reuse_fd) {
+            h->is_streamed = 1;
+            s->fd = reuse_fd;
+            av_log (h, AV_LOG_INFO, "reusing socket fd = %d\n", reuse_fd);
+            return 0;
+        }
+    }
     hints.ai_family = AF_UNSPEC;
     hints.ai_socktype = SOCK_STREAM;
     snprintf(portstr, sizeof(portstr), "%d", port);
@@ -178,6 +313,10 @@  static int tcp_open(URLContext *h, const char *uri, int flags)
     h->is_streamed = 1;
     s->fd = fd;
 
+    if (s->options.reuse_sockets) {
+        tcp_list_add (s, hostname, port);
+        av_log (h, AV_LOG_DEBUG, "add socket fd = %d\n", fd);
+    }
     freeaddrinfo(ai);
     return 0;
 
@@ -246,6 +385,11 @@  static int tcp_shutdown(URLContext *h, int flags)
     TCPContext *s = h->priv_data;
     int how;
 
+    if (s->options.reuse_sockets) {
+        av_log (h, AV_LOG_DEBUG, "Not shutting down.. assuming Re-use later\n");
+        return 0;
+    }
+
     if (flags & AVIO_FLAG_WRITE && flags & AVIO_FLAG_READ) {
         how = SHUT_RDWR;
     } else if (flags & AVIO_FLAG_WRITE) {
@@ -260,6 +404,12 @@  static int tcp_shutdown(URLContext *h, int flags)
 static int tcp_close(URLContext *h)
 {
     TCPContext *s = h->priv_data;
+
+    if (s->options.reuse_sockets) {
+        av_log (h, AV_LOG_DEBUG, "Not closing.. assuming Re-use later\n");
+        tcp_socket_release(s->fd);
+        return 0;
+    }
     closesocket(s->fd);
     return 0;
 }
diff --git a/libavformat/tcp.h b/libavformat/tcp.h
new file mode 100644
index 0000000..c36a4de
--- /dev/null
+++ b/libavformat/tcp.h
@@ -0,0 +1,27 @@ 
+/*
+ * TCP Protocol
+ * Copyright (c) 2017 Akamai Technologies, Inc.
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef AVFORMAT_TCP_H_
+#define AVFORMAT_TCP_H_
+
+void ff_tcp_deinit(void);
+
+#endif /* AVFORMAT_TCP_H_ */
diff --git a/libavformat/utils.c b/libavformat/utils.c
index cbfb78b..9b22c47 100644
--- a/libavformat/utils.c
+++ b/libavformat/utils.c
@@ -48,6 +48,7 @@ 
 #include "metadata.h"
 #if CONFIG_NETWORK
 #include "network.h"
+#include "tcp.h"
 #endif
 #include "riff.h"
 #include "url.h"
@@ -4859,6 +4860,7 @@  int avformat_network_deinit(void)
 #if CONFIG_NETWORK
     ff_network_close();
     ff_tls_deinit();
+    ff_tcp_deinit();
     ff_network_inited_globally = 0;
 #endif
     return 0;