diff mbox series

[FFmpeg-devel,1/1] libavformat/rtmp: Implements RTMP reconnect feature

Message ID 20210926205201.1163-1-jordi.cenzano@gmail.com
State New
Headers show
Series [FFmpeg-devel,1/1] libavformat/rtmp: Implements RTMP reconnect feature | expand

Checks

Context Check Description
andriy/make_x86 success Make finished
andriy/make_fate_x86 success Make fate finished
andriy/make_ppc success Make finished
andriy/make_fate_ppc success Make fate finished

Commit Message

Jordi Cenzano Sept. 26, 2021, 8:52 p.m. UTC
Nowadays when you are streaming to a live platform if the RTMP(s)
server needs to restarted for any reason (ex: deploy new version)
the RTMP connection is interrupted (probably after some draining time).
Facebook will publish a proposal to avoid that by sending a
GoAway message in the RTMP protocol.
This code is the reference client implementation of that proposal.
AFAIK other big live platforms showed their interest in implementing
this mechanism.
This can be already tested against Facebook live production using
the querystring parameter ?ccr_sec=120 (that indicates the backend
to send a disconnect signal after those seconds)
---
 libavformat/rtmppkt.c   |  19 +++
 libavformat/rtmppkt.h   |  10 ++
 libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
 3 files changed, 359 insertions(+), 26 deletions(-)

Comments

Alex Converse Oct. 21, 2021, 6:51 p.m. UTC | #1
> Nowadays when you are streaming to a live platform if the RTMP(s)
> server needs to restarted for any reason (ex: deploy new version)
> the RTMP connection is interrupted (probably after some draining time).
> Facebook will publish a proposal to avoid that by sending a
> GoAway message in the RTMP protocol.
> This code is the reference client implementation of that proposal.
> AFAIK other big live platforms showed their interest in implementing
> this mechanism.
> This can be already tested against Facebook live production using
> the querystring parameter ?ccr_sec=120 (that indicates the backend
> to send a disconnect signal after those seconds)

It seems like this approach is operating from the assumption that the
time to setup a new connection and process all RPCs necessary to send
media is on the order of normal jitter. Or am I misunderstanding?

For many services I don't really think that's the case.

And even with a very fast publish response we are looking at like 1.5
RTTs for TCP, another 1.5 for TLS, another 1.5 for RTMP handshake,
another 1 RTT for RTMP connect, an RTT on createStream, and an RTT on
publish. That's like 7.5 RTTs (or 300 ms at 40ms RTT) where we are
leaving the media flow on pause while we are re-building the connection.

This also seems to conflate rebootstrapping a media decode session vs
re-bootstrapping an RMTP session. The cost of doing this seems to be
sending your biggest frame after a pause to resolve a bunch of
synchronous RPCs on a relatively fresh TCP connection.

> ---
>  libavformat/rtmppkt.c   |  19 +++
>  libavformat/rtmppkt.h   |  10 ++
>  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
>  3 files changed, 359 insertions(+), 26 deletions(-)
>
>[...]
> diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> index a15d2a5773..cdb901df89 100644
> --- a/libavformat/rtmppkt.h
> +++ b/libavformat/rtmppkt.h
> @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
>      RTMP_PT_SHARED_OBJ,         ///< shared object
>      RTMP_PT_INVOKE,             ///< invoke some stream action
>      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP, server is about to go down
>  } RTMPPacketType;
>

I'm curious as to why this is a new top level message rather than just another
type 20 command message. Message types have a small address space while
commands have a large address space and a well chosen command name is unlikely
to conflict with (and therefore can be used in concert with) any other protocol
extensions.

> [snip]


