1 /* 2 * Copyright 2020 Rockchip Electronics Co. LTD 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * src author: <mediapipe-team@google.com> 17 * new author: modified by <rimon.xu@rock-chips.com> and <martin.cheng@rock-chips.com> 18 * date: 2020-03-19 19 * reference: https://github.com/google/mediapipe 20 */ 21 22 23 #ifndef SRC_RT_TASK_TASK_GRAPH_RTEXECUTOR_H_ 24 #define SRC_RT_TASK_TASK_GRAPH_RTEXECUTOR_H_ 25 26 #include "rt_header.h" 27 #include "RTThreadOptions.h" 28 #include "RTThreadPool.h" 29 #include "rt_metadata.h" 30 31 // Abstract base class for the task queue. 32 // NOTE: The task queue orders the ready tasks by their priorities. This 33 // enables the executor to run ready tasks in priority order. 34 class RTTaskQueue { 35 public: 36 virtual ~RTTaskQueue(); 37 38 // Runs the next ready task in the current thread. Should be invoked by the 39 // executor. This method should be called exactly as many times as AddTask 40 // was called on the executor. 41 virtual void runNextTask() = 0; 42 }; 43 44 // Abstract base class for the RTExecutor. 45 class RTExecutor { 46 public: 47 virtual ~RTExecutor(); 48 49 // A registered RTExecutor subclass must implement the static factory method 50 // Create. The RTExecutor subclass cannot be registered without it. 51 // 52 // static ::mediapipe::StatusOr<RTExecutor*> Create( 53 // const MediaPipeOptions& extendable_options); 54 // 55 // Create validates extendable_options, then calls the constructor, and 56 // returns the newly allocated RTExecutor object. 57 58 // The scheduler queue calls this method to tell the executor that it has 59 // a new task to run. The executor should use its execution mechanism to 60 // invoke taskQueue->runNextTask. 61 virtual void addTask(RTTaskQueue* taskQueue, INT32 threadId = 0) { 62 schedule([taskQueue] { taskQueue->runNextTask(); }, threadId); 63 } 64 65 // schedule the specified "task" for execution in this executor. 66 virtual void schedule(std::function<void()> task, INT32 threadId = 0) = 0; 67 virtual INT32 getNumThreads() const = 0; 68 }; 69 70 // A multithreaded executor based on a thread pool. 71 class RTThreadPoolExecutor : public RTExecutor { 72 public: 73 static RTExecutor* create(RtMetaData *extendOptions); 74 75 explicit RTThreadPoolExecutor(INT32 numThreads); 76 explicit RTThreadPoolExecutor(const RTThreadOptions& threadOptions, INT32 numThreads, 77 RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE); 78 ~RTThreadPoolExecutor() override; 79 80 public: 81 void schedule(std::function<void()> task, INT32 threadId = 0) override; 82 getNumThreads()83 INT32 getNumThreads() const { return mThreadPool.getNumThreads(); } 84 // Returns the thread stack size (in bytes). getStackSize()85 size_t getStackSize() const { return mStackSize; } 86 87 private: 88 // Saves the value of the stack size option and starts the thread pool. 89 void start(); 90 91 RTThreadPool mThreadPool; 92 93 // Records the stack size in RTThreadOptions right before we call 94 // mThreadPool.startWorkers(). 95 // 96 // The actual stack size passed to pthread_attr_setstacksize() for the 97 // worker threads differs from the stack size we specified. It includes the 98 // guard size and space for thread-local storage. (See Thread::start() in 99 // thread/thread.cc.) So the unit tests check the stack size in 100 // RTThreadOptions, in addition to trying to recover the specified stack 101 // size from the stack size returned by pthread_getattr_np(), 102 // pthread_attr_getstacksize(), and pthread_attr_getguardsize(). 103 size_t mStackSize = 0; 104 }; 105 106 class RTSingleExecutor : public RTExecutor { 107 public: 108 static RTExecutor* create(void *extendOptions); 109 110 explicit RTSingleExecutor(const RTThreadOptions& threadOptions); 111 ~RTSingleExecutor() override; 112 void schedule(std::function<void()> task, INT32 threadId = 0) override; getNumThreads()113 INT32 getNumThreads() const { return 1; } 114 115 private: 116 RTThreadPool mThreadPool; 117 }; 118 119 #endif // SRC_RT_TASK_TASK_GRAPH_RTEXECUTOR_H_ 120