From patchwork Fri May 28 09:24:53 2021
X-Patchwork-Submitter: Shubhanshu Saxena
X-Patchwork-Id: 27964
From: Shubhanshu Saxena
To: ffmpeg-devel@ffmpeg.org
Cc: Shubhanshu Saxena
Date: Fri, 28 May 2021 14:54:53 +0530
Message-Id: <20210528092454.31874-9-shubhanshu.e01@gmail.com>
In-Reply-To: <20210528092454.31874-1-shubhanshu.e01@gmail.com>
References: <20210528092454.31874-1-shubhanshu.e01@gmail.com>
Subject: [FFmpeg-devel] [PATCH 09/10] lavfi/dnn: Async Support for TensorFlow Backend

This commit adds functions to execute the inference requests to the
TensorFlow backend asynchronously in detached threads.
Signed-off-by: Shubhanshu Saxena
---
 libavfilter/dnn/dnn_backend_tf.c | 198 ++++++++++++++++++++++++++++---
 libavfilter/dnn/dnn_backend_tf.h |   3 +
 libavfilter/dnn/dnn_interface.c  |   3 +
 3 files changed, 187 insertions(+), 17 deletions(-)

diff --git a/libavfilter/dnn/dnn_backend_tf.c b/libavfilter/dnn/dnn_backend_tf.c
index 31746deef4..296604461b 100644
--- a/libavfilter/dnn/dnn_backend_tf.c
+++ b/libavfilter/dnn/dnn_backend_tf.c
@@ -35,6 +35,7 @@
 #include "dnn_backend_native_layer_maximum.h"
 #include "dnn_io_proc.h"
 #include "dnn_backend_common.h"
+#include "libavutil/thread.h"
 #include "safe_queue.h"
 #include "queue.h"
 #include <tensorflow/c/c_api.h>
