1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3 * Copyright (c) 2015 Rockchip Electronics Co., Ltd.
4 */
5
6 #define MODULE_TAG "mpp_task_impl"
7
8 #include <string.h>
9
10 #include "mpp_env.h"
11 #include "mpp_mem.h"
12 #include "mpp_debug.h"
13
14 #include "mpp_task_impl.h"
15 #include "mpp_meta_impl.h"
16
/* Task count limit; not referenced by the code visible in this file. */
#define MAX_TASK_COUNT                  8

/* Bit flags tested against mpp_task_debug to select a logging category. */
#define MPP_TASK_DBG_FUNCTION           (0x00000001)
#define MPP_TASK_DBG_FLOW               (0x00000002)

/* Logging helpers gated on mpp_task_debug; the _f variant goes through _mpp_dbg_f. */
#define mpp_task_dbg(flag, fmt, ...)    _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_f(flag, fmt, ...)  _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__)

/* Category shortcuts: function enter/leave tracing and task flow tracing. */
#define mpp_task_dbg_func(fmt, ...)     mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_flow(fmt, ...)     mpp_task_dbg(MPP_TASK_DBG_FLOW, fmt, ## __VA_ARGS__)
27
28 typedef struct MppTaskStatusInfo_t {
29 struct list_head list;
30 RK_S32 count;
31 MppTaskStatus status;
32 MppCond cond;
33 } MppTaskStatusInfo;
34
35 typedef struct MppTaskQueueImpl_t {
36 char name[32];
37 void *mpp;
38 MppMutex lock;
39 RK_S32 task_count;
40 RK_S32 ready; // flag for deinit
41
42 // two ports inside of task queue
43 MppPort input;
44 MppPort output;
45
46 MppTaskImpl *tasks;
47
48 MppTaskStatusInfo info[MPP_TASK_STATUS_BUTT];
49 } MppTaskQueueImpl;
50
51 typedef struct MppPortImpl_t {
52 MppPortType type;
53 MppTaskQueueImpl *queue;
54
55 MppTaskStatus status_curr;
56 MppTaskStatus next_on_dequeue;
57 MppTaskStatus next_on_enqueue;
58 } MppPortImpl;
59
60 static const char *module_name = MODULE_TAG;
61 static const char *port_type_str[] = {
62 "input",
63 "output",
64 "NULL",
65 };
66
67 static const char *task_status_str[] = {
68 "input_port",
69 "input_hold",
70 "output_port",
71 "output_hold",
72 "NULL",
73 };
74
75 RK_U32 mpp_task_debug = 0;
76
/*
 * Stamp a task with the module tag pointer so that check_mpp_task_name()
 * can later recognize it as a task created by this module.
 */
