xref: /OK3568_Linux_fs/external/rockit/tgi/sdk/include/RTThreadPool.h (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1 /*
2  * Copyright 2020 Rockchip Electronics Co. LTD
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * src author: <mediapipe-team@google.com>
17  * new author: modified by <rimon.xu@rock-chips.com>
18  *       date: 2020-03-15
19  *        ref: https://github.com/google/mediapipe
20  */
21 
22 #ifndef SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_
23 #define SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_
24 
25 #include <deque>
26 #include <functional>
27 #include <string>
28 #include <vector>
29 #include <map>
30 
31 #include "rt_header.h"              // NOLINT
32 #include "RTThreadOptions.h"        // NOLINT
33 #include "rt_thread.h"
34 #include "rt_mutex.h"
35 
36 typedef enum _RTThreadPoolMode {
37     RT_THREAD_POOL_RANDOM_MODE,
38     RT_THREAD_POOL_ASSIGN_MODE,
39 } RTThreadPoolMode;
40 
41 // A thread pool consists of a set of threads that sit around waiting
42 // for callbacks to appear on a queue.  When that happens, one of the
43 // threads pulls a callback off the queue and runs it.
44 //
45 // The thread pool is shut down when the pool is destroyed.
46 //
47 // Sample usage:
48 //
49 // {
50 //   RTThreadPool pool("testpool", num_workers);
51 //   pool.startWorkers();
52 //   for (INT32 i = 0; i < N; ++i) {
53 //     pool.schedule([i]() { DoWork(i); });
54 //   }
55 // }
56 //
57 class RTThreadPool {
58  public:
59     // Create a thread pool that provides a concurrency of "numThreads"
60     // threads. I.e., if "numThreads" items are added, they are all
61     // guaranteed to run concurrently without excessive delay.
62     // It has an effectively infinite maximum queue length.
63     // If numThreads is 1, the callbacks are run in FIFO order.
64     explicit RTThreadPool(INT32 numThreads);
65     RTThreadPool(const RTThreadPool&) = delete;
66     RTThreadPool& operator=(const RTThreadPool&) = delete;
67 
68     // Like the RTThreadPool(INT32 numThreads) constructor, except that
69     // it also associates "namePrefix" with each of the threads
70     // in the thread pool.
71     RTThreadPool(const std::string& namePrefix, INT32 numThreads,
72                 RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE);
73 
74     // Create a thread pool that creates and can use up to "numThreads"
75     // threads.  Any standard thread options, such as stack size, should
76     // be passed via "threadOptions".  "namePrefix" specifies the
77     // thread name prefix.
78     RTThreadPool(const RTThreadOptions& threadOptions,
79                const std::string& namePrefix, INT32 numThreads,
80                RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE);
81 
82     // Waits for closures (if any) to complete. May be called without
83     // having called startWorkers().
84     ~RTThreadPool();
85 
86     // REQUIRES: startWorkers has not been called
87     // Actually start the worker threads.
88     void startWorkers();
89 
90     // REQUIRES: startWorkers has been called
91     // Add specified callback to queue of pending callbacks.  Eventually a
92     // thread will pull this callback off the queue and execute it.
93     void schedule(std::function<void()> callback, INT32 lockThreadId = 0);
94 
95     // Provided for debugging and testing only.
96     INT32 getNumThreads() const;
97 
98     // Standard thread options.  Use this accessor to get them.
99     const RTThreadOptions& getThreadOptions() const;
100 
101  private:
102     class RTWorkerThread;
103     typedef struct RTAssignWorker {
104         RTWorkerThread *worker;
105         std::deque<std::function<void()>> tasks;
106         RtMutex mutex;
107         RtCondition condition;
108     } RTAssignWorker;
109 
110     void  runWorker(INT32 threadId);
111     void  randomAndRunWorker();
112     void  assignAndRunWorker(INT32 threadId);
113     void  randomScheduleToWorker(std::function<void()> callback);
114     void  assignScheduleToWorker(std::function<void()> callback, INT32 threadId);
115     void  setThreadCount(INT32 numThreads);
116 
117     std::string mNamePrefix;
118     /* thread id and worker */
119     std::map<int, RTAssignWorker *> mWorkers;
120     INT32 mNumThreads;
121 
122     RtMutex mMutex;
123     RtCondition mCondition;
124     bool mStopped = false;
125 
126     std::deque<std::function<void()>> mTasks;
127 
128     RTThreadOptions mThreadOptions;
129     RTThreadPoolMode mMode;
130 };
131 
132 namespace internal {
133 
134 // Creates name for thread in a thread pool based on provided prefix and
135 // thread id. Length of the resulting name is guaranteed to be less or equal
136 // to 15. Name or thread id can be truncated to achieve that, see truncation
137 // samples below:
138 // namePrefix, 1234       -> namePrefix/123
139 // namePrefix, 1234567    -> namePrefix/123
140 // namePrefix_long, 1234  -> namePrefix_lon
141 std::string createThreadName(const std::string& prefix, INT32 threadId);
142 
143 }  // namespace internal
144 
145 #endif  // SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_
146