/*
 * Copyright 2020 Rockchip Electronics Co. LTD
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * src author: <mediapipe-team@google.com>
 * new author: modified by <rimon.xu@rock-chips.com> and <martin.cheng@rock-chips.com>
 *       date: 2020-03-19
 *  reference: https://github.com/google/mediapipe
 */

#ifndef SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_
#define SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_

#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <utility>
#include <vector>

#include "RTExecutor.h"
#include "RTSchedulerQueue.h"

class RTMediaBuffer;
class RTTaskNodeContext;
class RTTaskGraph;

// The class scheduling a calculator graph.
class RTScheduler {
 public:
    RTScheduler(const RTScheduler&) = delete;
    RTScheduler& operator=(const RTScheduler&) = delete;

    explicit RTScheduler(RTTaskGraph *ctx);

    ~RTScheduler();

    void setDefaultExecutor(RTExecutor* executor);
    RT_RET setExecutor(RTTaskNode *node, RTExecutor* executor);

    void scheduleNode(RTTaskNode *node, RTTaskNodeContext *nodeContext);

    void scheduleUnthrottledNodes(const std::vector<RTTaskNode *>& nodesToSchedule);
    void scheduleSuspendThrottledNodes(const std::vector<RTTaskNode *>& nodes);

    RT_RET waitUntilDone(INT64 timeoutUs = -1);
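
    // Illustrative run lifecycle (a minimal sketch of a caller, not part of this
    // header; how the graph and the executor are built is assumed elsewhere):
    //
    //   RTTaskGraph *graph    = ...;   // graph constructed by the caller
    //   RTExecutor  *executor = ...;   // e.g. a thread-pool executor
    //   RTScheduler  scheduler(graph);
    //   scheduler.setDefaultExecutor(executor);
    //   scheduler.start();
    //   // ... graph runs; packets flow through the task nodes ...
    //   scheduler.waitUntilDone();      // timeoutUs = -1 presumably waits indefinitely
    //   scheduler.cleanupAfterRun();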

    // Wait until the running graph is in the idle mode, which is when nothing can
    // be scheduled and nothing is running in the worker threads. This function
    // can be called only after start().
    // Runs application thread tasks while waiting.
    RT_RET waitUntilIdle(INT64 timeoutUs = -1);
    RT_RET waitUntilEos(INT64 timeoutUs = -1);
    RT_RET cleanupAfterRun();
    RT_RET applicationThreadAwait(
            const std::function<bool()>& wakeupCondition, INT64 timeoutUs = -1);

    void queueIdleStateChanged(bool idle);
    void addNodeToOpenedQueue(RTTaskNode *node);
    void removeNodeFromOpenedQueue(RTTaskNode *node);
    RT_BOOL checkScheduleDone();
    RT_RET tryToScheduleIdleNode();
    void handleIdle();
    bool isIdle();
    RT_BOOL isRunning();
    void start();

    void flush();

    void reset();

    void quit();

    void reachEos();

    void pause() LOCKS_EXCLUDED(mStateMutex);

    void resume() LOCKS_EXCLUDED(mStateMutex);

    void stop() LOCKS_EXCLUDED(mStateMutex);

    void pauseAndWait();

    void setQueuesRunning(bool running);
    void submitWaitingTasksOnQueues();

    void notifyHasError(RT_BOOL hasError);
    void notifySchedulerPaused();

    void throttledGraphInputStream();
    void unthrottledGraphInputStream();
    RT_RET waitUntilGraphInputStreamUnthrottled(RtMutex *secondaryMutex, INT64 timeoutUs);
    void emittedObservedOutput();
    RT_RET waitForObservedOutput(INT64 timeoutUs = -1);
    RT_RET waitForUntilPaused();
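
    // Illustrative pause/resume and output-wait pattern (a sketch only; the exact
    // blocking semantics are assumptions based on the declarations above):
    //
    //   scheduler.pause();               // stop scheduling further nodes
    //   scheduler.waitForUntilPaused();  // presumably blocks until in-flight work drains
    //   // ... reconfigure or inspect state while paused ...
    //   scheduler.resume();
    //   scheduler.waitForObservedOutput();  // presumably blocks until an observed
    //                                       // output has been emitted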

 private:
    // State of the scheduler. The figure shows the allowed state transitions.
    //
    //   NOT_STARTED
    //        |
    //        v
    //     RUNNING--+
    //     | |  ^   |
    //     | |   \  |
    //     | |    \ v
    //     | |    PAUSED
    //     | |      |
    //     | v      v
    //     |  CANCELLING
    //     |      |
    //     v      v
    //     TERMINATING
    //          |
    //          v
    //     TERMINATED
    enum State {
        STATE_NOT_STARTED = 0,  // The initial state.
        STATE_RUNNING     = 1,  // The scheduler is running and scheduling nodes.
        STATE_PAUSED      = 2,  // The scheduler is not scheduling nodes.
        STATE_CANCELLING  = 3,  // The scheduler is being cancelled. The scheduler
                                // cannot be paused in this state so that the
                                // scheduler queue can be drained.
        STATE_TERMINATED  = 4,  // The scheduler has terminated.
    };

    std::atomic<State> mState = ATOMIC_VAR_INIT(STATE_NOT_STARTED);
    RtMutex     mStateMutex;
    RtMutex     mThrottleMutex;
    RtCondition mStateCondition;

    // Queue of nodes that need to be run.
    RTSchedulerQueue mDefaultQueue;
    std::vector<RTSchedulerQueue *> mSchedulerQueues;

    // Number of queues which are not idle.
    // Note: this indicates two slightly different things:
    //   a. the number of queues which still have nodes running;
    //   b. the number of queues whose executors may still access the scheduler.
    // When a queue becomes idle, it has stopped running nodes, and the scheduler
    // decrements the count. However, it is not done accessing the scheduler
    // until handleIdle returns. Therefore, a and b are briefly out of sync.
    // This is ok, because it happens within a single critical section, which is
    // guarded by mStateMutex. If we wanted to split this critical section, we
    // would have to separate a and b into two variables.
    INT32 mNonIdleQueueCount = 0;

    // Used by handleIdle to avoid multiple concurrent executions.
    // We cannot simply hold a mutex throughout it, for two reasons:
    // - We need it to be reentrant, which a plain mutex does not support.
    // - We want simultaneous calls to return immediately instead of waiting,
    //   and a mutex's try-lock is not guaranteed to work.
    bool mHandlingIdle = false;

    std::vector<RTTaskNode *> mOpendNodeQueue;
    std::vector<RTTaskNode *> mActiveSourceNodes;

    // Data accessed by all SchedulerQueues.
    RTSchedulerShared mShared;
    RTTaskGraph      *mTaskGraph;

    // Number of throttled graph input streams.
    INT32 mThrottledGraphInputStreamCount = 0;
    // Used to stop waitUntilGraphInputStreamUnthrottled.
    INT32 mUnthrottleSeqNum = 0;
    // Used to stop waitForObservedOutput.
    RT_BOOL mObservedOutputSignal = RT_FALSE;
    // True if an application thread is waiting in waitForObservedOutput.
    RT_BOOL mWaitingForObservedOutput = RT_FALSE;
    // True if an application thread is waiting in waitForUntilPaused.
    RT_BOOL mWaitindForPaused = RT_FALSE;
    RT_BOOL mReachEos;
};

#endif  // SRC_RT_TASK_TASK_GRAPH_RTSCHEDULER_H_