From patchwork Fri Jul 7 02:25:46 2017
X-Patchwork-Submitter: Muhammad Faiz
X-Patchwork-Id: 4253
From: Muhammad Faiz <mfcc64@gmail.com>
To: ffmpeg-devel@ffmpeg.org
Date: Fri, 7 Jul 2017 09:25:46 +0700
Message-Id: <20170707022546.6164-1-mfcc64@gmail.com>
Subject: [FFmpeg-devel] [PATCH] avfilter/pthread: rewrite implementation

Avoid pthread_cond_broadcast() that wakes up all workers. Make each of
them use a distinct mutex/cond pair instead.
Benchmark using afir with threads=4 and a 4096-tap fir:

channels=1:
old: 2128210 decicycles in afir_execute, 2 runs, 0 skips
     1382927 decicycles in afir_execute, 1024 runs, 0 skips
     1367985 decicycles in afir_execute, 16374 runs, 10 skips
new: 1011270 decicycles in afir_execute, 2 runs, 0 skips
      939891 decicycles in afir_execute, 1024 runs, 0 skips
      955812 decicycles in afir_execute, 16383 runs, 1 skips

channels=2:
old: 2801720 decicycles in afir_execute, 2 runs, 0 skips
     1624556 decicycles in afir_execute, 1024 runs, 0 skips
     1722584 decicycles in afir_execute, 16380 runs, 4 skips
new: 1864780 decicycles in afir_execute, 2 runs, 0 skips
     1307955 decicycles in afir_execute, 1024 runs, 0 skips
     1110917 decicycles in afir_execute, 16384 runs, 0 skips

channels=3:
old: 3031255 decicycles in afir_execute, 2 runs, 0 skips
     2545295 decicycles in afir_execute, 1024 runs, 0 skips
     2498368 decicycles in afir_execute, 16384 runs, 0 skips
new: 2213540 decicycles in afir_execute, 2 runs, 0 skips
     2305479 decicycles in afir_execute, 1024 runs, 0 skips
     2001942 decicycles in afir_execute, 16382 runs, 2 skips

channels=4:
old: 4642510 decicycles in afir_execute, 2 runs, 0 skips
     3356856 decicycles in afir_execute, 1024 runs, 0 skips
     2994766 decicycles in afir_execute, 16382 runs, 2 skips
new: 3590650 decicycles in afir_execute, 2 runs, 0 skips
     2456035 decicycles in afir_execute, 1024 runs, 0 skips
     2332966 decicycles in afir_execute, 16384 runs, 0 skips

channels=6:
old: 5057785 decicycles in afir_execute, 2 runs, 0 skips
     4279000 decicycles in afir_execute, 1023 runs, 1 skips
     4102256 decicycles in afir_execute, 16383 runs, 1 skips
new: 4244160 decicycles in afir_execute, 2 runs, 0 skips
     3851306 decicycles in afir_execute, 1024 runs, 0 skips
     3343221 decicycles in afir_execute, 16384 runs, 0 skips

channels=8:
old: 4871740 decicycles in afir_execute, 2 runs, 0 skips
     4807337 decicycles in afir_execute, 1023 runs, 1 skips
     4454018 decicycles in afir_execute, 16374 runs, 10 skips
new: 5055460 decicycles in afir_execute, 2 runs, 0 skips
     4554674 decicycles in afir_execute, 1023 runs, 1 skips
     4433398 decicycles in afir_execute, 16382 runs, 2 skips

Signed-off-by: Muhammad Faiz <mfcc64@gmail.com>
---
 libavfilter/pthread.c | 189 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 116 insertions(+), 73 deletions(-)

diff --git a/libavfilter/pthread.c b/libavfilter/pthread.c
index c7a0021..8fb3409 100644
--- a/libavfilter/pthread.c
+++ b/libavfilter/pthread.c
@@ -21,6 +21,7 @@
  * Libavfilter multithreading support
  */
 
+#include <stdatomic.h>
 #include "config.h"
 
 #include "libavutil/common.h"
@@ -32,61 +33,75 @@
 #include "internal.h"
 #include "thread.h"
 
