1 #pragma once 2 3 #include <mbgl/actor/scheduler.hpp> 4 #include <mbgl/actor/mailbox.hpp> 5 #include <mbgl/util/noncopyable.hpp> 6 #include <mbgl/util/util.hpp> 7 #include <mbgl/util/work_task.hpp> 8 #include <mbgl/util/work_request.hpp> 9 10 #include <atomic> 11 #include <functional> 12 #include <utility> 13 #include <queue> 14 #include <mutex> 15 16 namespace mbgl { 17 namespace util { 18 19 using LOOP_HANDLE = void *; 20 21 class RunLoop : public Scheduler, 22 private util::noncopyable { 23 public: 24 enum class Type : uint8_t { 25 Default, 26 New, 27 }; 28 29 enum class Priority : bool { 30 Default = false, 31 High = true, 32 }; 33 34 enum class Event : uint8_t { 35 None = 0, 36 Read = 1, 37 Write = 2, 38 ReadWrite = Read | Write, 39 }; 40 41 RunLoop(Type type = Type::Default); 42 ~RunLoop() override; 43 44 static RunLoop* Get(); 45 static LOOP_HANDLE getLoopHandle(); 46 47 void run(); 48 void runOnce(); 49 void stop(); 50 51 // So far only needed by the libcurl backend. 52 void addWatch(int fd, Event, std::function<void(int, Event)>&& callback); 53 void removeWatch(int fd); 54 55 // Invoke fn(args...) on this RunLoop. 56 template <class Fn, class... Args> invoke(Priority priority,Fn && fn,Args &&...args)57 void invoke(Priority priority, Fn&& fn, Args&&... args) { 58 push(priority, WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...)); 59 } 60 61 // Invoke fn(args...) on this RunLoop. 62 template <class Fn, class... Args> invoke(Fn && fn,Args &&...args)63 void invoke(Fn&& fn, Args&&... args) { 64 invoke(Priority::Default, std::forward<Fn>(fn), std::forward<Args>(args)...); 65 } 66 67 // Post the cancellable work fn(args...) to this RunLoop. 68 template <class Fn, class... Args> 69 std::unique_ptr<AsyncRequest> invokeCancellable(Fn && fn,Args &&...args)70 invokeCancellable(Fn&& fn, Args&&... args) { 71 std::shared_ptr<WorkTask> task = WorkTask::make(std::forward<Fn>(fn), std::forward<Args>(args)...); 72 push(Priority::Default, task); 73 return std::make_unique<WorkRequest>(task); 74 } 75 schedule(std::weak_ptr<Mailbox> mailbox)76 void schedule(std::weak_ptr<Mailbox> mailbox) override { 77 invoke([mailbox] () { 78 Mailbox::maybeReceive(mailbox); 79 }); 80 } 81 82 class Impl; 83 84 private: 85 MBGL_STORE_THREAD(tid) 86 87 using Queue = std::queue<std::shared_ptr<WorkTask>>; 88 89 // Wakes up the RunLoop so that it starts processing items in the queue. 90 void wake(); 91 92 // Adds a WorkTask to the queue, and wakes it up. push(Priority priority,std::shared_ptr<WorkTask> task)93 void push(Priority priority, std::shared_ptr<WorkTask> task) { 94 std::lock_guard<std::mutex> lock(mutex); 95 if (priority == Priority::High) { 96 highPriorityQueue.emplace(std::move(task)); 97 } else { 98 defaultQueue.emplace(std::move(task)); 99 } 100 wake(); 101 } 102 process()103 void process() { 104 std::shared_ptr<WorkTask> task; 105 std::unique_lock<std::mutex> lock(mutex); 106 while (true) { 107 if (!highPriorityQueue.empty()) { 108 task = std::move(highPriorityQueue.front()); 109 highPriorityQueue.pop(); 110 } else if (!defaultQueue.empty()) { 111 task = std::move(defaultQueue.front()); 112 defaultQueue.pop(); 113 } else { 114 break; 115 } 116 lock.unlock(); 117 (*task)(); 118 task.reset(); 119 lock.lock(); 120 } 121 } 122 123 Queue defaultQueue; 124 Queue highPriorityQueue; 125 std::mutex mutex; 126 127 std::unique_ptr<Impl> impl; 128 }; 129 130 } // namespace util 131 } // namespace mbgl 132 133 #include <mbgl/util/work_task_impl.hpp> 134