diff mbox

[FFmpeg-devel,10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support

Message ID 1470144262-13167-11-git-send-email-sebechlebskyjan@gmail.com
State Superseded
Headers show

Commit Message

sebechlebskyjan@gmail.com Aug. 2, 2016, 1:24 p.m. UTC
From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>

Add support for nonblocking calls.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
---
 libavformat/fifo.c | 70 +++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 59 insertions(+), 11 deletions(-)

Comments

Michael Niedermayer Aug. 2, 2016, 9:31 p.m. UTC | #1
On Tue, Aug 02, 2016 at 03:24:21PM +0200, sebechlebskyjan@gmail.com wrote:
> From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> 
> Add support for nonblocking calls.
> 
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> ---
>  libavformat/fifo.c | 70 +++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 59 insertions(+), 11 deletions(-)

breaks on x86-32 linux

src/libavformat/fifo.c: In function ‘fifo_write_trailer’:
src/libavformat/fifo.c:626:9: error: implicit declaration of function ‘pthread_tryjoin_np’ [-Werror=implicit-function-declaration]
cc1: some warnings being treated as errors
make: *** [libavformat/fifo.o] Error 1

[...]
Nicolas George Aug. 3, 2016, 1:41 p.m. UTC | #2
Le sextidi 16 thermidor, an CCXXIV, sebechlebskyjan@gmail.com a écrit :
> From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> 
> Add support for nonblocking calls.
> 
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
> ---
>  libavformat/fifo.c | 70 +++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 59 insertions(+), 11 deletions(-)
> 
> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
> index bc8c973..efd91e3 100644
> --- a/libavformat/fifo.c
> +++ b/libavformat/fifo.c
> @@ -25,6 +25,7 @@
>  #include "avformat.h"
>  #include "internal.h"
>  #include "pthread.h"
> +#include "url.h"
>  
>  #define FIFO_DEFAULT_QUEUE_SIZE              60
>  #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16
> @@ -77,6 +78,13 @@ typedef struct FifoContext {
>       * from failure or queue overflow */
>      uint8_t restart_with_keyframe;
>  
> +    /* Whether termination was requested by invoking deinit
> +     * before the thread was finished. Used only in non-blocking
> +     * mode - when AVFMT_FLAG_NONBLOCK is set. */
> +    volatile uint8_t termination_requested;
> +
> +    /* Original interrupt callback of the underlying muxer. */
> +    AVIOInterruptCB orig_interrupt_callback;
>  } FifoContext;
>  
>  typedef struct FifoThreadContext {
> @@ -110,6 +118,16 @@ typedef struct FifoMessage {
>      AVPacket pkt;
>  } FifoMessage;
>  
> +static int fifo_interrupt_callback_wrapper(void *arg)
> +{
> +    FifoContext *ctx = arg;
> +

> +    if (ctx->termination_requested)
> +        return 1;

This is a common misconception: volatile is not enough for thread
communication. IIUC, volatile forces the compiler to issue actual store and
fetch operations to memory, but not a memory barrier. That means it can be
used for communication with a signal handler (provided the object is small
enough so that operations are atomic, cf. sig_atomic_t) or a hardware
device. But for inter-thread communication on SMP systems, the lack of
memory barrier means that the change may be limited to the processor's cache
and only reach the central shared memory and the other core's cache at some
indeterminate point later.

For inter-thread communication, you either need a mutex or a special atomic
and synchronized operation. The second solution is more efficient, but
currently less portable. You can have a look at libavutil/atomic.h, and also
at Anton's efforts to replace it with a more standard and modern API, it
happened just last week on the fork's mailing-list.

> +
> +    return ff_check_interrupt(&ctx->orig_interrupt_callback);

I am not sure if this part is worth your efforts: if the application wants
to cancel a write, it uses the FIFO muxer APIs, it does not need to set up
an interrupt callback. Do you see a case where it would be useful?

> +}
> +
>  static int fifo_thread_write_header(FifoThreadContext *ctx)
>  {
>      AVFormatContext *avf = ctx->avf;
> @@ -448,6 +466,8 @@ static void *fifo_consumer_thread(void *data)
>  static int fifo_mux_init(AVFormatContext *avf)
>  {
>      FifoContext *fifo = avf->priv_data;
> +    AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
> +                                    .opaque = fifo};
>      AVFormatContext *avf2;
>      int ret = 0, i;
>  
> @@ -458,7 +478,8 @@ static int fifo_mux_init(AVFormatContext *avf)
>  
>      fifo->avf = avf2;
>  
> -    avf2->interrupt_callback = avf->interrupt_callback;
> +    fifo->orig_interrupt_callback = avf->interrupt_callback;
> +    avf2->interrupt_callback = interrupt_cb;
>      avf2->max_delay = avf->max_delay;
>      ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
>      if (ret < 0)
> @@ -543,7 +564,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
>  {
>      FifoContext *fifo = avf->priv_data;
>      FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
> -    int ret;
> +    int ret, queue_flags = 0;
>  
>      if (pkt) {
>          av_init_packet(&msg.pkt);
> @@ -552,15 +573,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
>              return ret;
>      }
>  
> -    ret = av_thread_message_queue_send(fifo->queue, &msg,
> -                                       fifo->drop_pkts_on_overflow ?
> -                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
> +    if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
> +        queue_flags |= AVFMT_FLAG_NONBLOCK;
> +
> +    ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
> +
>      if (ret == AVERROR(EAGAIN)) {
> -        uint8_t overflow_set = 0;
> +        uint8_t overflow_set;
> +
> +        if (avf->flags & AVFMT_FLAG_NONBLOCK)
> +            return ret;
>  
>          /* Queue is full, set fifo->overflow_flag to 1
>           * to let consumer thread know the queue should
>           * be flushed. */
> +        overflow_set = 0;
>          pthread_mutex_lock(&fifo->overflow_flag_lock);
>          if (!fifo->overflow_flag)
>              fifo->overflow_flag = overflow_set = 1;
> @@ -588,11 +615,22 @@ static int fifo_write_trailer(AVFormatContext *avf)
>  
>      av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
>  
> -    ret = pthread_join( fifo->writer_thread, NULL);
> -    if (ret < 0) {
> -        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
> -               av_err2str(AVERROR(ret)));
> -        return AVERROR(ret);
> +    if (!(avf->flags & AVFMT_FLAG_NONBLOCK)) {
> +        ret = pthread_join( fifo->writer_thread, NULL);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
> +                av_err2str(AVERROR(ret)));
> +            return AVERROR(ret);
> +        }
> +    } else {

> +        ret = pthread_tryjoin_np( fifo->writer_thread, NULL);

This function is a GNU extension (np means nonportable), this is the reason
for the build failure reported by Michael; I am rather surprised you did not
get it too since the build options should hide nonportable functions.

Anyway, this functions is rather unnecessary, you can just set a flag just
before the thread terminates and check the flag before pthread_join().

> +        if (ret == EBUSY)
> +            return AVERROR(EAGAIN);
> +        if (ret) {
> +            av_log(avf, AV_LOG_ERROR, "pthread_tryjoin_np error: %s\n",
> +                   av_err2str(AVERROR(ret)));
> +            return AVERROR(ret);
> +        }
>      }
>  
>      ret = fifo->write_trailer_ret;
> @@ -603,6 +641,16 @@ static void fifo_deinit(AVFormatContext *avf)
>  {
>      FifoContext *fifo = avf->priv_data;
>  
> +    if (avf->flags & AVFMT_FLAG_NONBLOCK) {
> +        int ret;
> +        fifo->termination_requested = 1;
> +        ret = pthread_join( fifo->writer_thread, NULL);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
> +                   av_err2str(AVERROR(ret)));
> +        }
> +    }
> +
>      if (fifo->format_options)
>          av_dict_free(&fifo->format_options);
>  

Regards,
Michael Niedermayer Aug. 3, 2016, 2:23 p.m. UTC | #3
On Wed, Aug 03, 2016 at 03:41:14PM +0200, Nicolas George wrote:
> Le sextidi 16 thermidor, an CCXXIV, sebechlebskyjan@gmail.com a écrit :
> > From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
[...]
> > +        ret = pthread_tryjoin_np( fifo->writer_thread, NULL);
> 
> This function is a GNU extension (np means nonportable), this is the reason
> for the build failure reported by Michael; I am rather surprised you did not
> get it too since the build options should hide nonportable functions.

drifting a little off topic but

without knowing, the reason why it passed for jan and
likely also for me on x86_64 is i have more stuff instelled and
enabled for x86_64 build than 32bit
And some of the pkgconfig files add very inappropriate flags like
GNU_SOURCE IIRC
Maybe we should actually try to filter that out ...

[...]
sebechlebskyjan@gmail.com Aug. 4, 2016, 9:28 a.m. UTC | #4
On 08/03/2016 03:41 PM, Nicolas George wrote:
> Le sextidi 16 thermidor, an CCXXIV, sebechlebskyjan@gmail.com a écrit :
>
>> +    if (ctx->termination_requested)
>> +        return 1;
> This is a common misconception: volatile is not enough for thread
> communication. IIUC, volatile forces the compiler to issue actual store and
> fetch operations to memory, but not a memory barrier. That means it can be
> used for communication with a signal handler (provided the object is small
> enough so that operations are atomic, cf. sig_atomic_t) or a hardware
> device. But for inter-thread communication on SMP systems, the lack of
> memory barrier means that the change may be limited to the processor's cache
> and only reach the central shared memory and the other core's cache at some
> indeterminate point later.
>
> For inter-thread communication, you either need a mutex or a special atomic
> and synchronized operation. The second solution is more efficient, but
> currently less portable. You can have a look at libavutil/atomic.h, and also
> at Anton's efforts to replace it with a more standard and modern API, it
> happened just last week on the fork's mailing-list.
This was kind of intentional, since the flag is uint8_t and only thing 
what happens is
that it is set to 1(from initial 0) at some point of time I think the 
worst scenario is that flag will be detected later (but that also 
happens when the lock is used), there is no risk that corrupted value 
will be read.
     But I agree this is probably not a good practice, so I will change 
the flag to be int and use atomic get/set.
>> +
>> +    return ff_check_interrupt(&ctx->orig_interrupt_callback);
> I am not sure if this part is worth your efforts: if the application wants
> to cancel a write, it uses the FIFO muxer APIs, it does not need to set up
> an interrupt callback. Do you see a case where it would be useful?
I think there is no reason to disable custom interrupt callback here, 
there is almost no
overhead if it is used, user can choose to ignore it, but when he 
decides to use it it will work as expected.
>> +        ret = pthread_tryjoin_np( fifo->writer_thread, NULL);
> This function is a GNU extension (np means nonportable), this is the reason
> for the build failure reported by Michael; I am rather surprised you did not
> get it too since the build options should hide nonportable functions.
>
> Anyway, this functions is rather unnecessary, you can just set a flag just
> before the thread terminates and check the flag before pthread_join().
Yes, this was a mistake, I've changed that to use flag.

Thanks!

Regards,
Jan
diff mbox

Patch

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index bc8c973..efd91e3 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -25,6 +25,7 @@ 
 #include "avformat.h"
 #include "internal.h"
 #include "pthread.h"
+#include "url.h"
 
 #define FIFO_DEFAULT_QUEUE_SIZE              60
 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16
@@ -77,6 +78,13 @@  typedef struct FifoContext {
      * from failure or queue overflow */
     uint8_t restart_with_keyframe;
 
+    /* Whether termination was requested by invoking deinit
+     * before the thread was finished. Used only in non-blocking
+     * mode - when AVFMT_FLAG_NONBLOCK is set. */
+    volatile uint8_t termination_requested;
+
+    /* Original interrupt callback of the underlying muxer. */
+    AVIOInterruptCB orig_interrupt_callback;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -110,6 +118,16 @@  typedef struct FifoMessage {
     AVPacket pkt;
 } FifoMessage;
 
+static int fifo_interrupt_callback_wrapper(void *arg)
+{
+    FifoContext *ctx = arg;
+
+    if (ctx->termination_requested)
+        return 1;
+
+    return ff_check_interrupt(&ctx->orig_interrupt_callback);
+}
+
 static int fifo_thread_write_header(FifoThreadContext *ctx)
 {
     AVFormatContext *avf = ctx->avf;
@@ -448,6 +466,8 @@  static void *fifo_consumer_thread(void *data)
 static int fifo_mux_init(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
+    AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
+                                    .opaque = fifo};
     AVFormatContext *avf2;
     int ret = 0, i;
 
@@ -458,7 +478,8 @@  static int fifo_mux_init(AVFormatContext *avf)
 
     fifo->avf = avf2;
 
-    avf2->interrupt_callback = avf->interrupt_callback;
+    fifo->orig_interrupt_callback = avf->interrupt_callback;
+    avf2->interrupt_callback = interrupt_cb;
     avf2->max_delay = avf->max_delay;
     ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
     if (ret < 0)
@@ -543,7 +564,7 @@  static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
 {
     FifoContext *fifo = avf->priv_data;
     FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
-    int ret;
+    int ret, queue_flags = 0;
 
     if (pkt) {
         av_init_packet(&msg.pkt);
@@ -552,15 +573,21 @@  static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
             return ret;
     }
 
-    ret = av_thread_message_queue_send(fifo->queue, &msg,
-                                       fifo->drop_pkts_on_overflow ?
-                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
+        queue_flags |= AVFMT_FLAG_NONBLOCK;
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
+
     if (ret == AVERROR(EAGAIN)) {
-        uint8_t overflow_set = 0;
+        uint8_t overflow_set;
+
+        if (avf->flags & AVFMT_FLAG_NONBLOCK)
+            return ret;
 
         /* Queue is full, set fifo->overflow_flag to 1
          * to let consumer thread know the queue should
          * be flushed. */
+        overflow_set = 0;
         pthread_mutex_lock(&fifo->overflow_flag_lock);
         if (!fifo->overflow_flag)
             fifo->overflow_flag = overflow_set = 1;
@@ -588,11 +615,22 @@  static int fifo_write_trailer(AVFormatContext *avf)
 
     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
 
-    ret = pthread_join( fifo->writer_thread, NULL);
-    if (ret < 0) {
-        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
-               av_err2str(AVERROR(ret)));
-        return AVERROR(ret);
+    if (!(avf->flags & AVFMT_FLAG_NONBLOCK)) {
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                av_err2str(AVERROR(ret)));
+            return AVERROR(ret);
+        }
+    } else {
+        ret = pthread_tryjoin_np( fifo->writer_thread, NULL);
+        if (ret == EBUSY)
+            return AVERROR(EAGAIN);
+        if (ret) {
+            av_log(avf, AV_LOG_ERROR, "pthread_tryjoin_np error: %s\n",
+                   av_err2str(AVERROR(ret)));
+            return AVERROR(ret);
+        }
     }
 
     ret = fifo->write_trailer_ret;
@@ -603,6 +641,16 @@  static void fifo_deinit(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
 
+    if (avf->flags & AVFMT_FLAG_NONBLOCK) {
+        int ret;
+        fifo->termination_requested = 1;
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                   av_err2str(AVERROR(ret)));
+        }
+    }
+
     if (fifo->format_options)
         av_dict_free(&fifo->format_options);