1 /*
2 * Copyright 2015 Rockchip Electronics Co. LTD
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define MODULE_TAG "mpp_task_impl"
18
19 #include <string.h>
20
21 #include "mpp_env.h"
22 #include "mpp_mem.h"
23 #include "mpp_debug.h"
24
25 #include "mpp_task_impl.h"
26 #include "mpp_meta_impl.h"
27
28 #define MAX_TASK_COUNT 8
29
30 #define MPP_TASK_DBG_FUNCTION (0x00000001)
31 #define MPP_TASK_DBG_FLOW (0x00000002)
32
33 #define mpp_task_dbg(flag, fmt, ...) _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
34 #define mpp_task_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
35
36 #define mpp_task_dbg_func(fmt, ...) mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__)
37 #define mpp_task_dbg_flow(fmt, ...) mpp_task_dbg(MPP_TASK_DBG_FLOW, fmt, ## __VA_ARGS__)
38
39 typedef struct MppTaskStatusInfo_t {
40 struct list_head list;
41 RK_S32 count;
42 MppTaskStatus status;
43 Condition *cond;
44 } MppTaskStatusInfo;
45
46 typedef struct MppTaskQueueImpl_t {
47 char name[32];
48 void *mpp;
49 Mutex *lock;
50 RK_S32 task_count;
51 RK_S32 ready; // flag for deinit
52
53 // two ports inside of task queue
54 MppPort input;
55 MppPort output;
56
57 MppTaskImpl *tasks;
58
59 MppTaskStatusInfo info[MPP_TASK_STATUS_BUTT];
60 } MppTaskQueueImpl;
61
62 typedef struct MppPortImpl_t {
63 MppPortType type;
64 MppTaskQueueImpl *queue;
65
66 MppTaskStatus status_curr;
67 MppTaskStatus next_on_dequeue;
68 MppTaskStatus next_on_enqueue;
69 } MppPortImpl;
70
71 static const char *module_name = MODULE_TAG;
72 static const char *port_type_str[] = {
73 "input",
74 "output",
75 "NULL",
76 };
77
78 static const char *task_status_str[] = {
79 "input_port",
80 "input_hold",
81 "output_port",
82 "output_hold",
83 "NULL",
84 };
85
86 RK_U32 mpp_task_debug = 0;
87
setup_mpp_task_name(MppTaskImpl * task)88 static inline void setup_mpp_task_name(MppTaskImpl *task)
89 {
90 task->name = module_name;
91 }
92
check_mpp_task_name(MppTask task)93 MPP_RET check_mpp_task_name(MppTask task)
94 {
95 if (task && ((MppTaskImpl *)task)->name == module_name)
96 return MPP_OK;
97
98 mpp_err_f("pointer %p failed on check\n", task);
99 mpp_abort();
100 return MPP_NOK;
101 }
102
mpp_port_init(MppTaskQueueImpl * queue,MppPortType type,MppPort * port)103 static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port)
104 {
105 MppPortImpl *impl = mpp_malloc(MppPortImpl, 1);
106 if (NULL == impl) {
107 mpp_err_f("failed to malloc MppPort type %d\n", type);
108 return MPP_ERR_MALLOC;
109 }
110
111 mpp_task_dbg_func("enter queue %p type %d\n", queue, type);
112
113 impl->type = type;
114 impl->queue = queue;
115
116 if (MPP_PORT_INPUT == type) {
117 impl->status_curr = MPP_INPUT_PORT;
118 impl->next_on_dequeue = MPP_INPUT_HOLD;
119 impl->next_on_enqueue = MPP_OUTPUT_PORT;
120 } else {
121 impl->status_curr = MPP_OUTPUT_PORT;
122 impl->next_on_dequeue = MPP_OUTPUT_HOLD;
123 impl->next_on_enqueue = MPP_INPUT_PORT;
124 }
125
126 *port = (MppPort *)impl;
127
128 mpp_task_dbg_func("leave queue %p port %p\n", queue, impl);
129
130 return MPP_OK;
131 }
132
mpp_port_deinit(MppPort port)133 static MPP_RET mpp_port_deinit(MppPort port)
134 {
135 mpp_task_dbg_func("enter port %p\n", port);
136 mpp_free(port);
137 mpp_task_dbg_func("leave\n");
138 return MPP_OK;
139 }
140
_mpp_port_poll(const char * caller,MppPort port,MppPollType timeout)141 MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout)
142 {
143 MppPortImpl *port_impl = (MppPortImpl *)port;
144 MppTaskQueueImpl *queue = port_impl->queue;
145
146 AutoMutex auto_lock(queue->lock);
147 MppTaskStatusInfo *curr = NULL;
148 MPP_RET ret = MPP_NOK;
149
150 mpp_task_dbg_func("enter port %p\n", port);
151 if (!queue->ready) {
152 mpp_err("try to query when %s queue is not ready\n",
153 port_type_str[port_impl->type]);
154 goto RET;
155 }
156
157 curr = &queue->info[port_impl->status_curr];
158 if (curr->count) {
159 mpp_assert(!list_empty(&curr->list));
160 ret = (MPP_RET)curr->count;
161 mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d count %d\n",
162 queue->mpp, queue->name, caller,
163 port_type_str[port_impl->type],
164 timeout, curr->count);
165 } else {
166 mpp_assert(list_empty(&curr->list));
167
168 /* timeout
169 * zero - non-block
170 * negtive - block
171 * positive - timeout value
172 */
173 if (timeout) {
174 mpp_assert(curr->cond);
175 Condition *cond = curr->cond;
176
177 if (timeout < 0) {
178 mpp_task_dbg_flow("mpp %p %s from %s poll %s port block wait start\n",
179 queue->mpp, queue->name, caller,
180 port_type_str[port_impl->type]);
181
182 ret = (MPP_RET)cond->wait(queue->lock);
183 } else {
184 mpp_task_dbg_flow("mpp %p %s from %s poll %s port %d timeout wait start\n",
185 queue->mpp, queue->name, caller,
186 port_type_str[port_impl->type], timeout);
187 ret = (MPP_RET)cond->timedwait(queue->lock, timeout);
188 }
189
190 if (curr->count) {
191 mpp_assert(!list_empty(&curr->list));
192 ret = (MPP_RET)curr->count;
193 } else if (ret > 0)
194 ret = MPP_NOK;
195 }
196
197 mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d ret %d\n",
198 queue->mpp, queue->name, caller,
199 port_type_str[port_impl->type], timeout, ret);
200 }
201 RET:
202 mpp_task_dbg_func("leave\n");
203 return ret;
204 }
205
_mpp_port_move(const char * caller,MppPort port,MppTask task,MppTaskStatus status)206 MPP_RET _mpp_port_move(const char *caller, MppPort port, MppTask task,
207 MppTaskStatus status)
208 {
209 MppTaskImpl *task_impl = (MppTaskImpl *)task;
210 MppPortImpl *port_impl = (MppPortImpl *)port;
211 MppTaskQueueImpl *queue = port_impl->queue;
212 MppTaskStatusInfo *curr = NULL;
213 MppTaskStatusInfo *next = NULL;
214
215 AutoMutex auto_lock(queue->lock);
216 MPP_RET ret = MPP_NOK;
217
218 mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
219
220 if (!queue->ready) {
221 mpp_err("try to move task when %s queue is not ready\n",
222 port_type_str[port_impl->type]);
223 goto RET;
224 }
225
226 check_mpp_task_name(task);
227
228 mpp_assert(task_impl->queue == (MppTaskQueue)queue);
229
230 curr = &queue->info[task_impl->status];
231 next = &queue->info[status];
232
233 list_del_init(&task_impl->list);
234 curr->count--;
235 list_add_tail(&task_impl->list, &next->list);
236 next->count++;
237
238 mpp_task_dbg_flow("mpp %p %s from %s move %s port task %p %s -> %s done\n",
239 queue->mpp, queue->name, caller,
240 port_type_str[port_impl->type], task_impl,
241 task_status_str[task_impl->status],
242 task_status_str[status]);
243 task_impl->status = status;
244
245 next->cond->signal();
246 mpp_task_dbg_func("signal port %p\n", next);
247 ret = MPP_OK;
248 RET:
249 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
250
251 return ret;
252 }
253
_mpp_port_dequeue(const char * caller,MppPort port,MppTask * task)254 MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task)
255 {
256 MppPortImpl *port_impl = (MppPortImpl *)port;
257 MppTaskQueueImpl *queue = port_impl->queue;
258 MppTaskStatusInfo *curr = NULL;
259 MppTaskStatusInfo *next = NULL;
260 MppTaskImpl *task_impl = NULL;
261 MppTask p = NULL;
262
263 AutoMutex auto_lock(queue->lock);
264 MPP_RET ret = MPP_NOK;
265
266 mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
267
268 if (!queue->ready) {
269 mpp_err("try to dequeue when %s queue is not ready\n",
270 port_type_str[port_impl->type]);
271 goto RET;
272 }
273
274 curr = &queue->info[port_impl->status_curr];
275 next = &queue->info[port_impl->next_on_dequeue];
276
277 *task = NULL;
278 if (curr->count == 0) {
279 mpp_assert(list_empty(&curr->list));
280 mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %s -> %s failed\n",
281 queue->mpp, queue->name, caller,
282 port_type_str[port_impl->type],
283 task_status_str[port_impl->status_curr],
284 task_status_str[port_impl->next_on_dequeue]);
285 goto RET;
286 }
287
288 mpp_assert(!list_empty(&curr->list));
289 task_impl = list_entry(curr->list.next, MppTaskImpl, list);
290 p = (MppTask)task_impl;
291 check_mpp_task_name(p);
292 list_del_init(&task_impl->list);
293 curr->count--;
294 mpp_assert(curr->count >= 0);
295
296 list_add_tail(&task_impl->list, &next->list);
297 next->count++;
298 task_impl->status = next->status;
299
300 mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %p %s -> %s done\n",
301 queue->mpp, queue->name, caller,
302 port_type_str[port_impl->type], task_impl,
303 task_status_str[port_impl->status_curr],
304 task_status_str[port_impl->next_on_dequeue]);
305
306 *task = p;
307 ret = MPP_OK;
308 RET:
309 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, *task, ret);
310
311 return ret;
312 }
313
_mpp_port_enqueue(const char * caller,MppPort port,MppTask task)314 MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task)
315 {
316 MppTaskImpl *task_impl = (MppTaskImpl *)task;
317 MppPortImpl *port_impl = (MppPortImpl *)port;
318 MppTaskQueueImpl *queue = port_impl->queue;
319 MppTaskStatusInfo *curr = NULL;
320 MppTaskStatusInfo *next = NULL;
321
322 AutoMutex auto_lock(queue->lock);
323 MPP_RET ret = MPP_NOK;
324
325 mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
326
327 if (!queue->ready) {
328 mpp_err("try to enqueue when %s queue is not ready\n",
329 port_type_str[port_impl->type]);
330 goto RET;
331 }
332
333 check_mpp_task_name(task);
334
335 mpp_assert(task_impl->queue == (MppTaskQueue)queue);
336 mpp_assert(task_impl->status == port_impl->next_on_dequeue);
337
338 curr = &queue->info[task_impl->status];
339 next = &queue->info[port_impl->next_on_enqueue];
340
341 list_del_init(&task_impl->list);
342 curr->count--;
343 list_add_tail(&task_impl->list, &next->list);
344 next->count++;
345 task_impl->status = next->status;
346
347 mpp_task_dbg_flow("mpp %p %s from %s enqueue %s port task %p %s -> %s done\n",
348 queue->mpp, queue->name, caller,
349 port_type_str[port_impl->type], task_impl,
350 task_status_str[port_impl->next_on_dequeue],
351 task_status_str[port_impl->next_on_enqueue]);
352
353 next->cond->signal();
354 mpp_task_dbg_func("signal port %p\n", next);
355 ret = MPP_OK;
356 RET:
357 mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
358
359 return ret;
360 }
361
_mpp_port_awake(const char * caller,MppPort port)362 MPP_RET _mpp_port_awake(const char *caller, MppPort port)
363 {
364 if (port == NULL)
365 return MPP_NOK;
366
367 mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
368 MppPortImpl *port_impl = (MppPortImpl *)port;
369 MppTaskQueueImpl *queue = port_impl->queue;
370 MppTaskStatusInfo *curr = NULL;
371 if (queue) {
372 AutoMutex auto_lock(queue->lock);
373 curr = &queue->info[port_impl->status_curr];
374 if (curr) {
375 curr->cond->signal();
376 }
377 }
378
379 mpp_task_dbg_func("caller %s leave port %p\n", caller, port);
380 return MPP_OK;
381 }
382
mpp_task_queue_init(MppTaskQueue * queue,void * mpp,const char * name)383 MPP_RET mpp_task_queue_init(MppTaskQueue *queue, void *mpp, const char *name)
384 {
385 if (NULL == queue) {
386 mpp_err_f("invalid NULL input\n");
387 return MPP_ERR_NULL_PTR;
388 }
389
390 MPP_RET ret = MPP_NOK;
391 MppTaskQueueImpl *p = NULL;
392 Mutex *lock = NULL;
393 Condition *cond[MPP_TASK_STATUS_BUTT] = { NULL };
394 RK_S32 i;
395
396 mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0);
397 mpp_task_dbg_func("enter\n");
398
399 *queue = NULL;
400
401 p = mpp_calloc(MppTaskQueueImpl, 1);
402 if (NULL == p) {
403 mpp_err_f("malloc queue failed\n");
404 goto RET;
405 }
406
407 cond[MPP_INPUT_PORT] = new Condition();
408 cond[MPP_INPUT_HOLD] = NULL;
409 cond[MPP_OUTPUT_PORT] = new Condition();
410 cond[MPP_OUTPUT_HOLD] = NULL;
411
412 if (NULL == cond[MPP_INPUT_PORT] ||
413 NULL == cond[MPP_OUTPUT_PORT]) {
414 mpp_err_f("new condition failed\n");
415 goto RET;
416 }
417
418 for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
419 INIT_LIST_HEAD(&p->info[i].list);
420 p->info[i].count = 0;
421 p->info[i].status = (MppTaskStatus)i;
422 p->info[i].cond = cond[i];
423 }
424
425 lock = new Mutex();
426 if (NULL == lock) {
427 mpp_err_f("new lock failed\n");
428 goto RET;
429 }
430
431 p->lock = lock;
432
433 if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
434 goto RET;
435
436 if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
437 mpp_port_deinit(p->input);
438 goto RET;
439 }
440
441 p->mpp = mpp;
442 if (name)
443 strncpy(p->name, name, sizeof(p->name) - 1);
444 else
445 strncpy(p->name, "none", sizeof(p->name) - 1);
446
447 ret = MPP_OK;
448 RET:
449 if (ret) {
450 if (lock)
451 delete lock;
452 if (cond[MPP_INPUT_PORT])
453 delete cond[MPP_INPUT_PORT];
454 if (cond[MPP_OUTPUT_PORT])
455 delete cond[MPP_OUTPUT_PORT];
456 MPP_FREE(p);
457 }
458
459 *queue = p;
460
461 mpp_task_dbg_func("leave ret %d queue %p\n", ret, p);
462 return ret;
463 }
464
mpp_task_queue_setup(MppTaskQueue queue,RK_S32 task_count)465 MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
466 {
467 MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
468 AutoMutex auto_lock(impl->lock);
469
470 // NOTE: queue can only be setup once
471 mpp_assert(impl->tasks == NULL);
472 mpp_assert(impl->task_count == 0);
473 MppTaskImpl *tasks = mpp_calloc(MppTaskImpl, task_count);
474 if (NULL == tasks) {
475 mpp_err_f("malloc tasks list failed\n");
476 return MPP_ERR_MALLOC;
477 }
478
479 impl->tasks = tasks;
480 impl->task_count = task_count;
481
482 MppTaskStatusInfo *info = &impl->info[MPP_INPUT_PORT];
483
484 for (RK_S32 i = 0; i < task_count; i++) {
485 setup_mpp_task_name(&tasks[i]);
486 INIT_LIST_HEAD(&tasks[i].list);
487 tasks[i].index = i;
488 tasks[i].queue = queue;
489 tasks[i].status = MPP_INPUT_PORT;
490 mpp_meta_get(&tasks[i].meta);
491
492 list_add_tail(&tasks[i].list, &info->list);
493 info->count++;
494 }
495 impl->ready = 1;
496 return MPP_OK;
497 }
498
mpp_task_queue_deinit(MppTaskQueue queue)499 MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
500 {
501 if (NULL == queue) {
502 mpp_err_f("found NULL input queue\n");
503 return MPP_ERR_NULL_PTR;
504 }
505
506 MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
507 p->lock->lock();
508
509 p->ready = 0;
510 p->info[MPP_INPUT_PORT].cond->signal();
511 p->info[MPP_OUTPUT_PORT].cond->signal();
512 if (p->tasks) {
513 for (RK_S32 i = 0; i < p->task_count; i++) {
514 MppMeta meta = p->tasks[i].meta;
515
516 /* we must ensure that all task return to init status */
517 if (mpp_meta_size(meta)) {
518 mpp_err_f("%s queue idx %d task %p status %d meta size %d\n",
519 p->name, i, &p->tasks[i], p->tasks[i].status,
520 mpp_meta_size(meta));
521 mpp_meta_dump(meta);
522 }
523 mpp_meta_put(p->tasks[i].meta);
524 }
525 mpp_free(p->tasks);
526 }
527
528 if (p->input) {
529 mpp_port_deinit(p->input);
530 p->input = NULL;
531 }
532 if (p->output) {
533 mpp_port_deinit(p->output);
534 p->output = NULL;
535 }
536 p->lock->unlock();
537 if (p->lock)
538 delete p->lock;
539 if (p->info[MPP_INPUT_PORT].cond) {
540 delete p->info[MPP_INPUT_PORT].cond;
541 p->info[MPP_INPUT_PORT].cond = NULL;
542 }
543 if (p->info[MPP_OUTPUT_PORT].cond) {
544 delete p->info[MPP_OUTPUT_PORT].cond;
545 p->info[MPP_OUTPUT_PORT].cond = NULL;
546 }
547 mpp_free(p);
548 return MPP_OK;
549 }
550
mpp_task_queue_get_port(MppTaskQueue queue,MppPortType type)551 MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type)
552 {
553 if (NULL == queue || type >= MPP_PORT_BUTT) {
554 mpp_err_f("invalid input queue %p type %d\n", queue, type);
555 return NULL;
556 }
557
558 MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
559 return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output);
560 }
561
mpp_task_get_meta(MppTask task)562 MppMeta mpp_task_get_meta(MppTask task)
563 {
564 MppMeta meta = NULL;
565
566 if (task)
567 meta = ((MppTaskImpl *)task)->meta;
568
569 return meta;
570 }
571