1 /* 2 * Copyright 2020 Rockchip Electronics Co. LTD 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * src author: <mediapipe-team@google.com> 17 * new author: modified by <rimon.xu@rock-chips.com> and <martin.cheng@rock-chips.com> 18 * date: 2020-03-19 19 * reference: https://github.com/google/mediapipe 20 */ 21 22 #ifndef SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_ 23 #define SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_ 24 25 #include <atomic> 26 #include <functional> 27 #include <map> 28 #include <memory> 29 #include <queue> 30 #include <set> 31 #include <utility> 32 #include <vector> 33 34 #include "RTExecutor.h" 35 #include "RTSchedulerQueue.h" 36 37 class RTMediaBuffer; 38 class RTTaskNodeContext; 39 class RTTaskGraph; 40 // The class scheduling a calculator graph. 41 class RTScheduler { 42 public: 43 RTScheduler(const RTScheduler&) = delete; 44 RTScheduler& operator=(const RTScheduler&) = delete; 45 46 explicit RTScheduler(RTTaskGraph *ctx); 47 48 ~RTScheduler(); 49 50 void setDefaultExecutor(RTExecutor* executor); 51 RT_RET setExecutor(RTTaskNode *node, RTExecutor* executor); 52 53 void scheduleNode(RTTaskNode *node, RTTaskNodeContext *nodeContext); 54 55 void scheduleUnthrottledNodes(const std::vector<RTTaskNode *>& nodesToSchedule); 56 void scheduleSuspendThrottledNodes(const std::vector<RTTaskNode *>& nodes); 57 58 RT_RET waitUntilDone(INT64 timeoutUs = -1); 59 // Wait until the running graph is in the idle mode, which is when nothing can 60 // be scheduled and nothing is running in the worker threads. This function 61 // can be called only after Start(). 62 // Runs application thread tasks while waiting. 63 RT_RET waitUntilIdle(INT64 timeoutUs = -1); 64 RT_RET waitUntilEos(INT64 timeoutUs = -1); 65 RT_RET cleanupAfterRun(); 66 RT_RET applicationThreadAwait( 67 const std::function<bool()>& wakeupCondition, INT64 timeoutUs = -1); 68 69 void queueIdleStateChanged(bool idle); 70 void addNodeToOpenedQueue(RTTaskNode *node); 71 void removeNodeFromOpenedQueue(RTTaskNode *node); 72 RT_BOOL checkScheduleDone(); 73 RT_RET tryToScheduleIdleNode(); 74 void handleIdle(); 75 bool isIdle(); 76 RT_BOOL isRunning(); 77 void start(); 78 79 void flush(); 80 81 void reset(); 82 83 void quit(); 84 85 void reachEos(); 86 87 void pause() LOCKS_EXCLUDED(mStateMutex); 88 89 void resume() LOCKS_EXCLUDED(mStateMutex); 90 91 void stop() LOCKS_EXCLUDED(mStateMutex); 92 93 void pauseAndWait(); 94 95 void setQueuesRunning(bool running); 96 void submitWaitingTasksOnQueues(); 97 98 void notifyHasError(RT_BOOL hasError); 99 void notifySchedulerPaused(); 100 101 void throttledGraphInputStream(); 102 void unthrottledGraphInputStream(); 103 RT_RET waitUntilGraphInputStreamUnthrottled(RtMutex *secondaryMutex, INT64 timeoutUs); 104 void emittedObservedOutput(); 105 RT_RET waitForObservedOutput(INT64 timeoutUs = -1); 106 RT_RET waitForUntilPaused(); 107 108 private: 109 // State of the scheduler. The figure shows the allowed state transitons. 110 // 111 // NOT_STARTED 112 // | 113 // v 114 // RUNNING--+ 115 // | | ^ | 116 // | | \ | 117 // | | \ v 118 // | | PAUSED 119 // | | | 120 // | v v 121 // | CANCELLING 122 // | | 123 // v v 124 // TERMINATING 125 // | 126 // v 127 // TERMINATED 128 enum State { 129 STATE_NOT_STARTED = 0, // The initial state. 130 STATE_RUNNING = 1, // The scheduler is running and scheduling nodes. 131 STATE_PAUSED = 2, // The scheduler is not scheduling nodes. 132 STATE_CANCELLING = 3, // The scheduler is being cancelled. The scheduler 133 // cannot be paused in this state so that 134 // scheduler_queue_ can be drained. 135 STATE_TERMINATED = 4, // The scheduler has terminated. 136 }; 137 std::atomic<State> mState = ATOMIC_VAR_INIT(STATE_NOT_STARTED); 138 RtMutex mStateMutex; 139 RtMutex mThrottleMutex; 140 RtCondition mStateCondition; 141 // Queue of nodes that need to be run. 142 RTSchedulerQueue mDefaultQueue; 143 std::vector<RTSchedulerQueue *> mSchedulerQueues; 144 // Number of queues which are not idle. 145 // Note: this indicates two slightly different things: 146 // a. the number of queues which still have nodes running; 147 // b. the number of queues whose executors may still access the scheduler. 148 // When a queue becomes idle, it has stopped running nodes, and the scheduler 149 // decrements the count. However, it is not done accessing the scheduler 150 // until HandleIdle returns. Therefore, a and b are briefly out of sync. 151 // This is ok, because it happens within a single critical section, which is 152 // guarded by state_mutex_. If we wanted to split this critical section, we 153 // would have to separate a and b into two variables. 154 INT32 mNonIdleQueueCount = 0; 155 // Used by HandleIdle to avoid multiple concurrent executions. 156 // We cannot simply hold a mutex throughout it, for two reasons: 157 // - We need it to be reentrant, which Mutex does not support. 158 // - We want simultaneous calls to return immediately instead of waiting, 159 // and Mutex's TryLock is not guaranteed to work. 160 bool mHandlingIdle = false; 161 std::vector<RTTaskNode *> mOpendNodeQueue; 162 std::vector<RTTaskNode *> mActiveSourceNodes; 163 // Data accessed by all SchedulerQueues. 164 RTSchedulerShared mShared; 165 RTTaskGraph *mTaskGraph; 166 // Number of throttled graph input streams. 167 INT32 mThrottledGraphInputStreamCount = 0; 168 // Used to stop WaitUntilGraphInputStreamUnthrottled. 169 INT32 mUnthrottleSeqNum = 0; 170 // Used to stop WaitForObservedOutput. 171 RT_BOOL mObservedOutputSignal = RT_FALSE; 172 // True if an application thread is waiting in waitForObservedOutput. 173 RT_BOOL mWaitingForObservedOutput = RT_FALSE; 174 RT_BOOL mWaitindForPaused = RT_FALSE; 175 RT_BOOL mReachEos; 176 }; 177 178 #endif // SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_ 179 180