xref: /OK3568_Linux_fs/external/camera_engine_rkaiq/rkaiq/xcore/task_service.h (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1 /*
2  * task_service.h
3  *
4  *  Copyright (c) 2021 Rockchip Electronics Co., Ltd.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * Author: Cody Xie <cody.xie@rock-chips.com>
19  */
20 #ifndef XCORE_TASK_SERVICE_H
21 #define XCORE_TASK_SERVICE_H
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <cassert>
26 #include <chrono>
27 #include <condition_variable>
28 #include <cstdint>
29 #include <deque>
30 #include <functional>
31 #include <iostream>
32 #include <mutex>
33 #include <thread>
34 #include <utility>
35 
36 #include "task_traits.h"
37 
38 #define DEBUG 1
39 #ifndef TASK_LOG
40 #define TASK_LOG(format, ...) \
41     printf("DEBUG: " #format "\n", ##__VA_ARGS__)
42 #endif
43 
44 namespace XCam {
45 
46 // SERVICE CONCEPTS:
47 // * A aynchronous threaded service to execute specific tasks that deal with
48 //   INPUTS and OUTPUTS parameters
49 // * Service will calculate the MAX_PROCESS_TIME of the task consumed
50 // * Support run ONCE/SYNC/ASYNC on a task
51 //   * ASYNC: Run task in a threaded loop
52 //   * SYNC: Call ENQUEUE/DEQUEUE synchronous by user
53 //   * ONCE: RunOnce for one param input
54 // * Support ENQUEUE/DEQUEUE methods
55 //   * ENQUEUE will signal loop immediately to run task
56 //   * DEQUEUE will wait until results not empty and the MAY_BLOCK is true
57 //   * DEQUEUE may return empty param if no result and the MAY_BLOCK is false
58 // * Support buffer overrun/underrun behaviors on
59 //   * BUSY (fifo full)
60 //   * IDLE (fifo empty for longer than 2 period)
61 //   * LATE (execution time longer than 1 period)
62 //   * ERROR (process error)
63 // * Support config and config change event ???
64 //   to negotiate with other service ???
65 //
66 // * Users can ENQUEUE new params, DEQUEUE PROCESSED/ERROR/SKIPPED params
67 // * Users can trigger START, STOP the service
68 // * Users can FLUSH the proccessing queue, mark param to SKIPPED
69 // * Users can specific the MAXIMUM PROCESS TIME
70 //
71 // * Parameters are UNIQUE, FIFO even on ERROR/LATE happened
72 // * Parameters have a BUFFER POOL, with MAX_PARAM_COUNT
73 // * Parameters can be ALLOCATED inside or outside of Service
74 // * Last result will be HOLD A COPY if current task failed
75 //
76 // * Task is a FUNCTOR object
77 // * Task does NOT OWN the param
78 // * Task may HOLD several input buffers
79 // * Task may return process result
80 //   * SUCCESS - returned, successful processed
81 //   * SKIPPED - returned, but this param skipped
82 //   * FAILED - returned, but failed in processing
83 //   * AGAIN - required more inputs
84 // * Task inputs and outputs may have different id
85 // * Task outputs may have inputs param
86 //
87 
88 template <typename T, class Container = std::deque<ServiceParam<T>>>
89 class TaskService {
90  public:
91     typedef TaskService<T> value_type;
92     typedef ServiceTask<T> task_type;
93     typedef ServiceParam<T> param_type;
94 
95     TaskService()                      = delete;
96     TaskService(const TaskService<T>&) = delete;
97     const TaskService<T>& operator=(const TaskService<T>&) = delete;
98 
99     explicit TaskService(std::unique_ptr<ServiceTask<T>> task,
100                          const bool may_block                = default_may_block,
101                          const uint8_t max_param_count       = default_max_param_count,
102                          const TaskDuration max_process_time = default_process_time)
max_param_count_(max_param_count)103         : max_param_count_(max_param_count),
104           max_process_time_(max_process_time),
105           may_block_(may_block),
106           started_(false),
107           task_(std::move(task)) {
108         assert(task_.get() != nullptr);
109         allocServiceParam();
110     }
111 
~TaskService()112     virtual ~TaskService() {
113         stop();
114         clear();
115     }
116 
loop()117     void loop() {
118         while (started_) {
119             std::unique_lock<std::mutex> in_lock(in_mutex_);
120             auto wait_ret = in_cond_.wait_for(in_lock, max_process_time_, [this]() {
121                 return (!started_ || (!in_params_.empty() &&
122                                       in_params_.front().state == ParamState::kReadyForProcess));
123             });
124             if (wait_ret == false) {
125                 // IDLE
126                 continue;
127             }
128             if (!started_) break;
129 
130             auto param = std::move(in_params_.front());
131             in_params_.pop_front();
132             in_lock.unlock();
133             param.state = ParamState::kProcessing;
134             const auto start = std::chrono::steady_clock::now();
135 
136             auto ret = (*task_)(param);
137             if (ret == TaskResult::kSuccess) {
138                 param.state = ParamState::kProcessedSuccess;
139             } else {
140                 param.state = ParamState::kProcessedError;
141                 TASK_LOG("task processs failed");
142 #ifdef USE_TASK_CALLBACK
143                 callbacks_.onError();
144 #endif
145             }
146 
147             {
148                 std::lock_guard<std::mutex> out_lock(out_mutex_);
149                 // TODO(Cody): if we have alllocated params in front,
150                 // this will block the result until these params are
151                 // dequeued. if we use push_front, we break the sequence
152                 // of params
153                 out_params_.push_back(std::move(param));
154                 out_cond_.notify_one();
155             }
156 
157             const auto end = std::chrono::steady_clock::now();
158 
159             const std::chrono::duration<double, std::milli> elapsed = end - start;
160             if (elapsed >= max_process_time_) {
161                 // TODO(Cody): match the frame id ?
162                 TASK_LOG("params processs elapsed %lf exceeds %lf", elapsed.count(),
163                      max_process_time_.count());
164 #if USE_TASK_CALLBACK
165                 callbacks_.onLate();
166 #endif
167             }
168         }
169     }
170 
start()171     void start() {
172         if (started_) {
173             return;
174         }
175         started_  = true;
176         executor_ = std::thread([this] { loop(); });
177         // executor_.detach();
178 #if USE_TASK_CALLBACK
179         callbacks_.onStarted();
180 #endif
181     }
182 
stop()183     void stop() {
184         if (started_) {
185             started_ = false;
186             executor_.join();
187         }
188 #if USE_TASK_CALLBACK
189         callbacks_.onStoped();
190 #endif
191     }
192 
allocServiceParam()193     void allocServiceParam() noexcept {
194         std::lock_guard<std::mutex> lock(out_mutex_);
195         while (out_params_.size() < max_param_count_) {
196             // TODO(Cody): Must add try block if -fexceptions enabled
197             // Ensure new object does not throw any exceptions
198             ServiceParam<T> param;
199             param.state = ParamState::kAllocated;
200             param.unique_id = -1;
201             param.payload = std::make_shared<T>();
202             out_params_.push_back(std::move(param));
203             out_cond_.notify_one();
204         }
205     }
206 
enqueue(ServiceParam<T> & input)207     void enqueue(ServiceParam<T>& input) {
208         std::lock_guard<std::mutex> lock(in_mutex_);
209         // TODO(Cody): deal with busy case
210 #if 0
211         if (in_params_.size() == max_param_count_) {
212             TASK_LOG("input params exceeds max count!");
213 #if USE_TASK_CALLBACK
214             callbacks_.onFull(input);
215 #endif
216             return;
217         }
218 #endif
219         input.state = ParamState::kReadyForProcess;
220         in_params_.push_back(input);
221         in_cond_.notify_one();
222     }
223 
enqueue(ServiceParam<T> && input)224     void enqueue(ServiceParam<T>&& input) {
225         std::lock_guard<std::mutex> lock(in_mutex_);
226         // TODO(Cody): deal with busy case
227 #if 0
228         if (in_params_.size() == max_param_count_) {
229             TASK_LOG("input params exceeds max count!");
230 #if USE_TASK_CALLBACK
231             callbacks_.onFull(input);
232 #endif
233             return;
234         }
235 #endif
236         input.state = ParamState::kReadyForProcess;
237         in_params_.push_back(std::move(input));
238         in_cond_.notify_one();
239     }
240 
dequeue()241     ServiceParam<T> dequeue() {
242         std::unique_lock<std::mutex> lock(out_mutex_);
243         out_cond_.wait(lock, [this]() {
244             if ((!out_params_.empty() &&
245                  (out_params_.front().state == ParamState::kAllocated ||
246                   out_params_.front().state == ParamState::kProcessedSuccess ||
247                   out_params_.front().state == ParamState::kProcessedError))) {
248                 return true;
249             } else if (!may_block_) {
250                 ServiceParam<T> empty = {
251                     .state = ParamState::kNull,
252                     .unique_id = -1,
253                     .payload = std::shared_ptr<T>(nullptr),
254                 };
255                 out_params_.push_front(std::move(empty));
256                 return true;
257             } else {
258                 return false;
259             }
260         });
261         auto p = std::move(out_params_.front());
262         out_params_.pop_front();
263         return p;
264     }
265 
flush()266     void flush() {
267         std::lock(in_mutex_, out_mutex_);
268         std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock);
269         std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock);
270         if (in_params_.empty()) {
271             return;
272         }
273         auto paramIt =
274             std::remove_if(in_params_.begin(), in_params_.end(), [](const ServiceParam<T>& p) {
275                 return (p.state != ParamState::kReadyForProcess) &&
276                        (p.state != ParamState::kAllocated);
277             });
278         for (auto param = paramIt; param != in_params_.end(); param++) {
279             param.state = ParamState::kAllocated;
280             param.unique_id = -1;
281             out_params_.push_back(param);
282             out_cond_.notify_one();
283         }
284     }
285 
clear()286     void clear() {
287         std::lock(in_mutex_, out_mutex_);
288         std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock);
289         std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock);
290         in_params_.clear();
291         out_params_.clear();
292     }
293 
getMaxParamCount()294     uint8_t getMaxParamCount() const { return max_param_count_; }
setMaxParamCount(const uint8_t count)295     void setMaxParamCount(const uint8_t count) { max_param_count_ = count; }
296 
getMaxProceedTime()297     const TaskDuration getMaxProceedTime() { return max_process_time_; }
setMaxProceedTime(const TaskDuration duration)298     void setMaxProceedTime(const TaskDuration duration) { max_process_time_ = duration; }
setMaxProceedTimeByFps(const int fps)299     void setMaxProceedTimeByFps(const int fps) {
300         max_process_time_ = TaskDuration(std::chrono::milliseconds(1000 / fps));
301     }
302 
303     // TODO(Cody): add support link to other services
304 #if 0
305     template <typename O>
306     friend void link(TaskService<O>* other);
307     template <typename O>
308     friend void unlink(TaskService<O>* other);
309 #endif
310 
311  private:
312     uint8_t max_param_count_;
313     TaskDuration max_process_time_;
314     bool may_block_;
315     bool started_;
316 
317     std::mutex in_mutex_;
318     std::condition_variable in_cond_;
319     std::mutex out_mutex_;
320     std::condition_variable out_cond_;
321     std::unique_ptr<ServiceTask<T>> task_;
322     std::thread executor_;
323 
324     Container in_params_;
325     Container out_params_;
326 
327 #if USE_TASK_CALLBACK
328     TaskCallbacks callbacks_;
329 #endif
330 };
331 
332 }  // namespace XCam
333 
334 #endif  // ALGOS_AEIS_TASK_SERVICE_H
335