From patchwork Fri Jul 7 14:04:37 2017
X-Patchwork-Submitter: Muhammad Faiz
X-Patchwork-Id: 4257
From: Muhammad Faiz <mfcc64@gmail.com>
To: ffmpeg-devel@ffmpeg.org
Date: Fri, 7 Jul 2017 21:04:37 +0700
Message-Id: <20170707140438.15026-1-mfcc64@gmail.com>
X-Mailer: git-send-email 2.9.3
Subject: [FFmpeg-devel] [PATCH v2] avfilter/pthread: rewrite implementation

Avoid pthread_cond_broadcast() that wakes up all workers. Make each of
them use a distinct mutex/cond pair. Also let the main thread help run
jobs.
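The core of the new scheme is lock-free job distribution: instead of
parceling out fixed job slots under a shared lock, every participating
thread (the main thread included) claims the next job index with an
atomic fetch-add, and a second counter of finished jobs tells whichever
thread completes the last job to report completion. A minimal standalone
sketch of that pattern follows; the names (JobPool, run_all_jobs,
demo_func) are illustrative only and not part of the patch:

/*
 * Minimal sketch of atomic job claiming (illustrative names, not the
 * patch code).  Each caller grabs the next job index with an atomic
 * fetch-add; a second counter tracks completed jobs so the caller that
 * finishes the last job can detect it.  Compile with: cc -std=c11
 */
#include <stdatomic.h>
#include <stdio.h>

typedef struct JobPool {
    atomic_uint current_job;      /* next job index to hand out */
    atomic_uint nb_finished_jobs; /* jobs completed so far      */
    unsigned    nb_jobs;          /* total number of jobs       */
} JobPool;

/* Claim and run jobs until none remain; the return value equals
 * p->nb_jobs for whichever caller completed the final job. */
static unsigned run_all_jobs(JobPool *p, void (*func)(unsigned job))
{
    unsigned job, finished = 0;

    while (finished != p->nb_jobs &&
           (job = atomic_fetch_add_explicit(&p->current_job, 1,
                                            memory_order_acq_rel)) < p->nb_jobs) {
        func(job);
        finished = atomic_fetch_add_explicit(&p->nb_finished_jobs, 1,
                                             memory_order_acq_rel) + 1;
    }
    return finished;
}

static void demo_func(unsigned job)
{
    printf("running job %u\n", job);
}

int main(void)
{
    JobPool p = { .nb_jobs = 8 };

    atomic_init(&p.current_job, 0);
    atomic_init(&p.nb_finished_jobs, 0);
    /* Called from one thread here for brevity; the point of the scheme
     * is that several threads may call it concurrently on the same pool
     * and the counters stay consistent without any lock. */
    run_all_jobs(&p, demo_func);
    return 0;
}

In the patch, run_jobs() plays this role; the main thread calls it too,
and only if it does not finish the last job itself does it wait on
cond_done for a worker's signal.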
Benchmark using afir with threads=5 and a 4096-tap FIR:

channels=1:
old: 1849650 decicycles in afir_execute,     2 runs,  0 skips
     1525719 decicycles in afir_execute,  1024 runs,  0 skips
     1546032 decicycles in afir_execute, 16356 runs, 28 skips
new: 1495525 decicycles in afir_execute,     2 runs,  0 skips
      968897 decicycles in afir_execute,  1024 runs,  0 skips
      941286 decicycles in afir_execute, 16384 runs,  0 skips

channels=2:
old: 3135485 decicycles in afir_execute,     2 runs,  0 skips
     1967158 decicycles in afir_execute,  1024 runs,  0 skips
     1802430 decicycles in afir_execute, 16364 runs, 20 skips
new: 1864750 decicycles in afir_execute,     2 runs,  0 skips
     1437792 decicycles in afir_execute,  1024 runs,  0 skips
     1183963 decicycles in afir_execute, 16382 runs,  2 skips

channels=4:
old: 4879925 decicycles in afir_execute,     2 runs,  0 skips
     3557950 decicycles in afir_execute,  1022 runs,  2 skips
     3206843 decicycles in afir_execute, 16379 runs,  5 skips
new: 2962320 decicycles in afir_execute,     2 runs,  0 skips
     2450430 decicycles in afir_execute,  1024 runs,  0 skips
     2446219 decicycles in afir_execute, 16383 runs,  1 skips

channels=8:
old: 6032455 decicycles in afir_execute,     2 runs,  0 skips
     4838614 decicycles in afir_execute,  1023 runs,  1 skips
     4720760 decicycles in afir_execute, 16369 runs, 15 skips
new: 5228150 decicycles in afir_execute,     2 runs,  0 skips
     4592129 decicycles in afir_execute,  1023 runs,  1 skips
     4469067 decicycles in afir_execute, 16383 runs,  1 skips

Signed-off-by: Muhammad Faiz <mfcc64@gmail.com>
---
 libavfilter/pthread.c | 197 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 124 insertions(+), 73 deletions(-)

diff --git a/libavfilter/pthread.c b/libavfilter/pthread.c
index c7a0021..0ba7864 100644
--- a/libavfilter/pthread.c
+++ b/libavfilter/pthread.c
@@ -21,6 +21,7 @@
  * Libavfilter multithreading support
  */
 
+#include <stdatomic.h>
 #include "config.h"
 
 #include "libavutil/common.h"
@@ -32,61 +33,76 @@
 #include "internal.h"
 #include "thread.h"
 
+typedef struct WorkerContext WorkerContext;
+
 typedef struct ThreadContext {
     AVFilterGraph *graph;
 
-    int nb_threads;
-    pthread_t *workers;
+    int nb_workers;
+    WorkerContext *workers;
     avfilter_action_func *func;
 
     /* per-execute parameters */
     AVFilterContext *ctx;
     void *arg;
     int *rets;
-    int nb_jobs;
+    unsigned nb_jobs;
 
-    pthread_cond_t last_job_cond;
-    pthread_cond_t current_job_cond;
-    pthread_mutex_t current_job_lock;
-    int current_job;
-    unsigned int current_execute;
+    pthread_mutex_t mutex_done;
+    pthread_cond_t cond_done;
+    atomic_uint current_job;
+    atomic_uint nb_finished_jobs;
     int done;
+    int worker_done;
 } ThreadContext;
 
-static void* attribute_align_arg worker(void *v)
+struct WorkerContext {
+    ThreadContext *ctx;
+    pthread_t thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int done;
+};
+
+static unsigned run_jobs(ThreadContext *c)
 {
-    ThreadContext *c = v;
-    int our_job = c->nb_jobs;
-    int nb_threads = c->nb_threads;
-    unsigned int last_execute = 0;
-    int ret, self_id;
-
-    pthread_mutex_lock(&c->current_job_lock);
-    self_id = c->current_job++;
-
-    for (;;) {
-        while (our_job >= c->nb_jobs) {
-            if (c->current_job == nb_threads + c->nb_jobs)
-                pthread_cond_signal(&c->last_job_cond);
-
-            while (last_execute == c->current_execute && !c->done)
-                pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
-            last_execute = c->current_execute;
-            our_job = self_id;
-
-            if (c->done) {
-                pthread_mutex_unlock(&c->current_job_lock);
-                return NULL;
-            }
-        }
-        pthread_mutex_unlock(&c->current_job_lock);
+    unsigned current_job, nb_finished_jobs = 0;
 
-        ret = c->func(c->ctx, c->arg, our_job, c->nb_jobs);
+    while (nb_finished_jobs != c->nb_jobs &&
+           (current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->nb_jobs) {
+        int ret = c->func(c->ctx, c->arg, current_job, c->nb_jobs);
         if (c->rets)
-            c->rets[our_job % c->nb_jobs] = ret;
+            c->rets[current_job] = ret;
+        nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_acq_rel) + 1;
+    }
+
+    return nb_finished_jobs;
+}
+
+static void* attribute_align_arg worker(void *v)
+{
+    WorkerContext *w = v;
+    ThreadContext *c = w->ctx;
+
+    pthread_mutex_lock(&w->mutex);
+    pthread_cond_signal(&w->cond);
+
+    while (1) {
+        w->done = 1;
+        while (w->done)
+            pthread_cond_wait(&w->cond, &w->mutex);
+
+        if (c->done) {
+            pthread_mutex_unlock(&w->mutex);
+            return NULL;
+        }
 
-        pthread_mutex_lock(&c->current_job_lock);
-        our_job = c->current_job++;
+        if (run_jobs(c) == c->nb_jobs) {
+            pthread_mutex_lock(&c->mutex_done);
+            c->worker_done = 1;
+            pthread_cond_signal(&c->cond_done);
+            pthread_mutex_unlock(&c->mutex_done);
+        }
     }
 }
 
@@ -94,48 +110,72 @@ static void slice_thread_uninit(ThreadContext *c)
 {
     int i;
 
-    pthread_mutex_lock(&c->current_job_lock);
+    for (i = 0; i < c->nb_workers; i++)
+        pthread_mutex_lock(&c->workers[i].mutex);
+
     c->done = 1;
-    pthread_cond_broadcast(&c->current_job_cond);
-    pthread_mutex_unlock(&c->current_job_lock);
 
-    for (i = 0; i < c->nb_threads; i++)
-        pthread_join(c->workers[i], NULL);
+    for (i = 0; i < c->nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
+        w->done = 0;
+        pthread_cond_signal(&w->cond);
+        pthread_mutex_unlock(&w->mutex);
+    }
 
-    pthread_mutex_destroy(&c->current_job_lock);
-    pthread_cond_destroy(&c->current_job_cond);
-    pthread_cond_destroy(&c->last_job_cond);
-    av_freep(&c->workers);
-}
+    for (i = 0; i < c->nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
+        pthread_join(w->thread, NULL);
+        pthread_cond_destroy(&w->cond);
+        pthread_mutex_destroy(&w->mutex);
+    }
 
-static void slice_thread_park_workers(ThreadContext *c)
-{
-    while (c->current_job != c->nb_threads + c->nb_jobs)
-        pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
-    pthread_mutex_unlock(&c->current_job_lock);
+    pthread_cond_destroy(&c->cond_done);
+    pthread_mutex_destroy(&c->mutex_done);
+    av_freep(&c->workers);
 }
 
 static int thread_execute(AVFilterContext *ctx, avfilter_action_func *func,
-                          void *arg, int *ret, int nb_jobs)
+                          void *arg, int *rets, int nb_jobs)
 {
     ThreadContext *c = ctx->graph->internal->thread;
+    int nb_workers, i;
 
     if (nb_jobs <= 0)
         return 0;
 
-    pthread_mutex_lock(&c->current_job_lock);
+    if (nb_jobs == 1) {
+        int ret = func(ctx, arg, 0, 1);
+        if (rets)
+            rets[0] = ret;
+        return 0;
+    }
+
+    nb_workers = FFMIN(c->nb_workers, nb_jobs - 1);
+    for (i = 0; i < nb_workers; i++)
+        pthread_mutex_lock(&c->workers[i].mutex);
 
-    c->current_job = c->nb_threads;
+    atomic_store_explicit(&c->current_job, 0, memory_order_relaxed);
+    atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
     c->nb_jobs = nb_jobs;
     c->ctx = ctx;
     c->arg = arg;
     c->func = func;
-    c->rets = ret;
-    c->current_execute++;
+    c->rets = rets;
 
-    pthread_cond_broadcast(&c->current_job_cond);
+    for (i = 0; i < nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
+        w->done = 0;
+        pthread_cond_signal(&w->cond);
+        pthread_mutex_unlock(&w->mutex);
+    }
 
-    slice_thread_park_workers(c);
+    if (run_jobs(c) != c->nb_jobs) {
+        pthread_mutex_lock(&c->mutex_done);
+        while (!c->worker_done)
+            pthread_cond_wait(&c->cond_done, &c->mutex_done);
+        c->worker_done = 0;
+        pthread_mutex_unlock(&c->mutex_done);
+    }
 
     return 0;
 }
 
@@ -156,33 +196,44 @@ static int thread_init_internal(ThreadContext *c, int nb_threads)
     if (nb_threads <= 1)
         return 1;
 
-    c->nb_threads = nb_threads;
-    c->workers = av_mallocz_array(sizeof(*c->workers), nb_threads);
+    c->nb_workers = nb_threads - 1;
+    c->workers = av_mallocz_array(sizeof(*c->workers), c->nb_workers);
     if (!c->workers)
         return AVERROR(ENOMEM);
 
-    c->current_job = 0;
+    pthread_mutex_init(&c->mutex_done, NULL);
+    pthread_cond_init(&c->cond_done, NULL);
+    atomic_init(&c->current_job, 0);
+    atomic_init(&c->nb_finished_jobs, 0);
     c->nb_jobs = 0;
     c->done = 0;
+    c->worker_done = 0;
 
-    pthread_cond_init(&c->current_job_cond, NULL);
-    pthread_cond_init(&c->last_job_cond, NULL);
+    for (i = 0; i < c->nb_workers; i++) {
+        WorkerContext *w = &c->workers[i];
 
-    pthread_mutex_init(&c->current_job_lock, NULL);
-    pthread_mutex_lock(&c->current_job_lock);
-    for (i = 0; i < nb_threads; i++) {
-        ret = pthread_create(&c->workers[i], NULL, worker, c);
+        w->ctx = c;
+        pthread_mutex_init(&w->mutex, NULL);
+        pthread_cond_init(&w->cond, NULL);
+        pthread_mutex_lock(&w->mutex);
+        w->done = 0;
+        ret = pthread_create(&w->thread, NULL, worker, w);
        if (ret) {
-            pthread_mutex_unlock(&c->current_job_lock);
-            c->nb_threads = i;
+            c->nb_workers = i;
+            pthread_mutex_unlock(&w->mutex);
+            pthread_cond_destroy(&w->cond);
+            pthread_mutex_destroy(&w->mutex);
             slice_thread_uninit(c);
             return AVERROR(ret);
         }
+
+        while (!w->done)
+            pthread_cond_wait(&w->cond, &w->mutex);
+        pthread_mutex_unlock(&w->mutex);
     }
 
-    slice_thread_park_workers(c);
-    return c->nb_threads;
+    return c->nb_workers + 1;
 }
 
 int ff_graph_thread_init(AVFilterGraph *graph)
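For reference, the per-worker wake-up handshake used above can be
sketched standalone as follows (illustrative names, not the patch code):
the creator holds the worker's private mutex across pthread_create() and
across every wake-up, the worker parks by setting its done flag and
waiting on its private cond, and each wake-up is therefore a single
pthread_cond_signal() to exactly one thread rather than a broadcast.

/*
 * Sketch of the per-worker handshake (illustrative names, not the
 * patch code).  One worker, one private mutex/cond pair: the worker
 * parks by setting done = 1 and waiting; a wake-up clears done and
 * signals only that worker.  Compile with: cc -std=c11 -pthread
 */
#include <pthread.h>
#include <stdio.h>

typedef struct Worker {
    pthread_t       thread;
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             done; /* 1 while the worker is parked */
    int             quit; /* set together with a wake-up  */
} Worker;

static void *worker_main(void *arg)
{
    Worker *w = arg;

    pthread_mutex_lock(&w->mutex);
    for (;;) {
        w->done = 1;
        pthread_cond_signal(&w->cond);  /* "I am parked" notification */
        while (w->done)
            pthread_cond_wait(&w->cond, &w->mutex);
        if (w->quit)
            break;
        printf("worker: running jobs\n");
    }
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

int main(void)
{
    Worker w = { .done = 0, .quit = 0 };

    pthread_mutex_init(&w.mutex, NULL);
    pthread_cond_init(&w.cond, NULL);

    /* Hold the worker's mutex across pthread_create() and wait for it
     * to park, mirroring the startup handshake in thread_init_internal(). */
    pthread_mutex_lock(&w.mutex);
    if (pthread_create(&w.thread, NULL, worker_main, &w)) {
        pthread_mutex_unlock(&w.mutex);
        return 1;
    }
    while (!w.done)
        pthread_cond_wait(&w.cond, &w.mutex);

    w.done = 0;                        /* wake-up #1: run some jobs */
    pthread_cond_signal(&w.cond);
    while (!w.done)                    /* wait until it parks again */
        pthread_cond_wait(&w.cond, &w.mutex);

    w.quit = 1;                        /* wake-up #2: shut down */
    w.done = 0;
    pthread_cond_signal(&w.cond);
    pthread_mutex_unlock(&w.mutex);

    pthread_join(w.thread, NULL);
    pthread_cond_destroy(&w.cond);
    pthread_mutex_destroy(&w.mutex);
    return 0;
}

The patch itself moves the completion signal onto a separate
mutex_done/cond_done pair shared by all workers, since the main thread
must be able to sleep until whichever worker finishes the last job
wakes it.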