+typedef struct WorkerContext WorkerContext;
+
 typedef struct ThreadContext {
     AVFilterGraph *graph;
 
-    int nb_threads;
-    pthread_t *workers;
+    int nb_workers;
+    WorkerContext *workers;
     avfilter_action_func *func;
 
     /* per-execute parameters */
     AVFilterContext *ctx;
     void *arg;
     int *rets;
-    int nb_jobs;
+    unsigned nb_jobs;
 
-    pthread_cond_t last_job_cond;
-    pthread_cond_t current_job_cond;
-    pthread_mutex_t current_job_lock;
-    int current_job;
-    unsigned int current_execute;
+    pthread_mutex_t mutex_done;
+    pthread_cond_t cond_done;
+    atomic_uint current_job;
+    atomic_uint nb_finished_jobs;
     int done;
 } ThreadContext;
 
-static void* attribute_align_arg worker(void *v)
+struct WorkerContext {
+    ThreadContext *ctx;
+    pthread_t thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int done;
+};
+
+static unsigned run_jobs(ThreadContext *c)
 {
-    ThreadContext *c = v;
-    int our_job = c->nb_jobs;
-    int nb_threads = c->nb_threads;
-    unsigned int last_execute = 0;
-    int ret, self_id;
-
-    pthread_mutex_lock(&c->current_job_lock);
-    self_id = c->current_job++;
-
-    for (;;) {
-        while (our_job >= c->nb_jobs) {
-            if (c->current_job == nb_threads + c->nb_jobs)
-                pthread_cond_signal(&c->last_job_cond);
-
-            while (last_execute == c->current_execute && !c->done)
-                pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
-            last_execute = c->current_execute;
-            our_job = self_id;
-
-            if (c->done) {
-                pthread_mutex_unlock(&c->current_job_lock);
-                return NULL;
-            }
-        }
-        pthread_mutex_unlock(&c->current_job_lock);
+    unsigned current_job, nb_finished_jobs = 0;
 
-        ret = c->func(c->ctx, c->arg, our_job, c->nb_jobs);
+    while (nb_finished_jobs != c->nb_jobs &&
+           (current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->nb_jobs) {
+        int ret = c->func(c->ctx, c->arg, current_job, c->nb_jobs);
         if (c->rets)
-            c->rets[our_job % c->nb_jobs] = ret;
+            c->rets[current_job] = ret;
+        nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_acq_rel) + 1;
+    }
+
+    return nb_finished_jobs;
+}
+
+static void* attribute_align_arg worker(void *v)
+{
+    WorkerContext *w = v;
+    ThreadContext *c = w->ctx;
+
+    pthread_mutex_lock(&w->mutex);
+    pthread_cond_signal(&w->cond);
+
+    while (1) {
+        w->done = 1;
+        while (w->done)
+            pthread_cond_wait(&w->cond, &w->mutex);
+
+        if (c->done) {
+            pthread_mutex_unlock(&w->mutex);
+            return NULL;
+        }
 
-        pthread_mutex_lock(&c->current_job_lock);
-        our_job = c->current_job++;
+        if (run_jobs(c) == c->nb_jobs) {
+            pthread_mutex_lock(&c->mutex_done);
+            c->done = 1;
+            pthread_cond_signal(&c->cond_done);
+            pthread_mutex_unlock(&c->mutex_done);
+        }
     }
 }
 
@@ -94,48 +109,66 @@ static void slice_thread_uninit(ThreadContext *c)
 {
     int i;
 
-    pthread_mutex_lock(&c->current_job_lock);
     c->done = 1;
-    pthread_cond_broadcast(&c->current_job_cond);
-    pthread_mutex_unlock(&c->current_job_lock);
+    for (i = 0; i < c->nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
 
-    for (i = 0; i < c->nb_threads; i++)
-        pthread_join(c->workers[i], NULL);
+        pthread_mutex_lock(&w->mutex);
+        w->done = 0;
+        pthread_cond_signal(&w->cond);
+        pthread_mutex_unlock(&w->mutex);
 
-    pthread_mutex_destroy(&c->current_job_lock);
-    pthread_cond_destroy(&c->current_job_cond);
-    pthread_cond_destroy(&c->last_job_cond);
-    av_freep(&c->workers);
-}
+        pthread_join(w->thread, NULL);
+        pthread_cond_destroy(&w->cond);
+        pthread_mutex_destroy(&w->mutex);
+    }
 
-static void slice_thread_park_workers(ThreadContext *c)
-{
-    while (c->current_job != c->nb_threads + c->nb_jobs)
-        pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
-    pthread_mutex_unlock(&c->current_job_lock);
+    pthread_cond_destroy(&c->cond_done);
+    pthread_mutex_destroy(&c->mutex_done);
+    av_freep(&c->workers);
 }
 
 static int thread_execute(AVFilterContext *ctx, avfilter_action_func *func,
-                          void *arg, int *ret, int nb_jobs)
+                          void *arg, int *rets, int nb_jobs)
 {
     ThreadContext *c = ctx->graph->internal->thread;
+    int nb_workers, i;
 
     if (nb_jobs <= 0)
         return 0;
 
-    pthread_mutex_lock(&c->current_job_lock);
+    if (nb_jobs == 1) {
+        int ret = func(ctx, arg, 0, 1);
+        if (rets)
+            rets[0] = ret;
+        return 0;
+    }
 
-    c->current_job = c->nb_threads;
+    nb_workers = FFMIN(c->nb_workers, nb_jobs - 1);
+    atomic_store_explicit(&c->current_job, 0, memory_order_relaxed);
+    atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
     c->nb_jobs = nb_jobs;
     c->ctx = ctx;
     c->arg = arg;
     c->func = func;
-    c->rets = ret;
-    c->current_execute++;
+    c->rets = rets;
+
+    for (i = 0; i < nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
 
-    pthread_cond_broadcast(&c->current_job_cond);
+        pthread_mutex_lock(&w->mutex);
+        w->done = 0;
+        pthread_cond_signal(&w->cond);
+        pthread_mutex_unlock(&w->mutex);
+    }
 
-    slice_thread_park_workers(c);
+    if (run_jobs(c) != c->nb_jobs) {
+        pthread_mutex_lock(&c->mutex_done);
+        while (!c->done)
+            pthread_cond_wait(&c->cond_done, &c->mutex_done);
+        c->done = 0;
+        pthread_mutex_unlock(&c->mutex_done);
+    }
 
     return 0;
 }