static inline void setup_mpp_task_name(MppTaskImpl *task)
{
    task->name = module_name;
}
81
check_mpp_task_name(MppTask task)82 MPP_RET check_mpp_task_name(MppTask task)
83 {
84 if (task && ((MppTaskImpl *)task)->name == module_name)
85 return MPP_OK;
86
87 mpp_err_f("pointer %p failed on check\n", task);
88 mpp_abort();
89 return MPP_NOK;
90 }
91
mpp_port_init(MppTaskQueueImpl * queue,MppPortType type,MppPort * port)92 static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port)
93 {
94 MppPortImpl *impl = mpp_malloc(MppPortImpl, 1);
95 if (!impl) {
96 mpp_err_f("failed to malloc MppPort type %d\n", type);
97 return MPP_ERR_MALLOC;
98 }
99
100 mpp_task_dbg_func("enter queue %p type %d\n", queue, type);
101
102 impl->type = type;
103 impl->queue = queue;
104
105 if (MPP_PORT_INPUT == type) {
106 impl->status_curr = MPP_INPUT_PORT;
107 impl->next_on_dequeue = MPP_INPUT_HOLD;
108 impl->next_on_enqueue = MPP_OUTPUT_PORT;
109 } else {
110 impl->status_curr = MPP_OUTPUT_PORT;
111 impl->next_on_dequeue = MPP_OUTPUT_HOLD;
112 impl->next_on_enqueue = MPP_INPUT_PORT;
113 }
114
115 *port = (MppPort *)impl;
116
117 mpp_task_dbg_func("leave queue %p port %p\n", queue, impl);
118
119 return MPP_OK;
120 }
121
mpp_port_deinit(MppPort port)122 static MPP_RET mpp_port_deinit(MppPort port)
123 {
124 mpp_task_dbg_func("enter port %p\n", port);
125 mpp_free(port);
126 mpp_task_dbg_func("leave\n");
127 return MPP_OK;
128 }
129
_mpp_port_poll(const char * caller,MppPort port,MppPollType timeout)130 MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout)
131 {
132 MppPortImpl *port_impl = (MppPortImpl *)port;
133 MppTaskQueueImpl *queue = port_impl->queue;
134 MppTaskStatusInfo *curr = NULL;
135 MPP_RET ret = MPP_NOK;
136
137 mpp_mutex_lock(&queue->lock);
138
139 mpp_task_dbg_func("enter port %p\n", port);
140 if (!queue->ready) {
141 mpp_err("try to query when %s queue is not ready\n",
142 port_type_str[port_impl->type]);
143 goto RET;
144 }
145
146 curr = &queue->info[port_impl->status_curr];
147 if (curr->count) {
148 mpp_assert(!list_empty(&curr->list));
149 ret = (MPP_RET)curr->count;
150 mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d count %d\n",
151 queue->mpp, queue->name, caller,
152 port_type_str[port_impl->type],
153 timeout, curr->count);
154 } else {
155 mpp_assert(list_empty(&curr->list));
156
157 /* timeout
158 * zero - non-block
159 * negtive - block
160 * positive - timeout value
161 */
162 if (timeout) {
163 MppCond *cond = &curr->cond;
164
165 if (timeout < 0) {
166 mpp_task_dbg_flow("mpp %p %s from %s poll %s port block wait start\n",
167 queue->mpp, queue->name, caller,
168 port_type_str[port_impl->type]);
169
170 ret = (MPP_RET)mpp_cond_wait(cond, &queue->lock);
171 } else {
172 mpp_task_dbg_flow("mpp %p %s from %s poll %s port %d timeout wait start\n",
173 queue->mpp, queue->name, caller,
174 port_type_str[port_impl->type], timeout);
175 ret = (MPP_RET)mpp_cond_timedwait(cond, &queue->lock, timeout);
176 }
177
178 if (curr->count) {
179 mpp_assert(!list_empty(&curr->list));
180 ret = (MPP_RET)curr->count;
181 } else if (ret > 0)
182 ret = MPP_NOK;
183 }
184
185 mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d ret %d\n",
186 queue->mpp, queue->name, caller,
187 port_type_str[port_impl->type], timeout, ret);
188 }
189 RET:
190 mpp_mutex_unlock(&queue->lock);
191 mpp_task_dbg_func("leave\n");
192 return ret;
193 }
194
_mpp_port_move(const char * caller,MppPort port,MppTask task,MppTaskStatus status)195 MPP_RET _mpp_port_move(const char *caller, MppPort port, MppTask task,
196 MppTaskStatus status)
197 {
198 MppTaskImpl *task_impl = (MppTaskImpl *)task;
199 MppPortImpl *port_impl = (MppPortImpl *)port;
200 MppTaskQueueImpl *queue = port_impl->queue;
201 MppTaskStatusInfo *curr = NULL;
202 MppTaskStatusInfo *next = NULL;
203 MPP_RET ret = MPP_NOK;
204
205 mpp_mutex_lock(&queue->lock);
206
207 mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
208
209 if (!queue->ready) {
210 mpp_err("try to move task when %s queue is not ready\n",
211 port_type_str[port_impl->type]);
212 goto RET;
213 }
214
215 check_mpp_task_name(task);
216
217 mpp_assert(task_impl->queue == (MppTaskQueue)queue);
218
219 curr = &queue->info[task_impl->status];
220 next = &queue->info[status];
221
222 list_del_init(&task_impl->list);
223 curr->count--;
224 list_add_tail(&task_impl->list, &next->list);
225 next->count++;
226
227 mpp_task_dbg_flow("mpp %p %s from %s move %s port task %p %s -> %s done\n",
228 queue->mpp, queue->name, caller,
229 port_type_str[port_impl->type], task_impl,
230 task_status_str[task_impl->status],
231 task_status_str[status]);
232 task_impl->status = status;
233
234 mpp_cond_signal(&next->cond);
235 mpp_task_dbg_func("signal port %p\n", next);
236 ret = MPP_OK;
237 RET:
238 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
239 mpp_mutex_unlock(&queue->lock);
240
241 return ret;
242 }
243
_mpp_port_dequeue(const char * caller,MppPort port,MppTask * task)244 MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task)
245 {
246 MppPortImpl *port_impl = (MppPortImpl *)port;
247 MppTaskQueueImpl *queue = port_impl->queue;
248 MppTaskStatusInfo *curr = NULL;
249 MppTaskStatusInfo *next = NULL;
250 MppTaskImpl *task_impl = NULL;
251 MppTask p = NULL;
252 MPP_RET ret = MPP_NOK;
253
254 mpp_mutex_lock(&queue->lock);
255
256 mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
257
258 if (!queue->ready) {
259 mpp_err("try to dequeue when %s queue is not ready\n",
260 port_type_str[port_impl->type]);
261 goto RET;
262 }
263
264 curr = &queue->info[port_impl->status_curr];
265 next = &queue->info[port_impl->next_on_dequeue];
266
267 *task = NULL;
268 if (curr->count == 0) {
269 mpp_assert(list_empty(&curr->list));
270 mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %s -> %s failed\n",
271 queue->mpp, queue->name, caller,
272 port_type_str[port_impl->type],
273 task_status_str[port_impl->status_curr],
274 task_status_str[port_impl->next_on_dequeue]);
275 goto RET;
276 }
277
278 mpp_assert(!list_empty(&curr->list));
279 task_impl = list_entry(curr->list.next, MppTaskImpl, list);
280 p = (MppTask)task_impl;
281 check_mpp_task_name(p);
282 list_del_init(&task_impl->list);
283 curr->count--;
284 mpp_assert(curr->count >= 0);
285
286 list_add_tail(&task_impl->list, &next->list);
287 next->count++;
288 task_impl->status = next->status;
289
290 mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %p %s -> %s done\n",
291 queue->mpp, queue->name, caller,
292 port_type_str[port_impl->type], task_impl,
293 task_status_str[port_impl->status_curr],
294 task_status_str[port_impl->next_on_dequeue]);
295
296 *task = p;
297 ret = MPP_OK;
298 RET:
299 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, *task, ret);
300 mpp_mutex_unlock(&queue->lock);
301
302 return ret;
303 }
304
_mpp_port_enqueue(const char * caller,MppPort port,MppTask task)305 MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task)
306 {
307 MppTaskImpl *task_impl = (MppTaskImpl *)task;
308 MppPortImpl *port_impl = (MppPortImpl *)port;
309 MppTaskQueueImpl *queue = port_impl->queue;
310 MppTaskStatusInfo *curr = NULL;
311 MppTaskStatusInfo *next = NULL;
312 MPP_RET ret = MPP_NOK;
313
314 mpp_mutex_lock(&queue->lock);
315
316 mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
317
318 if (!queue->ready) {
319 mpp_err("try to enqueue when %s queue is not ready\n",
320 port_type_str[port_impl->type]);
321 mpp_mutex_unlock(&queue->lock);
322 goto RET;
323 }
324
325 check_mpp_task_name(task);
326
327 mpp_assert(task_impl->queue == (MppTaskQueue)queue);
328 mpp_assert(task_impl->status == port_impl->next_on_dequeue);
329
330 curr = &queue->info[task_impl->status];
331 next = &queue->info[port_impl->next_on_enqueue];
332
333 list_del_init(&task_impl->list);
334 curr->count--;
335 list_add_tail(&task_impl->list, &next->list);
336 next->count++;
337 task_impl->status = next->status;
338
339 mpp_task_dbg_flow("mpp %p %s from %s enqueue %s port task %p %s -> %s done\n",
340 queue->mpp, queue->name, caller,
341 port_type_str[port_impl->type], task_impl,
342 task_status_str[port_impl->next_on_dequeue],
343 task_status_str[port_impl->next_on_enqueue]);
344
345 mpp_cond_signal(&next->cond);
346 mpp_task_dbg_func("signal port %p\n", next);
347 ret = MPP_OK;
348 RET:
349 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
350 mpp_mutex_unlock(&queue->lock);
351
352 return ret;
353 }
354
_mpp_port_awake(const char * caller,MppPort port)355 MPP_RET _mpp_port_awake(const char *caller, MppPort port)
356 {
357 if (port == NULL)
358 return MPP_NOK;
359
360 mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
361 MppPortImpl *port_impl = (MppPortImpl *)port;
362 MppTaskQueueImpl *queue = port_impl->queue;
363 MppTaskStatusInfo *curr = NULL;
364 if (queue) {
365 mpp_mutex_lock(&queue->lock);
366 curr = &queue->info[port_impl->status_curr];
367 if (curr) {
368 mpp_cond_signal(&curr->cond);
369 }
370 mpp_mutex_unlock(&queue->lock);
371 }
372
373 mpp_task_dbg_func("caller %s leave port %p\n", caller, port);
374 return MPP_OK;
375 }
376
mpp_task_queue_init(MppTaskQueue * queue,void * mpp,const char * name)377 MPP_RET mpp_task_queue_init(MppTaskQueue *queue, void *mpp, const char *name)
378 {
379 if (!queue) {
380 mpp_err_f("invalid NULL input\n");
381 return MPP_ERR_NULL_PTR;
382 }
383
384 MPP_RET ret = MPP_NOK;
385 MppTaskQueueImpl *p = NULL;
386 RK_S32 i;
387
388 mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0);
389 mpp_task_dbg_func("enter\n");
390
391 *queue = NULL;
392
393 p = mpp_calloc(MppTaskQueueImpl, 1);
394 if (!p) {
395 mpp_err_f("malloc queue failed\n");
396 goto RET;
397 }
398
399 for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
400 INIT_LIST_HEAD(&p->info[i].list);
401 p->info[i].count = 0;
402 p->info[i].status = (MppTaskStatus)i;
403 if (i == MPP_INPUT_PORT || i == MPP_OUTPUT_PORT)
404 mpp_cond_init(&p->info[i].cond);
405 }
406
407 mpp_mutex_init(&p->lock);
408
409 if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
410 goto RET;
411
412 if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
413 mpp_port_deinit(p->input);
414 goto RET;
415 }
416
417 p->mpp = mpp;
418 if (name)
419 strncpy(p->name, name, sizeof(p->name) - 1);
420 else
421 strncpy(p->name, "none", sizeof(p->name) - 1);
422
423 ret = MPP_OK;
424 RET:
425 if (ret) {
426 mpp_mutex_destroy(&p->lock);
427 mpp_cond_destroy(&p->info[MPP_INPUT_PORT].cond);
428 mpp_cond_destroy(&p->info[MPP_OUTPUT_PORT].cond);
429 MPP_FREE(p);
430 }
431
432 *queue = p;
433
434 mpp_task_dbg_func("leave ret %d queue %p\n", ret, p);
435 return ret;
436 }
437
mpp_task_queue_setup(MppTaskQueue queue,RK_S32 task_count)438 MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
439 {
440 MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
441 MppTaskStatusInfo *info;
442 MppTaskImpl *tasks;
443 RK_S32 i;
444
445 mpp_mutex_lock(&impl->lock);
446
447 // NOTE: queue can only be setup once
448 mpp_assert(impl->tasks == NULL);
449 mpp_assert(impl->task_count == 0);
450 tasks = mpp_calloc(MppTaskImpl, task_count);
451 if (!tasks) {
452 mpp_err_f("malloc tasks list failed\n");
453 mpp_mutex_unlock(&impl->lock);
454 return MPP_ERR_MALLOC;
455 }
456
457 impl->tasks = tasks;
458 impl->task_count = task_count;
459
460 info = &impl->info[MPP_INPUT_PORT];
461
462 for (i = 0; i < task_count; i++) {
463 setup_mpp_task_name(&tasks[i]);
464 INIT_LIST_HEAD(&tasks[i].list);
465 tasks[i].index = i;
466 tasks[i].queue = queue;
467 tasks[i].status = MPP_INPUT_PORT;
468 mpp_meta_get(&tasks[i].meta);
469
470 list_add_tail(&tasks[i].list, &info->list);
471 info->count++;
472 }
473 impl->ready = 1;
474
475 mpp_mutex_unlock(&impl->lock);
476
477 return MPP_OK;
478 }
479
mpp_task_queue_deinit(MppTaskQueue queue)480 MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
481 {
482 MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
483 RK_S32 i;
484
485 if (!p) {
486 mpp_err_f("found NULL input queue\n");
487 return MPP_ERR_NULL_PTR;
488 }
489
490 mpp_mutex_lock(&p->lock);
491
492 p->ready = 0;
493 mpp_cond_signal(&p->info[MPP_INPUT_PORT].cond);
494 mpp_cond_signal(&p->info[MPP_OUTPUT_PORT].cond);
495
496 if (p->tasks) {
497 for (i = 0; i < p->task_count; i++) {
498 MppMeta meta = p->tasks[i].meta;
499
500 /* we must ensure that all task return to init status */
501 if (mpp_meta_size(meta)) {
502 mpp_err_f("%s queue idx %d task %p status %d meta size %d\n",
503 p->name, i, &p->tasks[i], p->tasks[i].status,
504 mpp_meta_size(meta));
505 mpp_meta_dump(meta);
506 }
507 mpp_meta_put(p->tasks[i].meta);
508 }
509 mpp_free(p->tasks);
510 }
511
512 if (p->input) {
513 mpp_port_deinit(p->input);
514 p->input = NULL;
515 }
516 if (p->output) {
517 mpp_port_deinit(p->output);
518 p->output = NULL;
519 }
520 mpp_mutex_unlock(&p->lock);
521 mpp_mutex_destroy(&p->lock);
522
523 mpp_cond_destroy(&p->info[MPP_INPUT_PORT].cond);
524 mpp_cond_destroy(&p->info[MPP_OUTPUT_PORT].cond);
525
526 mpp_free(p);
527 return MPP_OK;
528 }
529
mpp_task_queue_get_port(MppTaskQueue queue,MppPortType type)530 MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type)
531 {
532 if (!queue || type >= MPP_PORT_BUTT) {
533 mpp_err_f("invalid input queue %p type %d\n", queue, type);
534 return NULL;
535 }
536
537 MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
538 return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output);
539 }
540
mpp_task_get_meta(MppTask task)541 MppMeta mpp_task_get_meta(MppTask task)
542 {
543 MppMeta meta = NULL;
544
545 if (task)
546 meta = ((MppTaskImpl *)task)->meta;
547
548 return meta;
549 }
550