1*4882a593Smuzhiyun /* 2*4882a593Smuzhiyun * Copyright 2020 Rockchip Electronics Co. LTD 3*4882a593Smuzhiyun * 4*4882a593Smuzhiyun * Licensed under the Apache License, Version 2.0 (the "License"); 5*4882a593Smuzhiyun * you may not use this file except in compliance with the License. 6*4882a593Smuzhiyun * You may obtain a copy of the License at 7*4882a593Smuzhiyun * 8*4882a593Smuzhiyun * http://www.apache.org/licenses/LICENSE-2.0 9*4882a593Smuzhiyun * 10*4882a593Smuzhiyun * Unless required by applicable law or agreed to in writing, software 11*4882a593Smuzhiyun * distributed under the License is distributed on an "AS IS" BASIS, 12*4882a593Smuzhiyun * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*4882a593Smuzhiyun * See the License for the specific language governing permissions and 14*4882a593Smuzhiyun * limitations under the License. 15*4882a593Smuzhiyun * 16*4882a593Smuzhiyun * src author: <mediapipe-team@google.com> 17*4882a593Smuzhiyun * new author: modified by <rimon.xu@rock-chips.com> 18*4882a593Smuzhiyun * date: 2020-03-15 19*4882a593Smuzhiyun * ref: https://github.com/google/mediapipe 20*4882a593Smuzhiyun */ 21*4882a593Smuzhiyun 22*4882a593Smuzhiyun #ifndef SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 23*4882a593Smuzhiyun #define SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 24*4882a593Smuzhiyun 25*4882a593Smuzhiyun #include <deque> 26*4882a593Smuzhiyun #include <functional> 27*4882a593Smuzhiyun #include <string> 28*4882a593Smuzhiyun #include <vector> 29*4882a593Smuzhiyun #include <map> 30*4882a593Smuzhiyun 31*4882a593Smuzhiyun #include "rt_header.h" // NOLINT 32*4882a593Smuzhiyun #include "RTThreadOptions.h" // NOLINT 33*4882a593Smuzhiyun #include "rt_thread.h" 34*4882a593Smuzhiyun #include "rt_mutex.h" 35*4882a593Smuzhiyun 36*4882a593Smuzhiyun typedef enum _RTThreadPoolMode { 37*4882a593Smuzhiyun RT_THREAD_POOL_RANDOM_MODE, 38*4882a593Smuzhiyun RT_THREAD_POOL_ASSIGN_MODE, 39*4882a593Smuzhiyun } RTThreadPoolMode; 40*4882a593Smuzhiyun 41*4882a593Smuzhiyun // A thread pool consists of a set of threads that sit around waiting 42*4882a593Smuzhiyun // for callbacks to appear on a queue. When that happens, one of the 43*4882a593Smuzhiyun // threads pulls a callback off the queue and runs it. 44*4882a593Smuzhiyun // 45*4882a593Smuzhiyun // The thread pool is shut down when the pool is destroyed. 46*4882a593Smuzhiyun // 47*4882a593Smuzhiyun // Sample usage: 48*4882a593Smuzhiyun // 49*4882a593Smuzhiyun // { 50*4882a593Smuzhiyun // RTThreadPool pool("testpool", num_workers); 51*4882a593Smuzhiyun // pool.startWorkers(); 52*4882a593Smuzhiyun // for (INT32 i = 0; i < N; ++i) { 53*4882a593Smuzhiyun // pool.schedule([i]() { DoWork(i); }); 54*4882a593Smuzhiyun // } 55*4882a593Smuzhiyun // } 56*4882a593Smuzhiyun // 57*4882a593Smuzhiyun class RTThreadPool { 58*4882a593Smuzhiyun public: 59*4882a593Smuzhiyun // Create a thread pool that provides a concurrency of "numThreads" 60*4882a593Smuzhiyun // threads. I.e., if "numThreads" items are added, they are all 61*4882a593Smuzhiyun // guaranteed to run concurrently without excessive delay. 62*4882a593Smuzhiyun // It has an effectively infinite maximum queue length. 63*4882a593Smuzhiyun // If numThreads is 1, the callbacks are run in FIFO order. 64*4882a593Smuzhiyun explicit RTThreadPool(INT32 numThreads); 65*4882a593Smuzhiyun RTThreadPool(const RTThreadPool&) = delete; 66*4882a593Smuzhiyun RTThreadPool& operator=(const RTThreadPool&) = delete; 67*4882a593Smuzhiyun 68*4882a593Smuzhiyun // Like the RTThreadPool(INT32 numThreads) constructor, except that 69*4882a593Smuzhiyun // it also associates "namePrefix" with each of the threads 70*4882a593Smuzhiyun // in the thread pool. 71*4882a593Smuzhiyun RTThreadPool(const std::string& namePrefix, INT32 numThreads, 72*4882a593Smuzhiyun RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE); 73*4882a593Smuzhiyun 74*4882a593Smuzhiyun // Create a thread pool that creates and can use up to "numThreads" 75*4882a593Smuzhiyun // threads. Any standard thread options, such as stack size, should 76*4882a593Smuzhiyun // be passed via "threadOptions". "namePrefix" specifies the 77*4882a593Smuzhiyun // thread name prefix. 78*4882a593Smuzhiyun RTThreadPool(const RTThreadOptions& threadOptions, 79*4882a593Smuzhiyun const std::string& namePrefix, INT32 numThreads, 80*4882a593Smuzhiyun RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE); 81*4882a593Smuzhiyun 82*4882a593Smuzhiyun // Waits for closures (if any) to complete. May be called without 83*4882a593Smuzhiyun // having called startWorkers(). 84*4882a593Smuzhiyun ~RTThreadPool(); 85*4882a593Smuzhiyun 86*4882a593Smuzhiyun // REQUIRES: startWorkers has not been called 87*4882a593Smuzhiyun // Actually start the worker threads. 88*4882a593Smuzhiyun void startWorkers(); 89*4882a593Smuzhiyun 90*4882a593Smuzhiyun // REQUIRES: startWorkers has been called 91*4882a593Smuzhiyun // Add specified callback to queue of pending callbacks. Eventually a 92*4882a593Smuzhiyun // thread will pull this callback off the queue and execute it. 93*4882a593Smuzhiyun void schedule(std::function<void()> callback, INT32 lockThreadId = 0); 94*4882a593Smuzhiyun 95*4882a593Smuzhiyun // Provided for debugging and testing only. 96*4882a593Smuzhiyun INT32 getNumThreads() const; 97*4882a593Smuzhiyun 98*4882a593Smuzhiyun // Standard thread options. Use this accessor to get them. 99*4882a593Smuzhiyun const RTThreadOptions& getThreadOptions() const; 100*4882a593Smuzhiyun 101*4882a593Smuzhiyun private: 102*4882a593Smuzhiyun class RTWorkerThread; 103*4882a593Smuzhiyun typedef struct RTAssignWorker { 104*4882a593Smuzhiyun RTWorkerThread *worker; 105*4882a593Smuzhiyun std::deque<std::function<void()>> tasks; 106*4882a593Smuzhiyun RtMutex mutex; 107*4882a593Smuzhiyun RtCondition condition; 108*4882a593Smuzhiyun } RTAssignWorker; 109*4882a593Smuzhiyun 110*4882a593Smuzhiyun void runWorker(INT32 threadId); 111*4882a593Smuzhiyun void randomAndRunWorker(); 112*4882a593Smuzhiyun void assignAndRunWorker(INT32 threadId); 113*4882a593Smuzhiyun void randomScheduleToWorker(std::function<void()> callback); 114*4882a593Smuzhiyun void assignScheduleToWorker(std::function<void()> callback, INT32 threadId); 115*4882a593Smuzhiyun void setThreadCount(INT32 numThreads); 116*4882a593Smuzhiyun 117*4882a593Smuzhiyun std::string mNamePrefix; 118*4882a593Smuzhiyun /* thread id and worker */ 119*4882a593Smuzhiyun std::map<int, RTAssignWorker *> mWorkers; 120*4882a593Smuzhiyun INT32 mNumThreads; 121*4882a593Smuzhiyun 122*4882a593Smuzhiyun RtMutex mMutex; 123*4882a593Smuzhiyun RtCondition mCondition; 124*4882a593Smuzhiyun bool mStopped = false; 125*4882a593Smuzhiyun 126*4882a593Smuzhiyun std::deque<std::function<void()>> mTasks; 127*4882a593Smuzhiyun 128*4882a593Smuzhiyun RTThreadOptions mThreadOptions; 129*4882a593Smuzhiyun RTThreadPoolMode mMode; 130*4882a593Smuzhiyun }; 131*4882a593Smuzhiyun 132*4882a593Smuzhiyun namespace internal { 133*4882a593Smuzhiyun 134*4882a593Smuzhiyun // Creates name for thread in a thread pool based on provided prefix and 135*4882a593Smuzhiyun // thread id. Length of the resulting name is guaranteed to be less or equal 136*4882a593Smuzhiyun // to 15. Name or thread id can be truncated to achieve that, see truncation 137*4882a593Smuzhiyun // samples below: 138*4882a593Smuzhiyun // namePrefix, 1234 -> namePrefix/123 139*4882a593Smuzhiyun // namePrefix, 1234567 -> namePrefix/123 140*4882a593Smuzhiyun // namePrefix_long, 1234 -> namePrefix_lon 141*4882a593Smuzhiyun std::string createThreadName(const std::string& prefix, INT32 threadId); 142*4882a593Smuzhiyun 143*4882a593Smuzhiyun } // namespace internal 144*4882a593Smuzhiyun 145*4882a593Smuzhiyun #endif // SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 146