@@ -57,6 +58,7 @@ typedef struct TFModel{
     TF_Status *status;
     SafeQueue *request_queue;
     Queue *inference_queue;
+    Queue *task_queue;
 } TFModel;
 
 typedef struct tf_infer_request {
@@ -69,7 +71,10 @@ typedef struct tf_infer_request {
 typedef struct RequestItem {
     tf_infer_request *infer_request;
     InferenceItem *inference;
-    // further properties will be added later for async
+#if HAVE_PTHREAD_CANCEL
+    pthread_t thread;
+    pthread_attr_t thread_attr;
+#endif
 } RequestItem;
 
 #define OFFSET(x) offsetof(TFContext, x)
@@ -83,6 +88,7 @@ static const AVOption dnn_tensorflow_options[] = {
 AVFILTER_DEFINE_CLASS(dnn_tensorflow);
 
 static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_queue);
+static void infer_completion_callback(void *args);
 
 static void free_buffer(void *data, size_t length)
 {
@@ -112,6 +118,59 @@ static tf_infer_request* tf_create_inference_request(void)
     return infer_request;
 }
 
+static void tf_start_inference(RequestItem *request)
+{
+    tf_infer_request *infer_request = request->infer_request;
+    InferenceItem *inference = request->inference;
+    TaskItem *task = inference->task;
+    TFModel *tf_model = task->model;
+
+    TF_SessionRun(tf_model->session, NULL,
+                  infer_request->tf_input, &infer_request->input_tensor, 1,
+                  infer_request->tf_outputs, infer_request->output_tensors,
+                  task->nb_output, NULL, 0, NULL,
+                  tf_model->status);
+}
+
+static void *tf_thread_routine(void *arg)
+{
+    RequestItem *request = arg;
+    tf_start_inference(request);
+    infer_completion_callback(request);
+#if HAVE_PTHREAD_CANCEL
+    pthread_exit(0);
+#endif
+}
+
+static DNNReturnType tf_start_inference_async(RequestItem *request)
+{
+    InferenceItem *inference = request->inference;
+    TaskItem *task = inference->task;
+    TFModel *tf_model = task->model;
+    TFContext *ctx = &tf_model->ctx;
+    int ret;
+
+#if HAVE_PTHREAD_CANCEL
+    ret = pthread_create(&request->thread, &request->thread_attr, tf_thread_routine, request);
+    if (ret != 0)
+    {
+        av_log(ctx, AV_LOG_ERROR, "unable to start async inference\n");
+        return DNN_ERROR;
+    }
+    return DNN_SUCCESS;
+#else
+    av_log(ctx, AV_LOG_WARNING, "pthreads not supported. Roll back to sync\n");
+    tf_start_inference(request);
+    if (TF_GetCode(tf_model->status) != TF_OK) {
+        tf_free_request(request->infer_request);
+        av_log(ctx, AV_LOG_ERROR, "Failed to run session when executing model\n");
+        return DNN_ERROR;
+    }
+    infer_completion_callback(request);
+    return (task->inference_done == task->inference_todo) ? DNN_SUCCESS : DNN_ERROR;
+#endif
+}
+
 static DNNReturnType extract_inference_from_task(TaskItem *task, Queue *inference_queue)
 {
     TFModel *tf_model = task->model;
@@ -826,7 +885,10 @@ DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_
             av_freep(&item);
             goto err;
         }
-
+#if HAVE_PTHREAD_CANCEL
+        pthread_attr_init(&item->thread_attr);
+        pthread_attr_setdetachstate(&item->thread_attr, PTHREAD_CREATE_DETACHED);
+#endif
         if (ff_safe_queue_push_back(tf_model->request_queue, item) < 0) {
             av_freep(&item->infer_request);
             av_freep(&item);
@@ -839,6 +901,16 @@ DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_
         goto err;
     }
 
+    tf_model->task_queue = ff_queue_create();
+    if (!tf_model->task_queue) {
+        goto err;
+    }
+
+    tf_model->inference_queue = ff_queue_create();
+    if (!tf_model->inference_queue) {
+        goto err;
+    }
+
     model->model = tf_model;
     model->get_input = &get_input_tf;
     model->get_output = &get_output_tf;
@@ -1012,10 +1084,9 @@ final:
 static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_queue)
 {
     TFModel *tf_model;
-    TFContext *ctx;
-    tf_infer_request *infer_request;
     InferenceItem *inference;
     TaskItem *task;
+    TFContext *ctx;
 
     inference = ff_queue_peek_front(inference_queue);
     if (!inference) {
@@ -1026,22 +1097,16 @@ static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_que
     tf_model = task->model;
     ctx = &tf_model->ctx;
 
-    if (task->async) {
-        avpriv_report_missing_feature(ctx, "Async execution not supported");
+    if (fill_model_input_tf(tf_model, request) != DNN_SUCCESS) {
         return DNN_ERROR;
-    } else {
-        if (fill_model_input_tf(tf_model, request) != DNN_SUCCESS) {
-            return DNN_ERROR;
-        }
+    }
 
-        infer_request = request->infer_request;
-        TF_SessionRun(tf_model->session, NULL,
-                      infer_request->tf_input, &infer_request->input_tensor, 1,
-                      infer_request->tf_outputs, infer_request->output_tensors,
-                      task->nb_output, NULL, 0, NULL,
-                      tf_model->status);
+    if (task->async) {
+        return tf_start_inference_async(request);
+    } else {
+        tf_start_inference(request);
         if (TF_GetCode(tf_model->status) != TF_OK) {
-            tf_free_request(infer_request);
+            tf_free_request(request->infer_request);
             av_log(ctx, AV_LOG_ERROR, "Failed to run session when executing model\n");
             return DNN_ERROR;
         }
@@ -1079,6 +1144,94 @@ DNNReturnType ff_dnn_execute_model_tf(const DNNModel *model, DNNExecBaseParams *
     return execute_model_tf(request, tf_model->inference_queue);
 }
 
+DNNReturnType ff_dnn_execute_model_async_tf(const DNNModel *model, DNNExecBaseParams *exec_params) {
+    TFModel *tf_model = model->model;
+    TFContext *ctx = &tf_model->ctx;
+    TaskItem *task;
+    RequestItem *request;
+
+    if (ff_check_exec_params(ctx, DNN_TF, model->func_type, exec_params) != 0) {
+        return DNN_ERROR;
+    }
+
+    task = av_malloc(sizeof(*task));
+    if (!task) {
+        av_log(ctx, AV_LOG_ERROR, "unable to alloc memory for task item.\n");
+        return DNN_ERROR;
+    }
+
+    if (ff_dnn_fill_task(task, exec_params, tf_model, 1, 1) != DNN_SUCCESS) {
+        av_freep(&task);
+        return DNN_ERROR;
+    }
+
+    if (ff_queue_push_back(tf_model->task_queue, task) < 0) {
+        av_freep(&task);
+        av_log(ctx, AV_LOG_ERROR, "unable to push back task_queue.\n");
+        return DNN_ERROR;
+    }
+
+    if (extract_inference_from_task(task, tf_model->inference_queue) != DNN_SUCCESS) {
+        av_log(ctx, AV_LOG_ERROR, "unable to extract inference from task.\n");
+        return DNN_ERROR;
+    }
+
+    request = ff_safe_queue_pop_front(tf_model->request_queue);
+    if (!request) {
+        av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n");
+        return DNN_ERROR;
+    }
+    return execute_model_tf(request, tf_model->inference_queue);
+}
+
+DNNAsyncStatusType ff_dnn_get_async_result_tf(const DNNModel *model, AVFrame **in, AVFrame **out)
+{
+    TFModel *tf_model = model->model;
+    TaskItem *task = ff_queue_peek_front(tf_model->task_queue);
+
+    if (!task) {
+        return DAST_EMPTY_QUEUE;
+    }
+
+    if (task->inference_done != task->inference_todo) {
+        return DAST_NOT_READY;
+    }
+
+    *in = task->in_frame;
+    *out = task->out_frame;
+    ff_queue_pop_front(tf_model->task_queue);
+    av_freep(&task);
+
+    return DAST_SUCCESS;
+}
+
+DNNReturnType ff_dnn_flush_tf(const DNNModel *model)
+{
+    TFModel *tf_model = model->model;
+    TFContext *ctx = &tf_model->ctx;
+    RequestItem *request;
+    DNNReturnType ret;
+
+    if (ff_queue_size(tf_model->inference_queue) == 0) {
+        // no pending task need to flush
+        return DNN_SUCCESS;
+    }
+
+    request = ff_safe_queue_pop_front(tf_model->request_queue);
+    if (!request) {
+        av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n");
+        return DNN_ERROR;
+    }
+
+    ret = fill_model_input_tf(tf_model, request);
+    if (ret != DNN_SUCCESS) {
+        av_log(ctx, AV_LOG_ERROR, "Failed to fill model input.\n");
+        return ret;
+    }
+
+    return tf_start_inference_async(request);
+}
+
 void ff_dnn_free_model_tf(DNNModel **model)
 {
     TFModel *tf_model;
@@ -1087,6 +1240,9 @@ void ff_dnn_free_model_tf(DNNModel **model)
         tf_model = (*model)->model;
         while (ff_safe_queue_size(tf_model->request_queue) != 0) {
             RequestItem *item = ff_safe_queue_pop_front(tf_model->request_queue);
+#if HAVE_PTHREAD_CANCEL
+            pthread_attr_destroy(&item->thread_attr);
+#endif
             tf_free_request(item->infer_request);
             av_freep(&item->infer_request);
             av_freep(&item);
@@ -1099,6 +1255,14 @@ void ff_dnn_free_model_tf(DNNModel **model)
         }
         ff_queue_destroy(tf_model->inference_queue);
 
+        while (ff_queue_size(tf_model->task_queue) != 0) {
+            TaskItem *item = ff_queue_pop_front(tf_model->task_queue);
+            av_frame_free(&item->in_frame);
+            av_frame_free(&item->out_frame);
+            av_freep(&item);
+        }
+        ff_queue_destroy(tf_model->task_queue);
+
         if (tf_model->graph){
             TF_DeleteGraph(tf_model->graph);
         }
diff --git a/libavfilter/dnn/dnn_backend_tf.h b/libavfilter/dnn/dnn_backend_tf.h
index 3dfd6e4280..aec0fc2011 100644
--- a/libavfilter/dnn/dnn_backend_tf.h
+++ b/libavfilter/dnn/dnn_backend_tf.h
@@ -32,6 +32,9 @@
 DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_type, const char *options,
                                AVFilterContext *filter_ctx);
 
 DNNReturnType ff_dnn_execute_model_tf(const DNNModel *model, DNNExecBaseParams *exec_params);
+DNNReturnType ff_dnn_execute_model_async_tf(const DNNModel *model, DNNExecBaseParams *exec_params);
+DNNAsyncStatusType ff_dnn_get_async_result_tf(const DNNModel *model, AVFrame **in, AVFrame **out);
+DNNReturnType ff_dnn_flush_tf(const DNNModel *model);
 
 void ff_dnn_free_model_tf(DNNModel **model);
diff --git a/libavfilter/dnn/dnn_interface.c b/libavfilter/dnn/dnn_interface.c
index 02e532fc1b..81af934dd5 100644
--- a/libavfilter/dnn/dnn_interface.c
+++ b/libavfilter/dnn/dnn_interface.c
@@ -48,6 +48,9 @@ DNNModule *ff_get_dnn_module(DNNBackendType backend_type)
     #if (CONFIG_LIBTENSORFLOW == 1)
         dnn_module->load_model = &ff_dnn_load_model_tf;
         dnn_module->execute_model = &ff_dnn_execute_model_tf;
+        dnn_module->execute_model_async = &ff_dnn_execute_model_async_tf;
+        dnn_module->get_async_result = &ff_dnn_get_async_result_tf;
+        dnn_module->flush = &ff_dnn_flush_tf;
         dnn_module->free_model = &ff_dnn_free_model_tf;
    #else
        av_freep(&dnn_module);