From patchwork Thu Aug 11 12:48:23 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: sebechlebskyjan@gmail.com X-Patchwork-Id: 154 Delivered-To: ffmpegpatchwork@gmail.com Received: by 10.103.140.67 with SMTP id o64csp47767vsd; Thu, 11 Aug 2016 05:48:46 -0700 (PDT) X-Received: by 10.28.185.202 with SMTP id j193mr9348362wmf.78.1470919726697; Thu, 11 Aug 2016 05:48:46 -0700 (PDT) Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id 80si12542886wmy.26.2016.08.11.05.48.45; Thu, 11 Aug 2016 05:48:46 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@gmail.com; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE dis=NONE) header.from=gmail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 978DB68A654; Thu, 11 Aug 2016 15:48:42 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-wm0-f67.google.com (mail-wm0-f67.google.com [74.125.82.67]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id B3C9168A666 for ; Thu, 11 Aug 2016 15:48:34 +0300 (EEST) Received: by mail-wm0-f67.google.com with SMTP id o80so1205472wme.0 for ; Thu, 11 Aug 2016 05:48:35 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=PWec0u2rkGn5DqehK6cNdX1eLMV0jUUC6uFVQBGVfc8=; b=odJ9YQHTEOaF2kQ8VtK1mT5KJr3JostgGnCzs/u12Xyyn2LT+jKNXvl+83q1AsRLmM sOmvGzReQHe1aRa+nHAgl9MsmqkLePqDESR88W04TutxMqC/FcI3ADroX5SNVCK8LfG4 OnlTvY/vNi6YbmLeWIucF2CJPDptZtk967zZ8iZpav8op/f+Yj5Nb/Z80/EZrjOd5bmC V7waNY3zRJltZXBeoc/E0BhorM9i5+wMOF3WVyz4o6AKoh/VjwAt8SZUQVMRROzAyoDG dMU97Eqv2fXDthpvAzVr6Ek9fecLPfFySHO9Aig/+nty2I+JWC0q2vdT7h3a4szePc4/ BGVw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=PWec0u2rkGn5DqehK6cNdX1eLMV0jUUC6uFVQBGVfc8=; b=CXreAxSD2NKVRAs4k91aBl2hQq5S3dyFIVeZImYF+XmfbnKcjXB3FoYlzd4gd8+N5Z Y8jKXBG2LRgp3a2R7NKsxuF7dMP8ygD9yclINf+XTaeAKic+tcTkT/XUieeS8lGm6VCo t6MiM5i60BpDUi0Y9XLZwQpyDI+lK7VQ+Yi08Z7XblMZqbaJZ+cBEYTA5a397ahXZJo5 ev2v/KWJX8CmbLJ1dyMI8V4gZR41IzwshnYAtpKS+uPmeWUbU1Msxt6hos+rjOCoHqlB QnzTITNYMIJ8QKwDyxA5SWGLimhgxJOsRBue2HePdCBHKmj7dp23XdV+OXEkqMXAFqSM cjdw== X-Gm-Message-State: AEkoousdRdUBC7i26AE42SeDsAiDYeXHZqcN/pH8OQvbZ/EGhVVwt9nqvPOhyVg0UWiCpA== X-Received: by 10.194.148.232 with SMTP id tv8mr11582087wjb.113.1470919714708; Thu, 11 Aug 2016 05:48:34 -0700 (PDT) Received: from localhost.localdomain (157.174.broadband3.iol.cz. [85.70.174.157]) by smtp.gmail.com with ESMTPSA id q4sm2584454wjk.24.2016.08.11.05.48.33 (version=TLS1_2 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Thu, 11 Aug 2016 05:48:34 -0700 (PDT) From: sebechlebskyjan@gmail.com To: ffmpeg-devel@ffmpeg.org Date: Thu, 11 Aug 2016 14:48:23 +0200 Message-Id: <1470919703-4007-1-git-send-email-sebechlebskyjan@gmail.com> X-Mailer: git-send-email 1.9.1 In-Reply-To: <1470315855-19209-1-git-send-email-sebechlebskyjan@gmail.com> References: <1470315855-19209-1-git-send-email-sebechlebskyjan@gmail.com> Subject: [FFmpeg-devel] [PATCH v4 10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Jan Sebechlebsky MIME-Version: 1.0 Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" From: Jan Sebechlebsky Add support for nonblocking calls. Signed-off-by: Jan Sebechlebsky --- Changes since the last version: - fixed wrong flag passed to av_thread_message_queue_recv() - fixed memleak when queue is full in nonblocking mode libavformat/fifo.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/libavformat/fifo.c b/libavformat/fifo.c index 8896b9e..cdf9f49 100644 --- a/libavformat/fifo.c +++ b/libavformat/fifo.c @@ -19,12 +19,14 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include "libavutil/atomic.h" #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/thread.h" #include "libavutil/threadmessage.h" #include "avformat.h" #include "internal.h" +#include "url.h" #define FIFO_DEFAULT_QUEUE_SIZE 60 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 16 @@ -77,6 +79,17 @@ typedef struct FifoContext { /* Value > 0 signals queue overflow */ volatile uint8_t overflow_flag; + /* Whether termination was requested by invoking deinit + * before the thread was finished. Used only in non-blocking + * mode - when AVFMT_FLAG_NONBLOCK is set. */ + volatile int termination_requested; + + /* Initially 0, set to 1 immediately before thread function + * returns */ + volatile int thread_finished_flag; + + /* Original interrupt callback of the underlying muxer. */ + AVIOInterruptCB orig_interrupt_callback; } FifoContext; typedef struct FifoThreadContext { @@ -111,6 +124,16 @@ typedef struct FifoMessage { AVPacket pkt; } FifoMessage; +static int fifo_interrupt_callback_wrapper(void *arg) +{ + FifoContext *ctx = arg; + + if (avpriv_atomic_int_get(&ctx->termination_requested)) + return 1; + + return ff_check_interrupt(&ctx->orig_interrupt_callback); +} + static int fifo_thread_write_header(FifoThreadContext *ctx) { AVFormatContext *avf = ctx->avf; @@ -433,12 +456,17 @@ static void *fifo_consumer_thread(void *data) fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx); + /* This must be only return path from fifo_consumer_thread function, + * so the thread_finised_flag is set. */ + avpriv_atomic_int_set(&fifo->thread_finished_flag, 1); return NULL; } static int fifo_mux_init(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; + AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper, + .opaque = fifo}; AVFormatContext *avf2; int ret = 0, i; @@ -449,7 +477,8 @@ static int fifo_mux_init(AVFormatContext *avf) fifo->avf = avf2; - avf2->interrupt_callback = avf->interrupt_callback; + fifo->orig_interrupt_callback = avf->interrupt_callback; + avf2->interrupt_callback = interrupt_cb; avf2->max_delay = avf->max_delay; ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); if (ret < 0) @@ -536,7 +565,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) { FifoContext *fifo = avf->priv_data; FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; - int ret; + int ret, queue_flags = 0; if (pkt) { av_init_packet(&msg.pkt); @@ -545,15 +574,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) return ret; } - ret = av_thread_message_queue_send(fifo->queue, &msg, - fifo->drop_pkts_on_overflow ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK)) + queue_flags |= AV_THREAD_MESSAGE_NONBLOCK; + + ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags); + if (ret == AVERROR(EAGAIN)) { - uint8_t overflow_set = 0; + uint8_t overflow_set; + + if (avf->flags & AVFMT_FLAG_NONBLOCK) + goto fail; /* Queue is full, set fifo->overflow_flag to 1 * to let consumer thread know the queue should * be flushed. */ + overflow_set = 0; pthread_mutex_lock(&fifo->overflow_flag_lock); if (!fifo->overflow_flag) fifo->overflow_flag = overflow_set = 1; @@ -581,6 +616,10 @@ static int fifo_write_trailer(AVFormatContext *avf) av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); + if ((avf->flags & AVFMT_FLAG_NONBLOCK) && + !avpriv_atomic_int_get(&fifo->thread_finished_flag)) + return AVERROR(EAGAIN); + ret = pthread_join( fifo->writer_thread, NULL); if (ret < 0) { av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", @@ -596,6 +635,16 @@ static void fifo_deinit(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; + if (avf->flags & AVFMT_FLAG_NONBLOCK) { + int ret; + avpriv_atomic_int_set(&fifo->termination_requested, 1); + ret = pthread_join( fifo->writer_thread, NULL); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", + av_err2str(AVERROR(ret))); + } + } + if (fifo->format_options) av_dict_free(&fifo->format_options);