From patchwork Wed Jul 12 13:44:00 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Muhammad Faiz X-Patchwork-Id: 4310 Delivered-To: ffmpegpatchwork@gmail.com Received: by 10.103.1.76 with SMTP id 73csp831706vsb; Wed, 12 Jul 2017 06:44:42 -0700 (PDT) X-Received: by 10.28.94.201 with SMTP id s192mr2801294wmb.100.1499867082855; Wed, 12 Jul 2017 06:44:42 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1499867082; cv=none; d=google.com; s=arc-20160816; b=a2mB+f+g4TTp6NetTWi6O0MA7BQFQoELT0Neou1XwLykFzUaBq3SAWzThtPnEPXsOa ExYHqQEZIdqayccqjieNL/asbJXrJBb1n6onY2tupVSPZjRxaqy3490kak9oaZku8Hfw NhfjB7Qefr8amwacH0JvZbsI3E+Mz0Jatde0BkQVnUowm0eogSCyOw2u2ZOEtbBzQ59k lpQ33Ql6Ltxc1c2xczWD8GQsFvlYYEQyBziqmqjqJV+vhh5A70q0keEaCtq8qI9qWU29 bSEDsMRU2Sg6IUlqp0kinQpg+Lh4ru7NDcdIH4EmnMdESdQLqZOxNCkRNlddQY/71q8L bS6Q== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:content-transfer-encoding:cc:reply-to :list-subscribe:list-help:list-post:list-archive:list-unsubscribe :list-id:precedence:subject:mime-version:references:in-reply-to :message-id:date:to:from:dkim-signature:delivered-to :arc-authentication-results; bh=feGHM1qUXk48hNTNI+WVlbp6XH3yfEjGofmp1ZlqTas=; b=us15QISKK+rdCw6WcN7lgiJ1uYVxVZrj10TdIwvE25UB04Pk+GsTQ/4YLJ/cX5Nd39 Nb2e1Yrgr6fuECJp6seW+smsOaxyxERzMEodwQ0gblhpGGxF+FTrNyAG4QKEFWvUPMGd 2veKegnl8qFkqWYvoB9FxXSMiL8QcqTowOtWq+4SJjTGy3PERZHp2wpXqMC0xJP2i48N OEO/1w1/dPKXLBSpLRQNrkq6maGrY4m0O4DKEcIJUKaeHEzdHdYqyvPrlDHrujSLWnBz 9+Pxk40hGi8+OxswbzV7GNifNOegpNW7oePnuLf32Z3gf87M1TvtA7wB5CTRYNYi+xfY hU7A== ARC-Authentication-Results: i=1; mx.google.com; dkim=neutral (body hash did not verify) header.i=@gmail.com header.b=CAUkJgKK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=gmail.com Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org. [79.124.17.100]) by mx.google.com with ESMTP id 67si1900427wrm.255.2017.07.12.06.44.42; Wed, 12 Jul 2017 06:44:42 -0700 (PDT) Received-SPF: pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) client-ip=79.124.17.100; Authentication-Results: mx.google.com; dkim=neutral (body hash did not verify) header.i=@gmail.com header.b=CAUkJgKK; spf=pass (google.com: domain of ffmpeg-devel-bounces@ffmpeg.org designates 79.124.17.100 as permitted sender) smtp.mailfrom=ffmpeg-devel-bounces@ffmpeg.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=gmail.com Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 431AD689DCD; Wed, 12 Jul 2017 16:44:34 +0300 (EEST) X-Original-To: ffmpeg-devel@ffmpeg.org Delivered-To: ffmpeg-devel@ffmpeg.org Received: from mail-pg0-f68.google.com (mail-pg0-f68.google.com [74.125.83.68]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 60E1B689822 for ; Wed, 12 Jul 2017 16:44:27 +0300 (EEST) Received: by mail-pg0-f68.google.com with SMTP id u62so3104740pgb.0 for ; Wed, 12 Jul 2017 06:44:32 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references :mime-version:content-transfer-encoding; bh=forHZCDilpnha2O1/fxS49HVLb6YOPM0GDhvPLOApTE=; b=CAUkJgKK9N+zKIv0z9VgDeUAjB3kjkTjA/qYObtyfX0KXDLgqHj5LUfoa5xcvegMNt JOMq1wd+e/6/nd+Al8C0iYA2bGDvYoGCbM2eSR+/MnViyttH8JtLIlYXGH2SCoamEOzk m923tbB6ae+No+10bKs7GuwK1OhiwCbvWbGfJ99pMB+cKDEpi8SYdyYNZmyC6Y9uQYwJ iEuyZ4ls6RJZobjYPMCj8bUO/RvZ09eWKkwZJleWxnkDuxl98gmnETw/UtGe0XbDhAhS nnuQi+bZqj1KSvWYN+2fkUEvVRuCy+njO2BhymwWuPpaVk1Dw83KnFP9xMxaejkAvzOY vH6Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references:mime-version:content-transfer-encoding; bh=forHZCDilpnha2O1/fxS49HVLb6YOPM0GDhvPLOApTE=; b=DDMM8t3kf0ZAtrbQ9FuL7Hggj547nb9kviS6t0IH0yZajpGF/20bNtA4HzCVBa2wGq KlEAZ5YeiYTaT6+5EnXLVjlGrTn6O2+SNRIsNVpNcatFFSi9Pbl7NUwlqLpDbmaubZi6 0eCZR35FV/HTscB6uEFEm4N/Mxc279arVLlHDdc1jcm4VOAxb1FP5F5Tl7fdZy/sxG9w 0XPfl5qM1H4OiTLGZmMgDpSWeQR5uzDdd+PYaxe8F2Bgcok8/IcUuabp6hImyMSqVVj8 nQ99J4M3JkOjixaWU7dMyrQBPdJb2l5fTdpdzv56mzA5oqaxEOwP7vzC2uun4ZJx6BtZ Palw== X-Gm-Message-State: AIVw113F5mv8iKrqWQxGP2CqOwOON+WQP+DpQQJu2mODYjzFGUEcnaKV vpD4m2rF4IY38x1D X-Received: by 10.84.231.207 with SMTP id g15mr4331765pln.146.1499867070129; Wed, 12 Jul 2017 06:44:30 -0700 (PDT) Received: from localhost.localdomain ([114.124.231.196]) by smtp.gmail.com with ESMTPSA id s62sm6236997pfi.36.2017.07.12.06.44.28 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Wed, 12 Jul 2017 06:44:29 -0700 (PDT) From: Muhammad Faiz To: ffmpeg-devel@ffmpeg.org Date: Wed, 12 Jul 2017 20:44:00 +0700 Message-Id: <20170712134400.29629-1-mfcc64@gmail.com> X-Mailer: git-send-email 2.9.3 In-Reply-To: <20170712112554.GL3740@nb4> References: <20170712112554.GL3740@nb4> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 1/3 v2] avutil: merge slice threading implementation from avcodec and avfilter X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.20 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Muhammad Faiz Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Rework it to improve performance. Now mutex is not shared by workers, instead each worker has its own mutex and condition variable. This reduces lock contention between workers. Also use atomic variable for counter. The interface also allows execute to run special function on main thread, requested by Ronald. Signed-off-by: Muhammad Faiz --- libavutil/Makefile | 1 + libavutil/slicethread.c | 259 ++++++++++++++++++++++++++++++++++++++++++++++++ libavutil/slicethread.h | 52 ++++++++++ 3 files changed, 312 insertions(+) create mode 100644 libavutil/slicethread.c create mode 100644 libavutil/slicethread.h diff --git a/libavutil/Makefile b/libavutil/Makefile index b4464b0..af9fba8 100644 --- a/libavutil/Makefile +++ b/libavutil/Makefile @@ -142,6 +142,7 @@ OBJS = adler32.o \ samplefmt.o \ sha.o \ sha512.o \ + slicethread.o \ spherical.o \ stereo3d.o \ threadmessage.o \ diff --git a/libavutil/slicethread.c b/libavutil/slicethread.c new file mode 100644 index 0000000..c43f87a --- /dev/null +++ b/libavutil/slicethread.c @@ -0,0 +1,259 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include "slicethread.h" +#include "mem.h" +#include "thread.h" +#include "avassert.h" + +#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS + +typedef struct WorkerContext { + AVSliceThread *ctx; + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; + int done; +} WorkerContext; + +struct AVSliceThread { + WorkerContext *workers; + int nb_threads; + int nb_active_threads; + int nb_jobs; + + atomic_uint first_job; + atomic_uint current_job; + pthread_mutex_t done_mutex; + pthread_cond_t done_cond; + int done; + int finished; + + void *priv; + void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads); + void (*main_func)(void *priv); +}; + +static int run_jobs(AVSliceThread *ctx) +{ + unsigned nb_jobs = ctx->nb_jobs; + unsigned nb_active_threads = ctx->nb_active_threads; + unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel); + unsigned current_job = first_job; + + do { + ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads); + } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs); + + return current_job == nb_jobs + nb_active_threads - 1; +} + +static void *attribute_align_arg thread_worker(void *v) +{ + WorkerContext *w = v; + AVSliceThread *ctx = w->ctx; + + pthread_mutex_lock(&w->mutex); + pthread_cond_signal(&w->cond); + + while (1) { + w->done = 1; + while (w->done) + pthread_cond_wait(&w->cond, &w->mutex); + + if (ctx->finished) { + pthread_mutex_unlock(&w->mutex); + return NULL; + } + + if (run_jobs(ctx)) { + pthread_mutex_lock(&ctx->done_mutex); + ctx->done = 1; + pthread_cond_signal(&ctx->done_cond); + pthread_mutex_unlock(&ctx->done_mutex); + } + } +} + +int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, + void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), + void (*main_func)(void *priv), + int nb_threads) +{ + AVSliceThread *ctx; + int nb_workers, i; + +#if HAVE_W32THREADS + w32thread_init(); +#endif + + av_assert0(nb_threads >= 0); + if (!nb_threads) { + int nb_cpus = av_cpu_count(); + if (nb_cpus > 1) + nb_threads = nb_cpus + 1; + else + nb_threads = 1; + } + + nb_workers = nb_threads; + if (!main_func) + nb_workers--; + + *pctx = ctx = av_mallocz(sizeof(*ctx)); + if (!ctx) + return AVERROR(ENOMEM); + + if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) { + av_freep(pctx); + return AVERROR(ENOMEM); + } + + ctx->priv = priv; + ctx->worker_func = worker_func; + ctx->main_func = main_func; + ctx->nb_threads = nb_threads; + ctx->nb_active_threads = 0; + ctx->nb_jobs = 0; + ctx->finished = 0; + + atomic_init(&ctx->first_job, 0); + atomic_init(&ctx->current_job, 0); + pthread_mutex_init(&ctx->done_mutex, NULL); + pthread_cond_init(&ctx->done_cond, NULL); + ctx->done = 0; + + for (i = 0; i < nb_workers; i++) { + WorkerContext *w = &ctx->workers[i]; + int ret; + w->ctx = ctx; + pthread_mutex_init(&w->mutex, NULL); + pthread_cond_init(&w->cond, NULL); + pthread_mutex_lock(&w->mutex); + w->done = 0; + + if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) { + ctx->nb_threads = main_func ? i : i + 1; + pthread_mutex_unlock(&w->mutex); + pthread_cond_destroy(&w->cond); + pthread_mutex_destroy(&w->mutex); + avpriv_slicethread_free(pctx); + return AVERROR(ret); + } + + while (!w->done) + pthread_cond_wait(&w->cond, &w->mutex); + pthread_mutex_unlock(&w->mutex); + } + + return nb_threads; +} + +void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) +{ + int nb_workers, i, is_last = 0; + + av_assert0(nb_jobs > 0); + ctx->nb_jobs = nb_jobs; + ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads); + atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed); + atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed); + nb_workers = ctx->nb_active_threads; + if (!ctx->main_func || !execute_main) + nb_workers--; + + for (i = 0; i < nb_workers; i++) { + WorkerContext *w = &ctx->workers[i]; + pthread_mutex_lock(&w->mutex); + w->done = 0; + pthread_cond_signal(&w->cond); + pthread_mutex_unlock(&w->mutex); + } + + if (ctx->main_func && execute_main) + ctx->main_func(ctx->priv); + else + is_last = run_jobs(ctx); + + if (!is_last) { + pthread_mutex_lock(&ctx->done_mutex); + while (!ctx->done) + pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex); + ctx->done = 0; + pthread_mutex_unlock(&ctx->done_mutex); + } +} + +void avpriv_slicethread_free(AVSliceThread **pctx) +{ + AVSliceThread *ctx; + int nb_workers, i; + + if (!pctx || !*pctx) + return; + + ctx = *pctx; + nb_workers = ctx->nb_threads; + if (!ctx->main_func) + nb_workers--; + + ctx->finished = 1; + for (i = 0; i < nb_workers; i++) { + WorkerContext *w = &ctx->workers[i]; + pthread_mutex_lock(&w->mutex); + w->done = 0; + pthread_cond_signal(&w->cond); + pthread_mutex_unlock(&w->mutex); + } + + for (i = 0; i < nb_workers; i++) { + WorkerContext *w = &ctx->workers[i]; + pthread_join(w->thread, NULL); + pthread_cond_destroy(&w->cond); + pthread_mutex_destroy(&w->mutex); + } + + pthread_cond_destroy(&ctx->done_cond); + pthread_mutex_destroy(&ctx->done_mutex); + av_freep(&ctx->workers); + av_freep(pctx); +} + +#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ + +int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, + void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), + void (*main_func)(void *priv), + int nb_threads) +{ + *pctx = NULL; + return AVERROR(EINVAL); +} + +void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main) +{ + av_assert0(0); +} + +void avpriv_slicethread_free(AVSliceThread **pctx) +{ + av_assert0(!pctx || !*pctx); +} + +#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */ diff --git a/libavutil/slicethread.h b/libavutil/slicethread.h new file mode 100644 index 0000000..f6f6f30 --- /dev/null +++ b/libavutil/slicethread.h @@ -0,0 +1,52 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef AVUTIL_SLICETHREAD_H +#define AVUTIL_SLICETHREAD_H + +typedef struct AVSliceThread AVSliceThread; + +/** + * Create slice threading context. + * @param pctx slice threading context returned here + * @param priv private pointer to be passed to callback function + * @param worker_func callback function to be executed + * @param main_func special callback function, called from main thread, may be NULL + * @param nb_threads number of threads, 0 for automatic, must be >= 0 + * @return return number of threads or negative AVERROR on failure + */ +int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, + void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), + void (*main_func)(void *priv), + int nb_threads); + +/** + * Execute slice threading. + * @param ctx slice threading context + * @param nb_jobs number of jobs, must be > 0 + * @param execute_main also execute main_func + */ +void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main); + +/** + * Destroy slice threading context. + * @param pctx pointer to context + */ +void avpriv_slicethread_free(AVSliceThread **pctx); + +#endif