1 /* 2 * task_service.h 3 * 4 * Copyright (c) 2021 Rockchip Electronics Co., Ltd. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * Author: Cody Xie <cody.xie@rock-chips.com> 19 */ 20 #ifndef XCORE_TASK_SERVICE_H 21 #define XCORE_TASK_SERVICE_H 22 23 #include <algorithm> 24 #include <atomic> 25 #include <cassert> 26 #include <chrono> 27 #include <condition_variable> 28 #include <cstdint> 29 #include <deque> 30 #include <functional> 31 #include <iostream> 32 #include <mutex> 33 #include <thread> 34 #include <utility> 35 36 #include "task_traits.h" 37 38 #define DEBUG 1 39 #ifndef TASK_LOG 40 #define TASK_LOG(format, ...) \ 41 printf("DEBUG: " #format "\n", ##__VA_ARGS__) 42 #endif 43 44 namespace XCam { 45 46 // SERVICE CONCEPTS: 47 // * A aynchronous threaded service to execute specific tasks that deal with 48 // INPUTS and OUTPUTS parameters 49 // * Service will calculate the MAX_PROCESS_TIME of the task consumed 50 // * Support run ONCE/SYNC/ASYNC on a task 51 // * ASYNC: Run task in a threaded loop 52 // * SYNC: Call ENQUEUE/DEQUEUE synchronous by user 53 // * ONCE: RunOnce for one param input 54 // * Support ENQUEUE/DEQUEUE methods 55 // * ENQUEUE will signal loop immediately to run task 56 // * DEQUEUE will wait until results not empty and the MAY_BLOCK is true 57 // * DEQUEUE may return empty param if no result and the MAY_BLOCK is false 58 // * Support buffer overrun/underrun behaviors on 59 // * BUSY (fifo full) 60 // * IDLE (fifo empty for longer than 2 period) 61 // * LATE (execution time longer than 1 period) 62 // * ERROR (process error) 63 // * Support config and config change event ??? 64 // to negotiate with other service ??? 65 // 66 // * Users can ENQUEUE new params, DEQUEUE PROCESSED/ERROR/SKIPPED params 67 // * Users can trigger START, STOP the service 68 // * Users can FLUSH the proccessing queue, mark param to SKIPPED 69 // * Users can specific the MAXIMUM PROCESS TIME 70 // 71 // * Parameters are UNIQUE, FIFO even on ERROR/LATE happened 72 // * Parameters have a BUFFER POOL, with MAX_PARAM_COUNT 73 // * Parameters can be ALLOCATED inside or outside of Service 74 // * Last result will be HOLD A COPY if current task failed 75 // 76 // * Task is a FUNCTOR object 77 // * Task does NOT OWN the param 78 // * Task may HOLD several input buffers 79 // * Task may return process result 80 // * SUCCESS - returned, successful processed 81 // * SKIPPED - returned, but this param skipped 82 // * FAILED - returned, but failed in processing 83 // * AGAIN - required more inputs 84 // * Task inputs and outputs may have different id 85 // * Task outputs may have inputs param 86 // 87 88 template <typename T, class Container = std::deque<ServiceParam<T>>> 89 class TaskService { 90 public: 91 typedef TaskService<T> value_type; 92 typedef ServiceTask<T> task_type; 93 typedef ServiceParam<T> param_type; 94 95 TaskService() = delete; 96 TaskService(const TaskService<T>&) = delete; 97 const TaskService<T>& operator=(const TaskService<T>&) = delete; 98 99 explicit TaskService(std::unique_ptr<ServiceTask<T>> task, 100 const bool may_block = default_may_block, 101 const uint8_t max_param_count = default_max_param_count, 102 const TaskDuration max_process_time = default_process_time) max_param_count_(max_param_count)103 : max_param_count_(max_param_count), 104 max_process_time_(max_process_time), 105 may_block_(may_block), 106 started_(false), 107 task_(std::move(task)) { 108 assert(task_.get() != nullptr); 109 allocServiceParam(); 110 } 111 ~TaskService()112 virtual ~TaskService() { 113 stop(); 114 clear(); 115 } 116 loop()117 void loop() { 118 while (started_) { 119 std::unique_lock<std::mutex> in_lock(in_mutex_); 120 auto wait_ret = in_cond_.wait_for(in_lock, max_process_time_, [this]() { 121 return (!started_ || (!in_params_.empty() && 122 in_params_.front().state == ParamState::kReadyForProcess)); 123 }); 124 if (wait_ret == false) { 125 // IDLE 126 continue; 127 } 128 if (!started_) break; 129 130 auto param = std::move(in_params_.front()); 131 in_params_.pop_front(); 132 in_lock.unlock(); 133 param.state = ParamState::kProcessing; 134 const auto start = std::chrono::steady_clock::now(); 135 136 auto ret = (*task_)(param); 137 if (ret == TaskResult::kSuccess) { 138 param.state = ParamState::kProcessedSuccess; 139 } else { 140 param.state = ParamState::kProcessedError; 141 TASK_LOG("task processs failed"); 142 #ifdef USE_TASK_CALLBACK 143 callbacks_.onError(); 144 #endif 145 } 146 147 { 148 std::lock_guard<std::mutex> out_lock(out_mutex_); 149 // TODO(Cody): if we have alllocated params in front, 150 // this will block the result until these params are 151 // dequeued. if we use push_front, we break the sequence 152 // of params 153 out_params_.push_back(std::move(param)); 154 out_cond_.notify_one(); 155 } 156 157 const auto end = std::chrono::steady_clock::now(); 158 159 const std::chrono::duration<double, std::milli> elapsed = end - start; 160 if (elapsed >= max_process_time_) { 161 // TODO(Cody): match the frame id ? 162 TASK_LOG("params processs elapsed %lf exceeds %lf", elapsed.count(), 163 max_process_time_.count()); 164 #if USE_TASK_CALLBACK 165 callbacks_.onLate(); 166 #endif 167 } 168 } 169 } 170 start()171 void start() { 172 if (started_) { 173 return; 174 } 175 started_ = true; 176 executor_ = std::thread([this] { loop(); }); 177 // executor_.detach(); 178 #if USE_TASK_CALLBACK 179 callbacks_.onStarted(); 180 #endif 181 } 182 stop()183 void stop() { 184 if (started_) { 185 started_ = false; 186 executor_.join(); 187 } 188 #if USE_TASK_CALLBACK 189 callbacks_.onStoped(); 190 #endif 191 } 192 allocServiceParam()193 void allocServiceParam() noexcept { 194 std::lock_guard<std::mutex> lock(out_mutex_); 195 while (out_params_.size() < max_param_count_) { 196 // TODO(Cody): Must add try block if -fexceptions enabled 197 // Ensure new object does not throw any exceptions 198 ServiceParam<T> param; 199 param.state = ParamState::kAllocated; 200 param.unique_id = -1; 201 param.payload = std::make_shared<T>(); 202 out_params_.push_back(std::move(param)); 203 out_cond_.notify_one(); 204 } 205 } 206 enqueue(ServiceParam<T> & input)207 void enqueue(ServiceParam<T>& input) { 208 std::lock_guard<std::mutex> lock(in_mutex_); 209 // TODO(Cody): deal with busy case 210 #if 0 211 if (in_params_.size() == max_param_count_) { 212 TASK_LOG("input params exceeds max count!"); 213 #if USE_TASK_CALLBACK 214 callbacks_.onFull(input); 215 #endif 216 return; 217 } 218 #endif 219 input.state = ParamState::kReadyForProcess; 220 in_params_.push_back(input); 221 in_cond_.notify_one(); 222 } 223 enqueue(ServiceParam<T> && input)224 void enqueue(ServiceParam<T>&& input) { 225 std::lock_guard<std::mutex> lock(in_mutex_); 226 // TODO(Cody): deal with busy case 227 #if 0 228 if (in_params_.size() == max_param_count_) { 229 TASK_LOG("input params exceeds max count!"); 230 #if USE_TASK_CALLBACK 231 callbacks_.onFull(input); 232 #endif 233 return; 234 } 235 #endif 236 input.state = ParamState::kReadyForProcess; 237 in_params_.push_back(std::move(input)); 238 in_cond_.notify_one(); 239 } 240 dequeue()241 ServiceParam<T> dequeue() { 242 std::unique_lock<std::mutex> lock(out_mutex_); 243 out_cond_.wait(lock, [this]() { 244 if ((!out_params_.empty() && 245 (out_params_.front().state == ParamState::kAllocated || 246 out_params_.front().state == ParamState::kProcessedSuccess || 247 out_params_.front().state == ParamState::kProcessedError))) { 248 return true; 249 } else if (!may_block_) { 250 ServiceParam<T> empty = { 251 .state = ParamState::kNull, 252 .unique_id = -1, 253 .payload = std::shared_ptr<T>(nullptr), 254 }; 255 out_params_.push_front(std::move(empty)); 256 return true; 257 } else { 258 return false; 259 } 260 }); 261 auto p = std::move(out_params_.front()); 262 out_params_.pop_front(); 263 return p; 264 } 265 flush()266 void flush() { 267 std::lock(in_mutex_, out_mutex_); 268 std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock); 269 std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock); 270 if (in_params_.empty()) { 271 return; 272 } 273 auto paramIt = 274 std::remove_if(in_params_.begin(), in_params_.end(), [](const ServiceParam<T>& p) { 275 return (p.state != ParamState::kReadyForProcess) && 276 (p.state != ParamState::kAllocated); 277 }); 278 for (auto param = paramIt; param != in_params_.end(); param++) { 279 param.state = ParamState::kAllocated; 280 param.unique_id = -1; 281 out_params_.push_back(param); 282 out_cond_.notify_one(); 283 } 284 } 285 clear()286 void clear() { 287 std::lock(in_mutex_, out_mutex_); 288 std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock); 289 std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock); 290 in_params_.clear(); 291 out_params_.clear(); 292 } 293 getMaxParamCount()294 uint8_t getMaxParamCount() const { return max_param_count_; } setMaxParamCount(const uint8_t count)295 void setMaxParamCount(const uint8_t count) { max_param_count_ = count; } 296 getMaxProceedTime()297 const TaskDuration getMaxProceedTime() { return max_process_time_; } setMaxProceedTime(const TaskDuration duration)298 void setMaxProceedTime(const TaskDuration duration) { max_process_time_ = duration; } setMaxProceedTimeByFps(const int fps)299 void setMaxProceedTimeByFps(const int fps) { 300 max_process_time_ = TaskDuration(std::chrono::milliseconds(1000 / fps)); 301 } 302 303 // TODO(Cody): add support link to other services 304 #if 0 305 template <typename O> 306 friend void link(TaskService<O>* other); 307 template <typename O> 308 friend void unlink(TaskService<O>* other); 309 #endif 310 311 private: 312 uint8_t max_param_count_; 313 TaskDuration max_process_time_; 314 bool may_block_; 315 bool started_; 316 317 std::mutex in_mutex_; 318 std::condition_variable in_cond_; 319 std::mutex out_mutex_; 320 std::condition_variable out_cond_; 321 std::unique_ptr<ServiceTask<T>> task_; 322 std::thread executor_; 323 324 Container in_params_; 325 Container out_params_; 326 327 #if USE_TASK_CALLBACK 328 TaskCallbacks callbacks_; 329 #endif 330 }; 331 332 } // namespace XCam 333 334 #endif // ALGOS_AEIS_TASK_SERVICE_H 335