diff mbox

[FFmpeg-devel,v5,10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support

Message ID 1471209308-25838-2-git-send-email-sebechlebskyjan@gmail.com
State Superseded
Headers show

Commit Message

sebechlebskyjan@gmail.com Aug. 14, 2016, 9:15 p.m. UTC
From: Jan Sebechlebsky <sebechlebskyjan@gmail.com>

Add support for nonblocking calls.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan@gmail.com>
---
 No changes since the last version of the patch, just rebased because
 of changes in previous patch.

 libavformat/fifo.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 55 insertions(+), 6 deletions(-)
diff mbox

Patch

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index 3748fa9..ef0fda4 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -19,12 +19,14 @@ 
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include "libavutil/atomic.h"
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/thread.h"
 #include "libavutil/threadmessage.h"
 #include "avformat.h"
 #include "internal.h"
+#include "url.h"
 
 #define FIFO_DEFAULT_QUEUE_SIZE              60
 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
@@ -77,6 +79,17 @@  typedef struct FifoContext {
     /* Value > 0 signals queue overflow */
     volatile uint8_t overflow_flag;
 
+    /* Whether termination was requested by invoking deinit
+     * before the thread was finished. Used only in non-blocking
+     * mode - when AVFMT_FLAG_NONBLOCK is set. */
+    volatile int termination_requested;
+
+    /* Initially 0, set to 1 immediately before thread function
+     * returns */
+    volatile int thread_finished_flag;
+
+    /* Original interrupt callback of the underlying muxer. */
+    AVIOInterruptCB orig_interrupt_callback;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -111,6 +124,16 @@  typedef struct FifoMessage {
     AVPacket pkt;
 } FifoMessage;
 
+static int fifo_interrupt_callback_wrapper(void *arg)
+{
+    FifoContext *ctx = arg;
+
+    if (avpriv_atomic_int_get(&ctx->termination_requested))
+        return 1;
+
+    return ff_check_interrupt(&ctx->orig_interrupt_callback);
+}
+
 static int fifo_thread_write_header(FifoThreadContext *ctx)
 {
     AVFormatContext *avf = ctx->avf;
@@ -430,12 +453,17 @@  static void *fifo_consumer_thread(void *data)
 
     fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
 
+    /* This must be only return path from fifo_consumer_thread function,
+     * so the thread_finised_flag is set. */
+    avpriv_atomic_int_set(&fifo->thread_finished_flag, 1);
     return NULL;
 }
 
 static int fifo_mux_init(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
+    AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
+                                    .opaque = fifo};
     AVFormatContext *avf2;
     int ret = 0, i;
 
@@ -446,7 +474,8 @@  static int fifo_mux_init(AVFormatContext *avf)
 
     fifo->avf = avf2;
 
-    avf2->interrupt_callback = avf->interrupt_callback;
+    fifo->orig_interrupt_callback = avf->interrupt_callback;
+    avf2->interrupt_callback = interrupt_cb;
     avf2->max_delay = avf->max_delay;
     ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
     if (ret < 0)
@@ -533,7 +562,7 @@  static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
 {
     FifoContext *fifo = avf->priv_data;
     FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
-    int ret;
+    int ret, queue_flags = 0;
 
     if (pkt) {
         av_init_packet(&msg.pkt);
@@ -542,15 +571,21 @@  static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
             return ret;
     }
 
-    ret = av_thread_message_queue_send(fifo->queue, &msg,
-                                       fifo->drop_pkts_on_overflow ?
-                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
+        queue_flags |= AV_THREAD_MESSAGE_NONBLOCK;
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
+
     if (ret == AVERROR(EAGAIN)) {
-        uint8_t overflow_set = 0;
+        uint8_t overflow_set;
+
+        if (avf->flags & AVFMT_FLAG_NONBLOCK)
+            goto fail;
 
         /* Queue is full, set fifo->overflow_flag to 1
          * to let consumer thread know the queue should
          * be flushed. */
+        overflow_set = 0;
         pthread_mutex_lock(&fifo->overflow_flag_lock);
         if (!fifo->overflow_flag)
             fifo->overflow_flag = overflow_set = 1;
@@ -578,6 +613,10 @@  static int fifo_write_trailer(AVFormatContext *avf)
 
     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
 
+    if ((avf->flags & AVFMT_FLAG_NONBLOCK) &&
+        !avpriv_atomic_int_get(&fifo->thread_finished_flag))
+       return AVERROR(EAGAIN);
+
     ret = pthread_join( fifo->writer_thread, NULL);
     if (ret < 0) {
         av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
@@ -593,6 +632,16 @@  static void fifo_deinit(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
 
+    if (avf->flags & AVFMT_FLAG_NONBLOCK) {
+        int ret;
+        avpriv_atomic_int_set(&fifo->termination_requested, 1);
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                   av_err2str(AVERROR(ret)));
+        }
+    }
+
     if (fifo->format_options)
         av_dict_free(&fifo->format_options);