@@ -156,33 +189,43 @@ static int thread_init_internal(ThreadContext *c, int nb_threads)
     if (nb_threads <= 1)
         return 1;
 
-    c->nb_threads = nb_threads;
-    c->workers = av_mallocz_array(sizeof(*c->workers), nb_threads);
+    c->nb_workers = nb_threads - 1;
+    c->workers = av_mallocz_array(sizeof(*c->workers), c->nb_workers);
     if (!c->workers)
         return AVERROR(ENOMEM);
 
-    c->current_job = 0;
+    pthread_mutex_init(&c->mutex_done, NULL);
+    pthread_cond_init(&c->cond_done, NULL);
+    atomic_init(&c->current_job, 0);
+    atomic_init(&c->nb_finished_jobs, 0);
     c->nb_jobs = 0;
     c->done = 0;
 
-    pthread_cond_init(&c->current_job_cond, NULL);
-    pthread_cond_init(&c->last_job_cond, NULL);
+    for (i = 0; i < c->nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
 
-    pthread_mutex_init(&c->current_job_lock, NULL);
-    pthread_mutex_lock(&c->current_job_lock);
-    for (i = 0; i < nb_threads; i++) {
-        ret = pthread_create(&c->workers[i], NULL, worker, c);
+        w->ctx = c;
+        pthread_mutex_init(&w->mutex, NULL);
+        pthread_cond_init(&w->cond, NULL);
+        pthread_mutex_lock(&w->mutex);
+        w->done = 0;
+        ret = pthread_create(&w->thread, NULL, worker, w);
         if (ret) {
-            pthread_mutex_unlock(&c->current_job_lock);
-            c->nb_threads = i;
+            c->nb_workers = i;
+            pthread_mutex_unlock(&w->mutex);
+            pthread_cond_destroy(&w->cond);
+            pthread_mutex_destroy(&w->mutex);
             slice_thread_uninit(c);
             return AVERROR(ret);
         }
+
+        while (!w->done)
+            pthread_cond_wait(&w->cond, &w->mutex);
+        pthread_mutex_unlock(&w->mutex);
     }
 
-    slice_thread_park_workers(c);
-    return c->nb_threads;
+    return c->nb_workers + 1;
 }
 
 int ff_graph_thread_init(AVFilterGraph *graph)