1 /* 2 * Copyright 2020 Rockchip Electronics Co. LTD 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * src author: <mediapipe-team@google.com> 17 * new author: modified by <rimon.xu@rock-chips.com> 18 * date: 2020-03-15 19 * ref: https://github.com/google/mediapipe 20 */ 21 22 #ifndef SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 23 #define SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 24 25 #include <deque> 26 #include <functional> 27 #include <string> 28 #include <vector> 29 #include <map> 30 31 #include "rt_header.h" // NOLINT 32 #include "RTThreadOptions.h" // NOLINT 33 #include "rt_thread.h" 34 #include "rt_mutex.h" 35 36 typedef enum _RTThreadPoolMode { 37 RT_THREAD_POOL_RANDOM_MODE, 38 RT_THREAD_POOL_ASSIGN_MODE, 39 } RTThreadPoolMode; 40 41 // A thread pool consists of a set of threads that sit around waiting 42 // for callbacks to appear on a queue. When that happens, one of the 43 // threads pulls a callback off the queue and runs it. 44 // 45 // The thread pool is shut down when the pool is destroyed. 46 // 47 // Sample usage: 48 // 49 // { 50 // RTThreadPool pool("testpool", num_workers); 51 // pool.startWorkers(); 52 // for (INT32 i = 0; i < N; ++i) { 53 // pool.schedule([i]() { DoWork(i); }); 54 // } 55 // } 56 // 57 class RTThreadPool { 58 public: 59 // Create a thread pool that provides a concurrency of "numThreads" 60 // threads. I.e., if "numThreads" items are added, they are all 61 // guaranteed to run concurrently without excessive delay. 62 // It has an effectively infinite maximum queue length. 63 // If numThreads is 1, the callbacks are run in FIFO order. 64 explicit RTThreadPool(INT32 numThreads); 65 RTThreadPool(const RTThreadPool&) = delete; 66 RTThreadPool& operator=(const RTThreadPool&) = delete; 67 68 // Like the RTThreadPool(INT32 numThreads) constructor, except that 69 // it also associates "namePrefix" with each of the threads 70 // in the thread pool. 71 RTThreadPool(const std::string& namePrefix, INT32 numThreads, 72 RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE); 73 74 // Create a thread pool that creates and can use up to "numThreads" 75 // threads. Any standard thread options, such as stack size, should 76 // be passed via "threadOptions". "namePrefix" specifies the 77 // thread name prefix. 78 RTThreadPool(const RTThreadOptions& threadOptions, 79 const std::string& namePrefix, INT32 numThreads, 80 RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE); 81 82 // Waits for closures (if any) to complete. May be called without 83 // having called startWorkers(). 84 ~RTThreadPool(); 85 86 // REQUIRES: startWorkers has not been called 87 // Actually start the worker threads. 88 void startWorkers(); 89 90 // REQUIRES: startWorkers has been called 91 // Add specified callback to queue of pending callbacks. Eventually a 92 // thread will pull this callback off the queue and execute it. 93 void schedule(std::function<void()> callback, INT32 lockThreadId = 0); 94 95 // Provided for debugging and testing only. 96 INT32 getNumThreads() const; 97 98 // Standard thread options. Use this accessor to get them. 99 const RTThreadOptions& getThreadOptions() const; 100 101 private: 102 class RTWorkerThread; 103 typedef struct RTAssignWorker { 104 RTWorkerThread *worker; 105 std::deque<std::function<void()>> tasks; 106 RtMutex mutex; 107 RtCondition condition; 108 } RTAssignWorker; 109 110 void runWorker(INT32 threadId); 111 void randomAndRunWorker(); 112 void assignAndRunWorker(INT32 threadId); 113 void randomScheduleToWorker(std::function<void()> callback); 114 void assignScheduleToWorker(std::function<void()> callback, INT32 threadId); 115 void setThreadCount(INT32 numThreads); 116 117 std::string mNamePrefix; 118 /* thread id and worker */ 119 std::map<int, RTAssignWorker *> mWorkers; 120 INT32 mNumThreads; 121 122 RtMutex mMutex; 123 RtCondition mCondition; 124 bool mStopped = false; 125 126 std::deque<std::function<void()>> mTasks; 127 128 RTThreadOptions mThreadOptions; 129 RTThreadPoolMode mMode; 130 }; 131 132 namespace internal { 133 134 // Creates name for thread in a thread pool based on provided prefix and 135 // thread id. Length of the resulting name is guaranteed to be less or equal 136 // to 15. Name or thread id can be truncated to achieve that, see truncation 137 // samples below: 138 // namePrefix, 1234 -> namePrefix/123 139 // namePrefix, 1234567 -> namePrefix/123 140 // namePrefix_long, 1234 -> namePrefix_lon 141 std::string createThreadName(const std::string& prefix, INT32 threadId); 142 143 } // namespace internal 144 145 #endif // SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_ 146