1 #pragma once
2 
3 #include <mbgl/actor/actor.hpp>
4 #include <mbgl/actor/mailbox.hpp>
5 #include <mbgl/actor/scheduler.hpp>
6 #include <mbgl/util/platform.hpp>
7 #include <mbgl/util/run_loop.hpp>
8 #include <mbgl/util/util.hpp>
9 
10 #include <cassert>
11 #include <future>
12 #include <memory>
13 #include <mutex>
14 #include <queue>
15 #include <string>
16 #include <thread>
17 #include <utility>
18 
19 namespace mbgl {
20 namespace util {
21 
22 // Manages a thread with `Object`.
23 
// Upon creation of this object, it launches a thread and creates an object of type `Object`
// in that thread. The `Thread<>` constructor returns as soon as the thread has been
// launched; it does not wait for the `Object` to be constructed. When the `Thread<>`
// object is destructed, the destructor waits until the thread is executing its run loop,
// stops the loop and then waits for thread termination. The thread created will always
// have low priority on the platforms that support setting thread priority.
30 //
31 // The following properties make this class different from `ThreadPool`:
32 //
33 // - Only one thread is created.
34 // - `Object` will live in a single thread, providing thread affinity.
// - It is safe to use `ThreadLocal` in an `Object` managed by `Thread<>`.
// - A `RunLoop` is created for the `Object` thread.
// - `Object` can use `Timer` and do asynchronous I/O, such as waiting for socket events.
38 //
39 template<class Object>
40 class Thread {
41 public:
42     template <class... Args>
Thread(const std::string & name,Args &&...args)43     Thread(const std::string& name, Args&&... args) {
44 
45         std::promise<void> running_;
46         running = running_.get_future();
47 
48         auto capturedArgs = std::make_tuple(std::forward<Args>(args)...);
49 
50         thread = std::thread([
51             this,
52             name,
53             capturedArgs = std::move(capturedArgs),
54             runningPromise = std::move(running_)
55         ] () mutable {
56             platform::setCurrentThreadName(name);
57             platform::makeThreadLowPriority();
58 
59             util::RunLoop loop_(util::RunLoop::Type::New);
60             loop = &loop_;
61             EstablishedActor<Object> establishedActor(loop_, object, std::move(capturedArgs));
62 
63             runningPromise.set_value();
64 
65             loop->run();
66 
67             (void) establishedActor;
68 
69             loop = nullptr;
70         });
71     }
72 
~Thread()73     ~Thread() {
74         if (paused) {
75             resume();
76         }
77 
78         std::promise<void> stoppable;
79 
80         running.wait();
81 
82         // Invoke a noop task on the run loop to ensure that we're executing
83         // run() before we call stop()
84         loop->invoke([&] {
85             stoppable.set_value();
86         });
87 
88         stoppable.get_future().get();
89 
90         loop->stop();
91         thread.join();
92     }
93 
94     // Returns a non-owning reference to `Object` that
95     // can be used to send messages to `Object`. It is safe
96     // to the non-owning reference to outlive this object
97     // and be used after the `Thread<>` gets destroyed.
actor()98     ActorRef<std::decay_t<Object>> actor() {
99         return object.self();
100     }
101 
102     // Pauses the `Object` thread. It will prevent the object to wake
103     // up from events such as timers and file descriptor I/O. Messages
104     // sent to a paused `Object` will be queued and only processed after
105     // `resume()` is called.
pause()106     void pause() {
107         MBGL_VERIFY_THREAD(tid);
108 
109         assert(!paused);
110 
111         paused = std::make_unique<std::promise<void>>();
112         resumed = std::make_unique<std::promise<void>>();
113 
114         auto pausing = paused->get_future();
115 
116         running.wait();
117 
118         loop->invoke(RunLoop::Priority::High, [this] {
119             auto resuming = resumed->get_future();
120             paused->set_value();
121             resuming.get();
122         });
123 
124         pausing.get();
125     }
126 
127     // Resumes the `Object` thread previously paused by `pause()`.
resume()128     void resume() {
129         MBGL_VERIFY_THREAD(tid);
130 
131         assert(paused);
132 
133         resumed->set_value();
134 
135         resumed.reset();
136         paused.reset();
137     }
138 
139 private:
140     MBGL_STORE_THREAD(tid);
141 
142     AspiringActor<Object> object;
143 
144     std::thread thread;
145 
146     std::future<void> running;
147 
148     std::unique_ptr<std::promise<void>> paused;
149     std::unique_ptr<std::promise<void>> resumed;
150 
151     util::RunLoop* loop = nullptr;
152 };
153 
154 } // namespace util
155 } // namespace mbgl
156