On Sun, Sep 26, 2021 at 1:52 PM Jordi Cenzano <jordi.cenzano@gmail.com> wrote:
>
> Nowadays when you are streaming to a live platform if the RTMP(s)
> server needs to restarted for any reason (ex: deploy new version)
> the RTMP connection is interrupted (probably after some draining time).
> Facebook will publish a proposal to avoid that by sending a
> GoAway message in the RTMP protocol.
> This code is the reference client implementation of that proposal.
> AFAIK other big live platforms showed their interest in implementing
> this mechanism.
> This can be already tested against Facebook live production using
> the querystring parameter ?ccr_sec=120 (that indicates the backend
> to send a disconnect signal after those seconds)
> ---
>  libavformat/rtmppkt.c   |  19 +++
>  libavformat/rtmppkt.h   |  10 ++
>  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
>  3 files changed, 359 insertions(+), 26 deletions(-)
>
> diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
> index 4b97c0833f..84ec72740d 100644
> --- a/libavformat/rtmppkt.c
> +++ b/libavformat/rtmppkt.c
> @@ -405,6 +405,25 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
>      return written;
>  }
>
> +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src)
> +{
> +    if (pkt_src->size) {
> +        pkt_dst->data = av_realloc(NULL, pkt_src->size);
> +        if (!pkt_dst->data)
> +            return AVERROR(ENOMEM);
> +        else
> +            memcpy(pkt_dst->data, pkt_src->data, pkt_src->size);
> +    }
> +    pkt_dst->size       = pkt_src->size;
> +    pkt_dst->channel_id = pkt_src->channel_id;
> +    pkt_dst->type       = pkt_src->type;
> +    pkt_dst->timestamp  = pkt_src->timestamp;
> +    pkt_dst->extra      = pkt_src->extra;
> +    pkt_dst->ts_field   = pkt_src->ts_field;
> +
> +    return 0;
> +}
> +
>  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
>                            int timestamp, int size)
>  {
> diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> index a15d2a5773..cdb901df89 100644
> --- a/libavformat/rtmppkt.h
> +++ b/libavformat/rtmppkt.h
> @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
>      RTMP_PT_SHARED_OBJ,         ///< shared object
>      RTMP_PT_INVOKE,             ///< invoke some stream action
>      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP, server is about to go down
>  } RTMPPacketType;
>
>  /**
> @@ -99,6 +100,15 @@ typedef struct RTMPPacket {
>  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
>                            int timestamp, int size);
>
> +/**
> + * Clone RTMP packet
> + *
> + * @param pkt_dst packet destination
> + * @param pkt_src packet source
> + * @return zero on success, negative value otherwise
> + */
> +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src);
> +
>  /**
>   * Free RTMP packet.
>   *
> diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
> index b14d23b919..ea37b9880a 100644
> --- a/libavformat/rtmpproto.c
> +++ b/libavformat/rtmpproto.c
> @@ -124,11 +124,21 @@ typedef struct RTMPContext {
>      int           nb_streamid;                ///< The next stream id to return on createStream calls
>      double        duration;                   ///< Duration of the stream in seconds as returned by the server (only valid if non-zero)
>      int           tcp_nodelay;                ///< Use TCP_NODELAY to disable Nagle's algorithm if set to 1
> +    int           reconnect_interval;         ///< Forces a reconnected every Xs (in media time)
>      char          username[50];
>      char          password[50];
>      char          auth_params[500];
>      int           do_reconnect;
> +    uint32_t      last_reconnect_timestamp;
>      int           auth_tried;
> +    int           force_reconnection_now;
> +    int           go_away_received;
> +    AVDictionary* original_opts;
> +    char          original_uri[TCURL_MAX_LENGTH];
> +    int           original_flags;
> +    RTMPPacket    last_avc_seq_header_pkt;    ///< rtmp packet, used to save last AVC video header, used on reconnection
> +    RTMPPacket    last_aac_seq_header_pkt;    ///< rtmp packet, used to save last AAC audio header, used on reconnection
> +    RTMPPacket    last_metadata_pkt;        ///< rtmp packet, used to save last onMetadata info, used on reconnection
>  } RTMPContext;
>
>  #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used for first client digest signing
> @@ -224,7 +234,7 @@ static void free_tracked_methods(RTMPContext *rt)
>      rt->nb_tracked_methods   = 0;
>  }
>
> -static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
> +static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track, int destroy)
>  {
>      int ret;
>
> @@ -248,7 +258,9 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
>      ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
>                                 &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
>  fail:
> -    ff_rtmp_packet_destroy(pkt);
> +    if (destroy)
> +        ff_rtmp_packet_destroy(pkt);
> +
>      return ret;
>  }
>
> @@ -336,6 +348,9 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
>      if (!rt->is_input) {
>          ff_amf_write_field_name(&p, "type");
>          ff_amf_write_string(&p, "nonprivate");
> +        // Indicates accepts goaway
> +        ff_amf_write_field_name(&p, "supportsGoAway");
> +        ff_amf_write_bool(&p, 1);
>      }
>      ff_amf_write_field_name(&p, "flashVer");
>      ff_amf_write_string(&p, rt->flashver);
> @@ -400,7 +415,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
>
>      pkt.size = p - pkt.data;
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>
> @@ -611,7 +626,7 @@ static int gen_release_stream(URLContext *s, RTMPContext *rt)
>      ff_amf_write_null(&p);
>      ff_amf_write_string(&p, rt->playpath);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -635,7 +650,7 @@ static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
>      ff_amf_write_null(&p);
>      ff_amf_write_string(&p, rt->playpath);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -659,7 +674,7 @@ static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
>      ff_amf_write_null(&p);
>      ff_amf_write_string(&p, rt->playpath);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -683,7 +698,7 @@ static int gen_create_stream(URLContext *s, RTMPContext *rt)
>      ff_amf_write_number(&p, ++rt->nb_invokes);
>      ff_amf_write_null(&p);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>
> @@ -709,7 +724,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt)
>      ff_amf_write_null(&p);
>      ff_amf_write_number(&p, rt->stream_id);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -733,7 +748,7 @@ static int gen_get_stream_length(URLContext *s, RTMPContext *rt)
>      ff_amf_write_null(&p);
>      ff_amf_write_string(&p, rt->playpath);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -754,7 +769,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt)
>      bytestream_put_be32(&p, rt->stream_id);
>      bytestream_put_be32(&p, rt->client_buffer_time);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -782,7 +797,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
>      ff_amf_write_string(&p, rt->playpath);
>      ff_amf_write_number(&p, rt->live * 1000);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
> @@ -805,7 +820,7 @@ static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
>      ff_amf_write_null(&p); //as usual, the first null param
>      ff_amf_write_number(&p, timestamp); //where we want to jump
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -832,7 +847,7 @@ static int gen_pause(URLContext *s, RTMPContext *rt, int pause, uint32_t timesta
>      ff_amf_write_bool(&p, pause); // pause or unpause
>      ff_amf_write_number(&p, timestamp); //where we pause the stream
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -859,7 +874,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt)
>      ff_amf_write_string(&p, rt->playpath);
>      ff_amf_write_string(&p, "live");
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -885,7 +900,7 @@ static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
>      bytestream_put_be16(&p, 7); // PingResponse
>      bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -906,7 +921,7 @@ static int gen_swf_verification(URLContext *s, RTMPContext *rt)
>      bytestream_put_be16(&p, 27);
>      memcpy(p, rt->swfverification, 42);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -925,7 +940,7 @@ static int gen_window_ack_size(URLContext *s, RTMPContext *rt)
>      p = pkt.data;
>      bytestream_put_be32(&p, rt->max_sent_unacked);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  /**
> @@ -946,7 +961,7 @@ static int gen_check_bw(URLContext *s, RTMPContext *rt)
>      ff_amf_write_number(&p, ++rt->nb_invokes);
>      ff_amf_write_null(&p);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -965,7 +980,7 @@ static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
>      p = pkt.data;
>      bytestream_put_be32(&p, rt->bytes_read);
>
> -    return rtmp_send_packet(rt, &pkt, 0);
> +    return rtmp_send_packet(rt, &pkt, 0, 1);
>  }
>
>  static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
> @@ -985,7 +1000,7 @@ static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
>      ff_amf_write_null(&p);
>      ff_amf_write_string(&p, subscribe);
>
> -    return rtmp_send_packet(rt, &pkt, 1);
> +    return rtmp_send_packet(rt, &pkt, 1, 1);
>  }
>
>  /**
> @@ -2153,6 +2168,16 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt)
>      return 0;
>  }
>
> +static int handle_go_away(URLContext *s, RTMPPacket *pkt) {
> +    RTMPContext *rt = s->priv_data;
> +
> +    av_log(s, AV_LOG_TRACE, "go away signal received");
> +
> +    rt->go_away_received = 1;
> +
> +    return 0;
> +}
> +
>  static int handle_invoke(URLContext *s, RTMPPacket *pkt)
>  {
>      RTMPContext *rt = s->priv_data;
> @@ -2331,6 +2356,10 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
>          if ((ret = handle_invoke(s, pkt)) < 0)
>              return ret;
>          break;
> +    case RTMP_PT_GO_AWAY:
> +        if ((ret = handle_go_away(s, pkt)) < 0)
> +            return ret;
> +        break;
>      case RTMP_PT_VIDEO:
>      case RTMP_PT_AUDIO:
>      case RTMP_PT_METADATA:
> @@ -2513,6 +2542,15 @@ static int rtmp_close(URLContext *h)
>      free_tracked_methods(rt);
>      av_freep(&rt->flv_data);
>      ffurl_closep(&rt->stream);
> +    if (rt->last_avc_seq_header_pkt.size)
> +        ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> +
> +    if (rt->last_aac_seq_header_pkt.size)
> +        ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> +
> +    if (rt->last_metadata_pkt.size)
> +        ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> +
>      return ret;
>  }
>
> @@ -2871,14 +2909,23 @@ reconnect:
>                  goto fail;
>          }
>      } else {
> -        rt->flv_size = 0;
> -        rt->flv_data = NULL;
> -        rt->flv_off  = 0;
> -        rt->skip_bytes = 13;
> +        // Do not clean buffers if it is a forced reconnection
> +        if (rt->force_reconnection_now <= 0) {
> +            rt->flv_size = 0;
> +            rt->flv_data = NULL;
> +            rt->flv_off  = 0;
> +            rt->skip_bytes = 13;
> +        }
>      }
>
>      s->max_packet_size = rt->stream->max_packet_size;
>      s->is_streamed     = 1;
> +
> +    // Copy original params
> +    av_dict_copy(&rt->original_opts, *opts, 0);
> +    rt->original_flags = flags;
> +    av_strlcpy(rt->original_uri, uri, TCURL_MAX_LENGTH);
> +
>      return 0;
>
>  fail:
> @@ -2951,6 +2998,107 @@ static int rtmp_pause(URLContext *s, int pause)
>      return 0;
>  }
>
> +/**
> + * Reconnect RTMP connection.
> +*/
> +static int rtmp_reconnect(URLContext *s) {
> +    RTMPContext *rt = s->priv_data;
> +    int i;
> +
> +    // Close current RTMP connection
> +    av_log(s, AV_LOG_INFO, "reconnecting!\n");
> +
> +    ffurl_closep(&rt->stream);
> +    rt->do_reconnect = 0;
> +    rt->nb_invokes   = 0;
> +    for (i = 0; i < 2; i++)
> +        memset(rt->prev_pkt[i], 0, sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]);
> +
> +    free_tracked_methods(rt);
> +
> +    // Connect RTMP again using orignal values
> +    return rtmp_open(s, rt->original_uri, rt->original_flags, &rt->original_opts);
> +}
> +
> +/**
> + * Checks RTMP packet and return 1 when it contains an AAC header
> +*/
> +static int rtmp_packet_is_aac_audio_header(RTMPPacket *pkt) {
> +    uint8_t sound_format;
> +    uint8_t aac_packet_type;
> +
> +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_AUDIO)
> +        return 0;
> +
> +    sound_format = (pkt->data[0] & 0xF0) >> 4;
> +    aac_packet_type = pkt->data[1];
> +    // Check codec == AVC and avc contains seq header
> +    if (sound_format == 10 && aac_packet_type == 0)
> +        return 1;
> +
> +    return 0;
> +}
> +
> +/**
> + * Checks RTMP packet and return 1 when it contains an AVC header
> +*/
> +static int rtmp_packet_is_avc_video_header(RTMPPacket *pkt) {
> +    uint8_t codec_id;
> +    uint8_t avc_packet_type;
> +
> +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_VIDEO)
> +        return 0;
> +
> +    codec_id = pkt->data[0] & 0xF;
> +    avc_packet_type = pkt->data[1];
> +    // Check codec == AVC and avc contains seq header
> +    if (codec_id == 7 && avc_packet_type == 0)
> +        return 1;
> +
> +    return 0;
> +}
> +
> +/**
> + * Checks RTMP packet and return 1 when it contains video IDR point
> +*/
> +static int rtmp_packet_is_video_avc_IDR(RTMPPacket *pkt) {
> +    uint8_t frame_type;
> +    uint8_t codec_id;
> +
> +    if ((!pkt) || (pkt->size < 1) || pkt->type != RTMP_PT_VIDEO)
> +        return 0;
> +
> +    frame_type = (pkt->data[0] & 0xF0) >> 4;
> +    codec_id = pkt->data[0] & 0xF;
> +    // Check codec == AVC and videoFrame == Keyframe / seekable (assuming that means IDR)
> +    if (codec_id == 7 && frame_type == 1)
> +        return 1;
> +
> +    return 0;
> +}
> +
> +/**
> + * Checks RTMP packet and return 1 when it contains onMetadata info
> +*/
> +static int rtmp_packet_is_onMetadata_packet(RTMPPacket *pkt) {
> +    uint8_t commandbuffer[64];
> +    int stringlen;
> +    GetByteContext gbc;
> +
> +    if ((!pkt) || (pkt->size < 10) || pkt->type != RTMP_PT_NOTIFY)
> +        return 0;
> +
> +    bytestream2_init(&gbc, pkt->data, pkt->size);
> +    if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),&stringlen))
> +        return 0;
> +
> +    // onMetadata is prepended by "@setDataFrame"
> +    if (!strcmp(commandbuffer, "@setDataFrame"))
> +        return 1;
> +
> +    return 0;
> +}
> +
>  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
>  {
>      RTMPContext *rt = s->priv_data;
> @@ -2960,6 +3108,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
>      const uint8_t *buf_temp = buf;
>      uint8_t c;
>      int ret;
> +    int execute_reconnection = 0;
> +    int is_idr = 0;
>
>      do {
>          if (rt->skip_bytes) {
> @@ -2988,8 +3138,13 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
>              bytestream_get_be24(&header);
>              rt->flv_size = pktsize;
>
> -            if (pkttype == RTMP_PT_VIDEO)
> +            if (pkttype == RTMP_PT_VIDEO) {
>                  channel = RTMP_VIDEO_CHANNEL;
> +                rt->has_video = 1;
> +            }
> +            if (pkttype == RTMP_PT_AUDIO) {
> +                rt->has_audio = 1;
> +            }
>
>              if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
>                  pkttype == RTMP_PT_NOTIFY) {
> @@ -3047,7 +3202,155 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
>                  }
>              }
>
> -            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
> +            // Check if a reconnection is required
> +            // Per interval
> +            if ((rt->reconnect_interval > 0) &&
> +                (rt->out_pkt.timestamp >= (rt->last_reconnect_timestamp + rt->reconnect_interval * 1000))) {
> +                rt->last_reconnect_timestamp = rt->out_pkt.timestamp;
> +                rt->force_reconnection_now = 1;
> +                av_log(s, AV_LOG_TRACE,
> +                       "trigered internal interval reconnection\n");
> +            }
> +            // Per go away signal
> +            if (rt->go_away_received > 0) {
> +                rt->go_away_received = 0;
> +                rt->force_reconnection_now = 1;
> +                av_log(s, AV_LOG_TRACE,
> +                       "detected go away signal from the peer\n");
> +            }
> +
> +            if (rtmp_packet_is_avc_video_header(&rt->out_pkt)) {
> +                // Save last video header
> +                if (rt->last_avc_seq_header_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "freeing last video header packet saved\n");
> +                    ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> +                }
> +                // Save AVC seq header packet
> +                if ((ret = ff_rtmp_packet_clone(&rt->last_avc_seq_header_pkt, &rt->out_pkt)) < 0) {
> +                    return ret;
> +                }
> +                av_log(s, AV_LOG_DEBUG, "saved video header packet\n");
> +            } else if (rtmp_packet_is_aac_audio_header(&rt->out_pkt)) {
> +                // Save last audio header
> +                if (rt->last_aac_seq_header_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "freeing last audio header packet saved\n");
> +                    ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> +                }
> +                // Save AAC seq header packet
> +                if ((ret = ff_rtmp_packet_clone(&rt->last_aac_seq_header_pkt, &rt->out_pkt)) < 0) {
> +                    return ret;
> +                }
> +                av_log(s, AV_LOG_DEBUG, "saved audio header packet\n");
> +            } else if (rtmp_packet_is_onMetadata_packet(&rt->out_pkt)) {
> +                // Save last onMetadata packet
> +                if (rt->last_metadata_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "freeing last onMetadata packet saved\n");
> +                    ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> +                }
> +                // Save onMetadata packet
> +                if ((ret = ff_rtmp_packet_clone(&rt->last_metadata_pkt, &rt->out_pkt)) < 0) {
> +                    return ret;
> +                }
> +                av_log(s, AV_LOG_DEBUG, "saved onMetadata packet\n");
> +            }
> +
> +            // Reconnection has been requested
> +            if (rt->force_reconnection_now >= 1) {
> +                // Check if packet is video IDR
> +                is_idr = rtmp_packet_is_video_avc_IDR(&rt->out_pkt);
> +                av_log(s, AV_LOG_DEBUG,
> +                       "looking for the right disconnect point. Is IDR: %d, "
> +                       "has_video: %d, has_audio: %d, state: %d, "
> +                       "last_avc_seq_header_pkt.size: %d, "
> +                       "last_aac_seq_header_pkt.size: %d\n",
> +                       is_idr, rt->has_video, rt->has_audio, rt->state,
> +                       rt->last_avc_seq_header_pkt.size,
> +                       rt->last_aac_seq_header_pkt.size);
> +
> +                if (rt->has_video && rt->has_audio &&
> +                    (rt->state == STATE_PUBLISHING)) {
> +                    // If we only video let's do the reconnection in an IDR
> +                    // frame when we have both headers saved
> +                    if (is_idr && rt->last_avc_seq_header_pkt.size &&
> +                        rt->last_aac_seq_header_pkt.size)
> +                        execute_reconnection = 1;
> +                } else if (rt->has_video && !rt->has_audio &&
> +                           (rt->state == STATE_PUBLISHING)) {
> +                    // If we have video and NO audio let's do the reconnection
> +                    // in an IDR frame when we have video header saved
> +                    if (is_idr && rt->last_avc_seq_header_pkt.size)
> +                        execute_reconnection = 1;
> +                } else if (!rt->has_video &&
> +                           rt->has_audio & (rt->state == STATE_PUBLISHING)) {
> +                    // If we have only audio let's do the reconnection when we
> +                    // have the audio header saved
> +                    if (rt->last_aac_seq_header_pkt.size)
> +                        execute_reconnection = 1;
> +                } else {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "reconnection is requested but can NOT be executed "
> +                           "now, waiting! rt->state: %d, has_video: %d, "
> +                           "has_audio: %d, is_idr: %d\n",
> +                           rt->state, rt->has_video, rt->has_audio, is_idr);
> +                }
> +            }
> +
> +            if (execute_reconnection) {
> +                execute_reconnection = 0;
> +
> +                av_log(s, AV_LOG_DEBUG,
> +                       "executing reconnection. rt->flv_off: %d, rt->flv_size: "
> +                       "%d\n",
> +                       rt->flv_off, rt->flv_size);
> +
> +                if ((ret = rtmp_reconnect(s)) < 0)
> +                    return ret;
> +
> +                // Reconnect executed, clear the flag
> +                rt->force_reconnection_now = 0;
> +
> +                av_log(s, AV_LOG_DEBUG,
> +                       "reconnected. rt->flv_off: %d, rt->flv_size: %d\n",
> +                       rt->flv_off, rt->flv_size);
> +
> +                // Send last video header if it is saved
> +                if (rt->last_avc_seq_header_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "sending last saved video header\n");
> +                    rt->last_avc_seq_header_pkt.timestamp =
> +                        rt->out_pkt.timestamp;
> +                    if ((ret = rtmp_send_packet(
> +                             rt, &rt->last_avc_seq_header_pkt, 0, 0)) < 0)
> +                        return ret;
> +                }
> +
> +                // Send last audio header if it is saved
> +                if (rt->last_aac_seq_header_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "sending last saved audio header\n");
> +                    rt->last_aac_seq_header_pkt.timestamp =
> +                        rt->out_pkt.timestamp;
> +                    if ((ret = rtmp_send_packet(
> +                             rt, &rt->last_aac_seq_header_pkt, 0, 0)) < 0)
> +                        return ret;
> +                }
> +
> +                // Send last onMetadata packet, optional
> +                if (rt->last_metadata_pkt.size) {
> +                    av_log(s, AV_LOG_DEBUG,
> +                           "sending last saved onMetadata header\n");
> +                    rt->last_metadata_pkt.timestamp = rt->out_pkt.timestamp;
> +                    if ((ret = rtmp_send_packet(rt, &rt->last_metadata_pkt, 0,
> +                                                0)) < 0)
> +                        return ret;
> +                }
> +            }
> +
> +            // Send actual packet
> +            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0, 1)) < 0)
>                  return ret;
>              rt->flv_size = 0;
>              rt->flv_off = 0;
> @@ -3118,6 +3421,7 @@ static const AVOption rtmp_options[] = {
>      {"listen",      "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
>      {"tcp_nodelay", "Use TCP_NODELAY to disable Nagle's algorithm", OFFSET(tcp_nodelay), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, DEC|ENC},
>      {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1",  OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
> +    {"rtmp_reconnect_time", "Interval (in seconds) to force a client reconnection, it is based on media time. By default is 0 (no reconnection)", OFFSET(reconnect_interval), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, ENC },
>      { NULL },
>  };
>
> --
> 2.32.0
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Jordi Cenzano Dec. 6, 2021, 1:30 a.m. UTC | #2
>
> > Nowadays when you are streaming to a live platform if the RTMP(s)
> > server needs to restarted for any reason (ex: deploy new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms showed their interest in implementing
> > this mechanism.
> > This can be already tested against Facebook live production using
> > the querystring parameter ?ccr_sec=120 (that indicates the backend
> > to send a disconnect signal after those seconds)
>
> It seems like this approach is operating from the assumption that the
> time to setup a new connection and process all RPCs necessary to send
> media is on the order of normal jitter. Or am I misunderstanding?
>

You are right!

>
> For many services I don't really think that's the case.
>

Fair enough

>
> And even with a very fast publish response we are looking at like 1.5
> RTTs for TCP, another 1.5 for TLS, another 1.5 for RTMP handshake,
> another 1 RTT for RTMP connect, an RTT on createStream, and an RTT on
> publish. That's like 7.5 RTTs (or 300 ms at 40ms RTT) where we are
> leaving the media flow on pause while we are re-building the connection.
>

Yes, and we are aware that is NOT ideal. But we thought it would be better
to add ~400ms of latency (that in most cases the disruption will be hidden
by the player buffer) rather than just drop frames until the next IDR, that
will produce (usually) a much longer disruption since GOPs are usually 2s+
long

>
> This also seems to conflate rebootstrapping a media decode session vs
> re-bootstrapping an RMTP session. The cost of doing this seems to be
> sending your biggest frame after a pause to resolve a bunch of
> synchronous RPCs on a relatively fresh TCP connection.
>

YES, and as I mention we know that is NOT ideal at all.

BTW here we are talking about LIVE streams.

>
> > ---
> >  libavformat/rtmppkt.c   |  19 +++
> >  libavformat/rtmppkt.h   |  10 ++
> >  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> >  3 files changed, 359 insertions(+), 26 deletions(-)
> >
> >[...]
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> >      RTMP_PT_SHARED_OBJ,         ///< shared object
> >      RTMP_PT_INVOKE,             ///< invoke some stream action
> >      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> > +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP,
> server is about to go down
> >  } RTMPPacketType;
> >
>
> I'm curious as to why this is a new top level message rather than just
> another
> type 20 command message. Message types have a small address space while
> commands have a large address space and a well chosen command name is
> unlikely
> to conflict with (and therefore can be used in concert with) any other
> protocol
> extensions.
>

We thought about it, and it seemed more logical to add it as a packet type
since it was affecting the RTMP connections itself. If you read the
definition of Command message, that seems more oriented to send data /
actions over the underlying connection.

Thanks a lot for your comments Alex, happy to hear ideas to improve this
proposal. And sorry for my late reply

>
> > [snip]
>
>
> On Sun, Sep 26, 2021 at 1:52 PM Jordi Cenzano <jordi.cenzano@gmail.com>
> wrote:
> >
> > Nowadays when you are streaming to a live platform if the RTMP(s)
> > server needs to restarted for any reason (ex: deploy new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms showed their interest in implementing
> > this mechanism.
> > This can be already tested against Facebook live production using
> > the querystring parameter ?ccr_sec=120 (that indicates the backend
> > to send a disconnect signal after those seconds)
> > ---
> >  libavformat/rtmppkt.c   |  19 +++
> >  libavformat/rtmppkt.h   |  10 ++
> >  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> >  3 files changed, 359 insertions(+), 26 deletions(-)
> >
> > diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
> > index 4b97c0833f..84ec72740d 100644
> > --- a/libavformat/rtmppkt.c
> > +++ b/libavformat/rtmppkt.c
> > @@ -405,6 +405,25 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket
> *pkt,
> >      return written;
> >  }
> >
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src)
> > +{
> > +    if (pkt_src->size) {
> > +        pkt_dst->data = av_realloc(NULL, pkt_src->size);
> > +        if (!pkt_dst->data)
> > +            return AVERROR(ENOMEM);
> > +        else
> > +            memcpy(pkt_dst->data, pkt_src->data, pkt_src->size);
> > +    }
> > +    pkt_dst->size       = pkt_src->size;
> > +    pkt_dst->channel_id = pkt_src->channel_id;
> > +    pkt_dst->type       = pkt_src->type;
> > +    pkt_dst->timestamp  = pkt_src->timestamp;
> > +    pkt_dst->extra      = pkt_src->extra;
> > +    pkt_dst->ts_field   = pkt_src->ts_field;
> > +
> > +    return 0;
> > +}
> > +
> >  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id,
> RTMPPacketType type,
> >                            int timestamp, int size)
> >  {
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> >      RTMP_PT_SHARED_OBJ,         ///< shared object
> >      RTMP_PT_INVOKE,             ///< invoke some stream action
> >      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> > +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP,
> server is about to go down
> >  } RTMPPacketType;
> >
> >  /**
> > @@ -99,6 +100,15 @@ typedef struct RTMPPacket {
> >  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id,
> RTMPPacketType type,
> >                            int timestamp, int size);
> >
> > +/**
> > + * Clone RTMP packet
> > + *
> > + * @param pkt_dst packet destination
> > + * @param pkt_src packet source
> > + * @return zero on success, negative value otherwise
> > + */
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket
> *pkt_src);
> > +
> >  /**
> >   * Free RTMP packet.
> >   *
> > diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
> > index b14d23b919..ea37b9880a 100644
> > --- a/libavformat/rtmpproto.c
> > +++ b/libavformat/rtmpproto.c
> > @@ -124,11 +124,21 @@ typedef struct RTMPContext {
> >      int           nb_streamid;                ///< The next stream id
> to return on createStream calls
> >      double        duration;                   ///< Duration of the
> stream in seconds as returned by the server (only valid if non-zero)
> >      int           tcp_nodelay;                ///< Use TCP_NODELAY to
> disable Nagle's algorithm if set to 1
> > +    int           reconnect_interval;         ///< Forces a reconnected
> every Xs (in media time)
> >      char          username[50];
> >      char          password[50];
> >      char          auth_params[500];
> >      int           do_reconnect;
> > +    uint32_t      last_reconnect_timestamp;
> >      int           auth_tried;
> > +    int           force_reconnection_now;
> > +    int           go_away_received;
> > +    AVDictionary* original_opts;
> > +    char          original_uri[TCURL_MAX_LENGTH];
> > +    int           original_flags;
> > +    RTMPPacket    last_avc_seq_header_pkt;    ///< rtmp packet, used to
> save last AVC video header, used on reconnection
> > +    RTMPPacket    last_aac_seq_header_pkt;    ///< rtmp packet, used to
> save last AAC audio header, used on reconnection
> > +    RTMPPacket    last_metadata_pkt;        ///< rtmp packet, used to
> save last onMetadata info, used on reconnection
> >  } RTMPContext;
> >
> >  #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used
> for first client digest signing
> > @@ -224,7 +234,7 @@ static void free_tracked_methods(RTMPContext *rt)
> >      rt->nb_tracked_methods   = 0;
> >  }
> >
> > -static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
> > +static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int
> track, int destroy)
> >  {
> >      int ret;
> >
> > @@ -248,7 +258,9 @@ static int rtmp_send_packet(RTMPContext *rt,
> RTMPPacket *pkt, int track)
> >      ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
> >                                 &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
> >  fail:
> > -    ff_rtmp_packet_destroy(pkt);
> > +    if (destroy)
> > +        ff_rtmp_packet_destroy(pkt);
> > +
> >      return ret;
> >  }
> >
> > @@ -336,6 +348,9 @@ static int gen_connect(URLContext *s, RTMPContext
> *rt)
> >      if (!rt->is_input) {
> >          ff_amf_write_field_name(&p, "type");
> >          ff_amf_write_string(&p, "nonprivate");
> > +        // Indicates accepts goaway
> > +        ff_amf_write_field_name(&p, "supportsGoAway");
> > +        ff_amf_write_bool(&p, 1);
> >      }
> >      ff_amf_write_field_name(&p, "flashVer");
> >      ff_amf_write_string(&p, rt->flashver);
> > @@ -400,7 +415,7 @@ static int gen_connect(URLContext *s, RTMPContext
> *rt)
> >
> >      pkt.size = p - pkt.data;
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >
> > @@ -611,7 +626,7 @@ static int gen_release_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -635,7 +650,7 @@ static int gen_fcpublish_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -659,7 +674,7 @@ static int gen_fcunpublish_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -683,7 +698,7 @@ static int gen_create_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_number(&p, ++rt->nb_invokes);
> >      ff_amf_write_null(&p);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >
> > @@ -709,7 +724,7 @@ static int gen_delete_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_number(&p, rt->stream_id);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -733,7 +748,7 @@ static int gen_get_stream_length(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -754,7 +769,7 @@ static int gen_buffer_time(URLContext *s,
> RTMPContext *rt)
> >      bytestream_put_be32(&p, rt->stream_id);
> >      bytestream_put_be32(&p, rt->client_buffer_time);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -782,7 +797,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
> >      ff_amf_write_string(&p, rt->playpath);
> >      ff_amf_write_number(&p, rt->live * 1000);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
> > @@ -805,7 +820,7 @@ static int gen_seek(URLContext *s, RTMPContext *rt,
> int64_t timestamp)
> >      ff_amf_write_null(&p); //as usual, the first null param
> >      ff_amf_write_number(&p, timestamp); //where we want to jump
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -832,7 +847,7 @@ static int gen_pause(URLContext *s, RTMPContext *rt,
> int pause, uint32_t timesta
> >      ff_amf_write_bool(&p, pause); // pause or unpause
> >      ff_amf_write_number(&p, timestamp); //where we pause the stream
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -859,7 +874,7 @@ static int gen_publish(URLContext *s, RTMPContext
> *rt)
> >      ff_amf_write_string(&p, rt->playpath);
> >      ff_amf_write_string(&p, "live");
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -885,7 +900,7 @@ static int gen_pong(URLContext *s, RTMPContext *rt,
> RTMPPacket *ppkt)
> >      bytestream_put_be16(&p, 7); // PingResponse
> >      bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -906,7 +921,7 @@ static int gen_swf_verification(URLContext *s,
> RTMPContext *rt)
> >      bytestream_put_be16(&p, 27);
> >      memcpy(p, rt->swfverification, 42);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -925,7 +940,7 @@ static int gen_window_ack_size(URLContext *s,
> RTMPContext *rt)
> >      p = pkt.data;
> >      bytestream_put_be32(&p, rt->max_sent_unacked);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -946,7 +961,7 @@ static int gen_check_bw(URLContext *s, RTMPContext
> *rt)
> >      ff_amf_write_number(&p, ++rt->nb_invokes);
> >      ff_amf_write_null(&p);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -965,7 +980,7 @@ static int gen_bytes_read(URLContext *s, RTMPContext
> *rt, uint32_t ts)
> >      p = pkt.data;
> >      bytestream_put_be32(&p, rt->bytes_read);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
> > @@ -985,7 +1000,7 @@ static int gen_fcsubscribe_stream(URLContext *s,
> RTMPContext *rt,
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, subscribe);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -2153,6 +2168,16 @@ static int handle_invoke_status(URLContext *s,
> RTMPPacket *pkt)
> >      return 0;
> >  }
> >
> > +static int handle_go_away(URLContext *s, RTMPPacket *pkt) {
> > +    RTMPContext *rt = s->priv_data;
> > +
> > +    av_log(s, AV_LOG_TRACE, "go away signal received");
> > +
> > +    rt->go_away_received = 1;
> > +
> > +    return 0;
> > +}
> > +
> >  static int handle_invoke(URLContext *s, RTMPPacket *pkt)
> >  {
> >      RTMPContext *rt = s->priv_data;
> > @@ -2331,6 +2356,10 @@ static int rtmp_parse_result(URLContext *s,
> RTMPContext *rt, RTMPPacket *pkt)
> >          if ((ret = handle_invoke(s, pkt)) < 0)
> >              return ret;
> >          break;
> > +    case RTMP_PT_GO_AWAY:
> > +        if ((ret = handle_go_away(s, pkt)) < 0)
> > +            return ret;
> > +        break;
> >      case RTMP_PT_VIDEO:
> >      case RTMP_PT_AUDIO:
> >      case RTMP_PT_METADATA:
> > @@ -2513,6 +2542,15 @@ static int rtmp_close(URLContext *h)
> >      free_tracked_methods(rt);
> >      av_freep(&rt->flv_data);
> >      ffurl_closep(&rt->stream);
> > +    if (rt->last_avc_seq_header_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > +
> > +    if (rt->last_aac_seq_header_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > +
> > +    if (rt->last_metadata_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > +
> >      return ret;
> >  }
> >
> > @@ -2871,14 +2909,23 @@ reconnect:
> >                  goto fail;
> >          }
> >      } else {
> > -        rt->flv_size = 0;
> > -        rt->flv_data = NULL;
> > -        rt->flv_off  = 0;
> > -        rt->skip_bytes = 13;
> > +        // Do not clean buffers if it is a forced reconnection
> > +        if (rt->force_reconnection_now <= 0) {
> > +            rt->flv_size = 0;
> > +            rt->flv_data = NULL;
> > +            rt->flv_off  = 0;
> > +            rt->skip_bytes = 13;
> > +        }
> >      }
> >
> >      s->max_packet_size = rt->stream->max_packet_size;
> >      s->is_streamed     = 1;
> > +
> > +    // Copy original params
> > +    av_dict_copy(&rt->original_opts, *opts, 0);
> > +    rt->original_flags = flags;
> > +    av_strlcpy(rt->original_uri, uri, TCURL_MAX_LENGTH);
> > +
> >      return 0;
> >
> >  fail:
> > @@ -2951,6 +2998,107 @@ static int rtmp_pause(URLContext *s, int pause)
> >      return 0;
> >  }
> >
> > +/**
> > + * Reconnect RTMP connection.
> > +*/
> > +static int rtmp_reconnect(URLContext *s) {
> > +    RTMPContext *rt = s->priv_data;
> > +    int i;
> > +
> > +    // Close current RTMP connection
> > +    av_log(s, AV_LOG_INFO, "reconnecting!\n");
> > +
> > +    ffurl_closep(&rt->stream);
> > +    rt->do_reconnect = 0;
> > +    rt->nb_invokes   = 0;
> > +    for (i = 0; i < 2; i++)
> > +        memset(rt->prev_pkt[i], 0, sizeof(**rt->prev_pkt) *
> rt->nb_prev_pkt[i]);
> > +
> > +    free_tracked_methods(rt);
> > +
> > +    // Connect RTMP again using orignal values
> > +    return rtmp_open(s, rt->original_uri, rt->original_flags,
> &rt->original_opts);
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AAC header
> > +*/
> > +static int rtmp_packet_is_aac_audio_header(RTMPPacket *pkt) {
> > +    uint8_t sound_format;
> > +    uint8_t aac_packet_type;
> > +
> > +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_AUDIO)
> > +        return 0;
> > +
> > +    sound_format = (pkt->data[0] & 0xF0) >> 4;
> > +    aac_packet_type = pkt->data[1];
> > +    // Check codec == AVC and avc contains seq header
> > +    if (sound_format == 10 && aac_packet_type == 0)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AVC header
> > +*/
> > +static int rtmp_packet_is_avc_video_header(RTMPPacket *pkt) {
> > +    uint8_t codec_id;
> > +    uint8_t avc_packet_type;
> > +
> > +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_VIDEO)
> > +        return 0;
> > +
> > +    codec_id = pkt->data[0] & 0xF;
> > +    avc_packet_type = pkt->data[1];
> > +    // Check codec == AVC and avc contains seq header
> > +    if (codec_id == 7 && avc_packet_type == 0)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains video IDR point
> > +*/
> > +static int rtmp_packet_is_video_avc_IDR(RTMPPacket *pkt) {
> > +    uint8_t frame_type;
> > +    uint8_t codec_id;
> > +
> > +    if ((!pkt) || (pkt->size < 1) || pkt->type != RTMP_PT_VIDEO)
> > +        return 0;
> > +
> > +    frame_type = (pkt->data[0] & 0xF0) >> 4;
> > +    codec_id = pkt->data[0] & 0xF;
> > +    // Check codec == AVC and videoFrame == Keyframe / seekable
> (assuming that means IDR)
> > +    if (codec_id == 7 && frame_type == 1)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains onMetadata info
> > +*/
> > +static int rtmp_packet_is_onMetadata_packet(RTMPPacket *pkt) {
> > +    uint8_t commandbuffer[64];
> > +    int stringlen;
> > +    GetByteContext gbc;
> > +
> > +    if ((!pkt) || (pkt->size < 10) || pkt->type != RTMP_PT_NOTIFY)
> > +        return 0;
> > +
> > +    bytestream2_init(&gbc, pkt->data, pkt->size);
> > +    if (ff_amf_read_string(&gbc, commandbuffer,
> sizeof(commandbuffer),&stringlen))
> > +        return 0;
> > +
> > +    // onMetadata is prepended by "@setDataFrame"
> > +    if (!strcmp(commandbuffer, "@setDataFrame"))
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> >  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> >  {
> >      RTMPContext *rt = s->priv_data;
> > @@ -2960,6 +3108,8 @@ static int rtmp_write(URLContext *s, const uint8_t
> *buf, int size)
> >      const uint8_t *buf_temp = buf;
> >      uint8_t c;
> >      int ret;
> > +    int execute_reconnection = 0;
> > +    int is_idr = 0;
> >
> >      do {
> >          if (rt->skip_bytes) {
> > @@ -2988,8 +3138,13 @@ static int rtmp_write(URLContext *s, const
> uint8_t *buf, int size)
> >              bytestream_get_be24(&header);
> >              rt->flv_size = pktsize;
> >
> > -            if (pkttype == RTMP_PT_VIDEO)
> > +            if (pkttype == RTMP_PT_VIDEO) {
> >                  channel = RTMP_VIDEO_CHANNEL;
> > +                rt->has_video = 1;
> > +            }
> > +            if (pkttype == RTMP_PT_AUDIO) {
> > +                rt->has_audio = 1;
> > +            }
> >
> >              if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO)
> && ts == 0) ||
> >                  pkttype == RTMP_PT_NOTIFY) {
> > @@ -3047,7 +3202,155 @@ static int rtmp_write(URLContext *s, const
> uint8_t *buf, int size)
> >                  }
> >              }
> >
> > -            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
> > +            // Check if a reconnection is required
> > +            // Per interval
> > +            if ((rt->reconnect_interval > 0) &&
> > +                (rt->out_pkt.timestamp >= (rt->last_reconnect_timestamp
> + rt->reconnect_interval * 1000))) {
> > +                rt->last_reconnect_timestamp = rt->out_pkt.timestamp;
> > +                rt->force_reconnection_now = 1;
> > +                av_log(s, AV_LOG_TRACE,
> > +                       "trigered internal interval reconnection\n");
> > +            }
> > +            // Per go away signal
> > +            if (rt->go_away_received > 0) {
> > +                rt->go_away_received = 0;
> > +                rt->force_reconnection_now = 1;
> > +                av_log(s, AV_LOG_TRACE,
> > +                       "detected go away signal from the peer\n");
> > +            }
> > +
> > +            if (rtmp_packet_is_avc_video_header(&rt->out_pkt)) {
> > +                // Save last video header
> > +                if (rt->last_avc_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last video header packet saved\n");
> > +
> ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > +                }
> > +                // Save AVC seq header packet
> > +                if ((ret =
> ff_rtmp_packet_clone(&rt->last_avc_seq_header_pkt, &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved video header packet\n");
> > +            } else if (rtmp_packet_is_aac_audio_header(&rt->out_pkt)) {
> > +                // Save last audio header
> > +                if (rt->last_aac_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last audio header packet saved\n");
> > +
> ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > +                }
> > +                // Save AAC seq header packet
> > +                if ((ret =
> ff_rtmp_packet_clone(&rt->last_aac_seq_header_pkt, &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved audio header packet\n");
> > +            } else if (rtmp_packet_is_onMetadata_packet(&rt->out_pkt)) {
> > +                // Save last onMetadata packet
> > +                if (rt->last_metadata_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last onMetadata packet saved\n");
> > +                    ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > +                }
> > +                // Save onMetadata packet
> > +                if ((ret = ff_rtmp_packet_clone(&rt->last_metadata_pkt,
> &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved onMetadata packet\n");
> > +            }
> > +
> > +            // Reconnection has been requested
> > +            if (rt->force_reconnection_now >= 1) {
> > +                // Check if packet is video IDR
> > +                is_idr = rtmp_packet_is_video_avc_IDR(&rt->out_pkt);
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "looking for the right disconnect point. Is IDR:
> %d, "
> > +                       "has_video: %d, has_audio: %d, state: %d, "
> > +                       "last_avc_seq_header_pkt.size: %d, "
> > +                       "last_aac_seq_header_pkt.size: %d\n",
> > +                       is_idr, rt->has_video, rt->has_audio, rt->state,
> > +                       rt->last_avc_seq_header_pkt.size,
> > +                       rt->last_aac_seq_header_pkt.size);
> > +
> > +                if (rt->has_video && rt->has_audio &&
> > +                    (rt->state == STATE_PUBLISHING)) {
> > +                    // If we only video let's do the reconnection in an
> IDR
> > +                    // frame when we have both headers saved
> > +                    if (is_idr && rt->last_avc_seq_header_pkt.size &&
> > +                        rt->last_aac_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else if (rt->has_video && !rt->has_audio &&
> > +                           (rt->state == STATE_PUBLISHING)) {
> > +                    // If we have video and NO audio let's do the
> reconnection
> > +                    // in an IDR frame when we have video header saved
> > +                    if (is_idr && rt->last_avc_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else if (!rt->has_video &&
> > +                           rt->has_audio & (rt->state ==
> STATE_PUBLISHING)) {
> > +                    // If we have only audio let's do the reconnection
> when we
> > +                    // have the audio header saved
> > +                    if (rt->last_aac_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "reconnection is requested but can NOT be
> executed "
> > +                           "now, waiting! rt->state: %d, has_video: %d,
> "
> > +                           "has_audio: %d, is_idr: %d\n",
> > +                           rt->state, rt->has_video, rt->has_audio,
> is_idr);
> > +                }
> > +            }
> > +
> > +            if (execute_reconnection) {
> > +                execute_reconnection = 0;
> > +
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "executing reconnection. rt->flv_off: %d,
> rt->flv_size: "
> > +                       "%d\n",
> > +                       rt->flv_off, rt->flv_size);
> > +
> > +                if ((ret = rtmp_reconnect(s)) < 0)
> > +                    return ret;
> > +
> > +                // Reconnect executed, clear the flag
> > +                rt->force_reconnection_now = 0;
> > +
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "reconnected. rt->flv_off: %d, rt->flv_size:
> %d\n",
> > +                       rt->flv_off, rt->flv_size);
> > +
> > +                // Send last video header if it is saved
> > +                if (rt->last_avc_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved video header\n");
> > +                    rt->last_avc_seq_header_pkt.timestamp =
> > +                        rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(
> > +                             rt, &rt->last_avc_seq_header_pkt, 0, 0)) <
> 0)
> > +                        return ret;
> > +                }
> > +
> > +                // Send last audio header if it is saved
> > +                if (rt->last_aac_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved audio header\n");
> > +                    rt->last_aac_seq_header_pkt.timestamp =
> > +                        rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(
> > +                             rt, &rt->last_aac_seq_header_pkt, 0, 0)) <
> 0)
> > +                        return ret;
> > +                }
> > +
> > +                // Send last onMetadata packet, optional
> > +                if (rt->last_metadata_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved onMetadata header\n");
> > +                    rt->last_metadata_pkt.timestamp =
> rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(rt,
> &rt->last_metadata_pkt, 0,
> > +                                                0)) < 0)
> > +                        return ret;
> > +                }
> > +            }
> > +
> > +            // Send actual packet
> > +            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0, 1)) < 0)
> >                  return ret;
> >              rt->flv_size = 0;
> >              rt->flv_off = 0;
> > @@ -3118,6 +3421,7 @@ static const AVOption rtmp_options[] = {
> >      {"listen",      "Listen for incoming rtmp connections",
> OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC,
> "rtmp_listen" },
> >      {"tcp_nodelay", "Use TCP_NODELAY to disable Nagle's algorithm",
> OFFSET(tcp_nodelay), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, DEC|ENC},
> >      {"timeout", "Maximum timeout (in seconds) to wait for incoming
> connections. -1 is infinite. Implies -rtmp_listen 1",
> OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX,
> DEC, "rtmp_listen" },
> > +    {"rtmp_reconnect_time", "Interval (in seconds) to force a client
> reconnection, it is based on media time. By default is 0 (no
> reconnection)", OFFSET(reconnect_interval), AV_OPT_TYPE_INT, {.i64 = 0}, 0,
> INT_MAX, ENC },
> >      { NULL },
> >  };
> >
> > --
> > 2.32.0
> >
> > _______________________________________________
> > ffmpeg-devel mailing list
> > ffmpeg-devel@ffmpeg.org
> > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> >
> > To unsubscribe, visit link above, or email
> > ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
>
diff mbox series

Patch

diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index 4b97c0833f..84ec72740d 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -405,6 +405,25 @@  int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
     return written;
 }
 
+int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src)
+{
+    if (pkt_src->size) {
+        pkt_dst->data = av_realloc(NULL, pkt_src->size);
+        if (!pkt_dst->data)
+            return AVERROR(ENOMEM);
+        else
+            memcpy(pkt_dst->data, pkt_src->data, pkt_src->size);
+    }
+    pkt_dst->size       = pkt_src->size;
+    pkt_dst->channel_id = pkt_src->channel_id;
+    pkt_dst->type       = pkt_src->type;
+    pkt_dst->timestamp  = pkt_src->timestamp;
+    pkt_dst->extra      = pkt_src->extra;
+    pkt_dst->ts_field   = pkt_src->ts_field;
+
+    return 0;
+}
+
 int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
                           int timestamp, int size)
 {
diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
index a15d2a5773..cdb901df89 100644
--- a/libavformat/rtmppkt.h
+++ b/libavformat/rtmppkt.h
@@ -59,6 +59,7 @@  typedef enum RTMPPacketType {
     RTMP_PT_SHARED_OBJ,         ///< shared object
     RTMP_PT_INVOKE,             ///< invoke some stream action
     RTMP_PT_METADATA     = 22,  ///< FLV metadata
+    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP, server is about to go down
 } RTMPPacketType;
 
 /**
@@ -99,6 +100,15 @@  typedef struct RTMPPacket {
 int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
                           int timestamp, int size);
 
+/**
+ * Clone RTMP packet
+ *
+ * @param pkt_dst packet destination
+ * @param pkt_src packet source
+ * @return zero on success, negative value otherwise
+ */
+int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src);
+
 /**
  * Free RTMP packet.
  *
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index b14d23b919..ea37b9880a 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -124,11 +124,21 @@  typedef struct RTMPContext {
     int           nb_streamid;                ///< The next stream id to return on createStream calls
     double        duration;                   ///< Duration of the stream in seconds as returned by the server (only valid if non-zero)
     int           tcp_nodelay;                ///< Use TCP_NODELAY to disable Nagle's algorithm if set to 1
+    int           reconnect_interval;         ///< Forces a reconnected every Xs (in media time)
     char          username[50];
     char          password[50];
     char          auth_params[500];
     int           do_reconnect;
+    uint32_t      last_reconnect_timestamp;
     int           auth_tried;
+    int           force_reconnection_now;
+    int           go_away_received;
+    AVDictionary* original_opts;
+    char          original_uri[TCURL_MAX_LENGTH];
+    int           original_flags;
+    RTMPPacket    last_avc_seq_header_pkt;    ///< rtmp packet, used to save last AVC video header, used on reconnection
+    RTMPPacket    last_aac_seq_header_pkt;    ///< rtmp packet, used to save last AAC audio header, used on reconnection
+    RTMPPacket    last_metadata_pkt;        ///< rtmp packet, used to save last onMetadata info, used on reconnection
 } RTMPContext;
 
 #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used for first client digest signing
@@ -224,7 +234,7 @@  static void free_tracked_methods(RTMPContext *rt)
     rt->nb_tracked_methods   = 0;
 }
 
-static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
+static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track, int destroy)
 {
     int ret;
 
@@ -248,7 +258,9 @@  static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
     ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
                                &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
 fail:
-    ff_rtmp_packet_destroy(pkt);
+    if (destroy)
+        ff_rtmp_packet_destroy(pkt);
+
     return ret;
 }
 
@@ -336,6 +348,9 @@  static int gen_connect(URLContext *s, RTMPContext *rt)
     if (!rt->is_input) {
         ff_amf_write_field_name(&p, "type");
         ff_amf_write_string(&p, "nonprivate");
+        // Indicates accepts goaway
+        ff_amf_write_field_name(&p, "supportsGoAway");
+        ff_amf_write_bool(&p, 1);
     }
     ff_amf_write_field_name(&p, "flashVer");
     ff_amf_write_string(&p, rt->flashver);
@@ -400,7 +415,7 @@  static int gen_connect(URLContext *s, RTMPContext *rt)
 
     pkt.size = p - pkt.data;
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 
@@ -611,7 +626,7 @@  static int gen_release_stream(URLContext *s, RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -635,7 +650,7 @@  static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -659,7 +674,7 @@  static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -683,7 +698,7 @@  static int gen_create_stream(URLContext *s, RTMPContext *rt)
     ff_amf_write_number(&p, ++rt->nb_invokes);
     ff_amf_write_null(&p);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 
@@ -709,7 +724,7 @@  static int gen_delete_stream(URLContext *s, RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_number(&p, rt->stream_id);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -733,7 +748,7 @@  static int gen_get_stream_length(URLContext *s, RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -754,7 +769,7 @@  static int gen_buffer_time(URLContext *s, RTMPContext *rt)
     bytestream_put_be32(&p, rt->stream_id);
     bytestream_put_be32(&p, rt->client_buffer_time);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -782,7 +797,7 @@  static int gen_play(URLContext *s, RTMPContext *rt)
     ff_amf_write_string(&p, rt->playpath);
     ff_amf_write_number(&p, rt->live * 1000);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
@@ -805,7 +820,7 @@  static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
     ff_amf_write_null(&p); //as usual, the first null param
     ff_amf_write_number(&p, timestamp); //where we want to jump
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -832,7 +847,7 @@  static int gen_pause(URLContext *s, RTMPContext *rt, int pause, uint32_t timesta
     ff_amf_write_bool(&p, pause); // pause or unpause
     ff_amf_write_number(&p, timestamp); //where we pause the stream
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -859,7 +874,7 @@  static int gen_publish(URLContext *s, RTMPContext *rt)
     ff_amf_write_string(&p, rt->playpath);
     ff_amf_write_string(&p, "live");
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -885,7 +900,7 @@  static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
     bytestream_put_be16(&p, 7); // PingResponse
     bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -906,7 +921,7 @@  static int gen_swf_verification(URLContext *s, RTMPContext *rt)
     bytestream_put_be16(&p, 27);
     memcpy(p, rt->swfverification, 42);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -925,7 +940,7 @@  static int gen_window_ack_size(URLContext *s, RTMPContext *rt)
     p = pkt.data;
     bytestream_put_be32(&p, rt->max_sent_unacked);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 /**
@@ -946,7 +961,7 @@  static int gen_check_bw(URLContext *s, RTMPContext *rt)
     ff_amf_write_number(&p, ++rt->nb_invokes);
     ff_amf_write_null(&p);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -965,7 +980,7 @@  static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
     p = pkt.data;
     bytestream_put_be32(&p, rt->bytes_read);
 
-    return rtmp_send_packet(rt, &pkt, 0);
+    return rtmp_send_packet(rt, &pkt, 0, 1);
 }
 
 static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
@@ -985,7 +1000,7 @@  static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, subscribe);
 
-    return rtmp_send_packet(rt, &pkt, 1);
+    return rtmp_send_packet(rt, &pkt, 1, 1);
 }
 
 /**
@@ -2153,6 +2168,16 @@  static int handle_invoke_status(URLContext *s, RTMPPacket *pkt)
     return 0;
 }
 
+static int handle_go_away(URLContext *s, RTMPPacket *pkt) {
+    RTMPContext *rt = s->priv_data;
+    
+    av_log(s, AV_LOG_TRACE, "go away signal received");
+
+    rt->go_away_received = 1;
+
+    return 0;
+}
+
 static int handle_invoke(URLContext *s, RTMPPacket *pkt)
 {
     RTMPContext *rt = s->priv_data;
@@ -2331,6 +2356,10 @@  static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
         if ((ret = handle_invoke(s, pkt)) < 0)
             return ret;
         break;
+    case RTMP_PT_GO_AWAY:
+        if ((ret = handle_go_away(s, pkt)) < 0)
+            return ret;
+        break;
     case RTMP_PT_VIDEO:
     case RTMP_PT_AUDIO:
     case RTMP_PT_METADATA:
@@ -2513,6 +2542,15 @@  static int rtmp_close(URLContext *h)
     free_tracked_methods(rt);
     av_freep(&rt->flv_data);
     ffurl_closep(&rt->stream);
+    if (rt->last_avc_seq_header_pkt.size)
+        ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
+
+    if (rt->last_aac_seq_header_pkt.size)
+        ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
+
+    if (rt->last_metadata_pkt.size)
+        ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
+
     return ret;
 }
 
@@ -2871,14 +2909,23 @@  reconnect:
                 goto fail;
         }
     } else {
-        rt->flv_size = 0;
-        rt->flv_data = NULL;
-        rt->flv_off  = 0;
-        rt->skip_bytes = 13;
+        // Do not clean buffers if it is a forced reconnection
+        if (rt->force_reconnection_now <= 0) {
+            rt->flv_size = 0;
+            rt->flv_data = NULL;
+            rt->flv_off  = 0;
+            rt->skip_bytes = 13;
+        }
     }
 
     s->max_packet_size = rt->stream->max_packet_size;
     s->is_streamed     = 1;
+
+    // Copy original params
+    av_dict_copy(&rt->original_opts, *opts, 0);
+    rt->original_flags = flags;
+    av_strlcpy(rt->original_uri, uri, TCURL_MAX_LENGTH);
+
     return 0;
 
 fail:
@@ -2951,6 +2998,107 @@  static int rtmp_pause(URLContext *s, int pause)
     return 0;
 }
 
+/**
+ * Reconnect RTMP connection.
+*/
+static int rtmp_reconnect(URLContext *s) {
+    RTMPContext *rt = s->priv_data;
+    int i;
+
+    // Close current RTMP connection
+    av_log(s, AV_LOG_INFO, "reconnecting!\n");
+
+    ffurl_closep(&rt->stream);
+    rt->do_reconnect = 0;
+    rt->nb_invokes   = 0;
+    for (i = 0; i < 2; i++)
+        memset(rt->prev_pkt[i], 0, sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]);
+
+    free_tracked_methods(rt);
+
+    // Connect RTMP again using orignal values
+    return rtmp_open(s, rt->original_uri, rt->original_flags, &rt->original_opts);
+}
+
+/**
+ * Checks RTMP packet and return 1 when it contains an AAC header
+*/
+static int rtmp_packet_is_aac_audio_header(RTMPPacket *pkt) {
+    uint8_t sound_format;
+    uint8_t aac_packet_type;
+
+    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_AUDIO)
+        return 0;
+
+    sound_format = (pkt->data[0] & 0xF0) >> 4;
+    aac_packet_type = pkt->data[1];
+    // Check codec == AVC and avc contains seq header
+    if (sound_format == 10 && aac_packet_type == 0)
+        return 1;
+
+    return 0;
+}
+
+/**
+ * Checks RTMP packet and return 1 when it contains an AVC header
+*/
+static int rtmp_packet_is_avc_video_header(RTMPPacket *pkt) {
+    uint8_t codec_id;
+    uint8_t avc_packet_type;
+
+    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_VIDEO)
+        return 0;
+
+    codec_id = pkt->data[0] & 0xF;
+    avc_packet_type = pkt->data[1];
+    // Check codec == AVC and avc contains seq header
+    if (codec_id == 7 && avc_packet_type == 0)
+        return 1;
+
+    return 0;
+}
+
+/**
+ * Checks RTMP packet and return 1 when it contains video IDR point
+*/
+static int rtmp_packet_is_video_avc_IDR(RTMPPacket *pkt) {
+    uint8_t frame_type;
+    uint8_t codec_id;
+
+    if ((!pkt) || (pkt->size < 1) || pkt->type != RTMP_PT_VIDEO)
+        return 0;
+
+    frame_type = (pkt->data[0] & 0xF0) >> 4;
+    codec_id = pkt->data[0] & 0xF;
+    // Check codec == AVC and videoFrame == Keyframe / seekable (assuming that means IDR)
+    if (codec_id == 7 && frame_type == 1)
+        return 1;
+
+    return 0;
+}
+
+/**
+ * Checks RTMP packet and return 1 when it contains onMetadata info
+*/
+static int rtmp_packet_is_onMetadata_packet(RTMPPacket *pkt) {
+    uint8_t commandbuffer[64];
+    int stringlen;
+    GetByteContext gbc;
+
+    if ((!pkt) || (pkt->size < 10) || pkt->type != RTMP_PT_NOTIFY)
+        return 0;
+
+    bytestream2_init(&gbc, pkt->data, pkt->size);
+    if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),&stringlen))
+        return 0;
+
+    // onMetadata is prepended by "@setDataFrame"
+    if (!strcmp(commandbuffer, "@setDataFrame"))
+        return 1;
+
+    return 0;
+}
+
 static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
 {
     RTMPContext *rt = s->priv_data;
@@ -2960,6 +3108,8 @@  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
     const uint8_t *buf_temp = buf;
     uint8_t c;
     int ret;
+    int execute_reconnection = 0;
+    int is_idr = 0;
 
     do {
         if (rt->skip_bytes) {
@@ -2988,8 +3138,13 @@  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
             bytestream_get_be24(&header);
             rt->flv_size = pktsize;
 
-            if (pkttype == RTMP_PT_VIDEO)
+            if (pkttype == RTMP_PT_VIDEO) {
                 channel = RTMP_VIDEO_CHANNEL;
+                rt->has_video = 1;
+            }
+            if (pkttype == RTMP_PT_AUDIO) {
+                rt->has_audio = 1;
+            }
 
             if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
                 pkttype == RTMP_PT_NOTIFY) {
@@ -3047,7 +3202,155 @@  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
                 }
             }
 
-            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
+            // Check if a reconnection is required
+            // Per interval
+            if ((rt->reconnect_interval > 0) &&
+                (rt->out_pkt.timestamp >= (rt->last_reconnect_timestamp + rt->reconnect_interval * 1000))) {
+                rt->last_reconnect_timestamp = rt->out_pkt.timestamp;
+                rt->force_reconnection_now = 1;
+                av_log(s, AV_LOG_TRACE,
+                       "trigered internal interval reconnection\n");
+            }
+            // Per go away signal
+            if (rt->go_away_received > 0) {
+                rt->go_away_received = 0;
+                rt->force_reconnection_now = 1;
+                av_log(s, AV_LOG_TRACE,
+                       "detected go away signal from the peer\n");
+            }
+
+            if (rtmp_packet_is_avc_video_header(&rt->out_pkt)) {
+                // Save last video header
+                if (rt->last_avc_seq_header_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "freeing last video header packet saved\n");
+                    ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
+                }
+                // Save AVC seq header packet
+                if ((ret = ff_rtmp_packet_clone(&rt->last_avc_seq_header_pkt, &rt->out_pkt)) < 0) {
+                    return ret;
+                }
+                av_log(s, AV_LOG_DEBUG, "saved video header packet\n");
+            } else if (rtmp_packet_is_aac_audio_header(&rt->out_pkt)) {
+                // Save last audio header
+                if (rt->last_aac_seq_header_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "freeing last audio header packet saved\n");
+                    ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
+                }
+                // Save AAC seq header packet
+                if ((ret = ff_rtmp_packet_clone(&rt->last_aac_seq_header_pkt, &rt->out_pkt)) < 0) {
+                    return ret;
+                }
+                av_log(s, AV_LOG_DEBUG, "saved audio header packet\n");
+            } else if (rtmp_packet_is_onMetadata_packet(&rt->out_pkt)) {
+                // Save last onMetadata packet
+                if (rt->last_metadata_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "freeing last onMetadata packet saved\n");
+                    ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
+                }
+                // Save onMetadata packet
+                if ((ret = ff_rtmp_packet_clone(&rt->last_metadata_pkt, &rt->out_pkt)) < 0) {
+                    return ret;
+                }
+                av_log(s, AV_LOG_DEBUG, "saved onMetadata packet\n");
+            }
+
+            // Reconnection has been requested
+            if (rt->force_reconnection_now >= 1) {
+                // Check if packet is video IDR
+                is_idr = rtmp_packet_is_video_avc_IDR(&rt->out_pkt);
+                av_log(s, AV_LOG_DEBUG,
+                       "looking for the right disconnect point. Is IDR: %d, "
+                       "has_video: %d, has_audio: %d, state: %d, "
+                       "last_avc_seq_header_pkt.size: %d, "
+                       "last_aac_seq_header_pkt.size: %d\n",
+                       is_idr, rt->has_video, rt->has_audio, rt->state,
+                       rt->last_avc_seq_header_pkt.size,
+                       rt->last_aac_seq_header_pkt.size);
+
+                if (rt->has_video && rt->has_audio &&
+                    (rt->state == STATE_PUBLISHING)) {
+                    // If we only video let's do the reconnection in an IDR
+                    // frame when we have both headers saved
+                    if (is_idr && rt->last_avc_seq_header_pkt.size &&
+                        rt->last_aac_seq_header_pkt.size)
+                        execute_reconnection = 1;
+                } else if (rt->has_video && !rt->has_audio &&
+                           (rt->state == STATE_PUBLISHING)) {
+                    // If we have video and NO audio let's do the reconnection
+                    // in an IDR frame when we have video header saved
+                    if (is_idr && rt->last_avc_seq_header_pkt.size)
+                        execute_reconnection = 1;
+                } else if (!rt->has_video &&
+                           rt->has_audio & (rt->state == STATE_PUBLISHING)) {
+                    // If we have only audio let's do the reconnection when we
+                    // have the audio header saved
+                    if (rt->last_aac_seq_header_pkt.size)
+                        execute_reconnection = 1;
+                } else {
+                    av_log(s, AV_LOG_DEBUG,
+                           "reconnection is requested but can NOT be executed "
+                           "now, waiting! rt->state: %d, has_video: %d, "
+                           "has_audio: %d, is_idr: %d\n",
+                           rt->state, rt->has_video, rt->has_audio, is_idr);
+                }
+            }
+
+            if (execute_reconnection) {
+                execute_reconnection = 0;
+
+                av_log(s, AV_LOG_DEBUG,
+                       "executing reconnection. rt->flv_off: %d, rt->flv_size: "
+                       "%d\n",
+                       rt->flv_off, rt->flv_size);
+
+                if ((ret = rtmp_reconnect(s)) < 0)
+                    return ret;
+
+                // Reconnect executed, clear the flag
+                rt->force_reconnection_now = 0;
+
+                av_log(s, AV_LOG_DEBUG,
+                       "reconnected. rt->flv_off: %d, rt->flv_size: %d\n",
+                       rt->flv_off, rt->flv_size);
+
+                // Send last video header if it is saved
+                if (rt->last_avc_seq_header_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "sending last saved video header\n");
+                    rt->last_avc_seq_header_pkt.timestamp =
+                        rt->out_pkt.timestamp;
+                    if ((ret = rtmp_send_packet(
+                             rt, &rt->last_avc_seq_header_pkt, 0, 0)) < 0)
+                        return ret;
+                }
+
+                // Send last audio header if it is saved
+                if (rt->last_aac_seq_header_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "sending last saved audio header\n");
+                    rt->last_aac_seq_header_pkt.timestamp =
+                        rt->out_pkt.timestamp;
+                    if ((ret = rtmp_send_packet(
+                             rt, &rt->last_aac_seq_header_pkt, 0, 0)) < 0)
+                        return ret;
+                }
+
+                // Send last onMetadata packet, optional
+                if (rt->last_metadata_pkt.size) {
+                    av_log(s, AV_LOG_DEBUG,
+                           "sending last saved onMetadata header\n");
+                    rt->last_metadata_pkt.timestamp = rt->out_pkt.timestamp;
+                    if ((ret = rtmp_send_packet(rt, &rt->last_metadata_pkt, 0,
+                                                0)) < 0)
+                        return ret;
+                }
+            }
+
+            // Send actual packet
+            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0, 1)) < 0)
                 return ret;
             rt->flv_size = 0;
             rt->flv_off = 0;
@@ -3118,6 +3421,7 @@  static const AVOption rtmp_options[] = {
     {"listen",      "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
     {"tcp_nodelay", "Use TCP_NODELAY to disable Nagle's algorithm", OFFSET(tcp_nodelay), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, DEC|ENC},
     {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1",  OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
+    {"rtmp_reconnect_time", "Interval (in seconds) to force a client reconnection, it is based on media time. By default is 0 (no reconnection)", OFFSET(reconnect_interval), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, ENC },
     { NULL },
 };