xref: /OK3568_Linux_fs/external/mpp/mpp/base/mpp_task_impl.cpp (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1 /*
2  * Copyright 2015 Rockchip Electronics Co. LTD
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define MODULE_TAG "mpp_task_impl"
18 
19 #include <string.h>
20 
21 #include "mpp_env.h"
22 #include "mpp_mem.h"
23 #include "mpp_debug.h"
24 
25 #include "mpp_task_impl.h"
26 #include "mpp_meta_impl.h"
27 
28 #define MAX_TASK_COUNT      8
29 
30 #define MPP_TASK_DBG_FUNCTION       (0x00000001)
31 #define MPP_TASK_DBG_FLOW           (0x00000002)
32 
33 #define mpp_task_dbg(flag, fmt, ...)     _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
34 #define mpp_task_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
35 
36 #define mpp_task_dbg_func(fmt, ...)      mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__)
37 #define mpp_task_dbg_flow(fmt, ...)      mpp_task_dbg(MPP_TASK_DBG_FLOW, fmt, ## __VA_ARGS__)
38 
39 typedef struct MppTaskStatusInfo_t {
40     struct list_head    list;
41     RK_S32              count;
42     MppTaskStatus       status;
43     Condition           *cond;
44 } MppTaskStatusInfo;
45 
46 typedef struct MppTaskQueueImpl_t {
47     char                name[32];
48     void                *mpp;
49     Mutex               *lock;
50     RK_S32              task_count;
51     RK_S32              ready;          // flag for deinit
52 
53     // two ports inside of task queue
54     MppPort             input;
55     MppPort             output;
56 
57     MppTaskImpl         *tasks;
58 
59     MppTaskStatusInfo   info[MPP_TASK_STATUS_BUTT];
60 } MppTaskQueueImpl;
61 
62 typedef struct MppPortImpl_t {
63     MppPortType         type;
64     MppTaskQueueImpl    *queue;
65 
66     MppTaskStatus       status_curr;
67     MppTaskStatus       next_on_dequeue;
68     MppTaskStatus       next_on_enqueue;
69 } MppPortImpl;
70 
71 static const char *module_name = MODULE_TAG;
72 static const char *port_type_str[] = {
73     "input",
74     "output",
75     "NULL",
76 };
77 
78 static const char *task_status_str[] = {
79     "input_port",
80     "input_hold",
81     "output_port",
82     "output_hold",
83     "NULL",
84 };
85 
86 RK_U32 mpp_task_debug = 0;
87 
setup_mpp_task_name(MppTaskImpl * task)88 static inline void setup_mpp_task_name(MppTaskImpl *task)
89 {
90     task->name = module_name;
91 }
92 
check_mpp_task_name(MppTask task)93 MPP_RET check_mpp_task_name(MppTask task)
94 {
95     if (task && ((MppTaskImpl *)task)->name == module_name)
96         return MPP_OK;
97 
98     mpp_err_f("pointer %p failed on check\n", task);
99     mpp_abort();
100     return MPP_NOK;
101 }
102 
mpp_port_init(MppTaskQueueImpl * queue,MppPortType type,MppPort * port)103 static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port)
104 {
105     MppPortImpl *impl = mpp_malloc(MppPortImpl, 1);
106     if (NULL == impl) {
107         mpp_err_f("failed to malloc MppPort type %d\n", type);
108         return MPP_ERR_MALLOC;
109     }
110 
111     mpp_task_dbg_func("enter queue %p type %d\n", queue, type);
112 
113     impl->type  = type;
114     impl->queue = queue;
115 
116     if (MPP_PORT_INPUT == type) {
117         impl->status_curr     = MPP_INPUT_PORT;
118         impl->next_on_dequeue = MPP_INPUT_HOLD;
119         impl->next_on_enqueue = MPP_OUTPUT_PORT;
120     } else {
121         impl->status_curr     = MPP_OUTPUT_PORT;
122         impl->next_on_dequeue = MPP_OUTPUT_HOLD;
123         impl->next_on_enqueue = MPP_INPUT_PORT;
124     }
125 
126     *port = (MppPort *)impl;
127 
128     mpp_task_dbg_func("leave queue %p port %p\n", queue, impl);
129 
130     return MPP_OK;
131 }
132 
mpp_port_deinit(MppPort port)133 static MPP_RET mpp_port_deinit(MppPort port)
134 {
135     mpp_task_dbg_func("enter port %p\n", port);
136     mpp_free(port);
137     mpp_task_dbg_func("leave\n");
138     return MPP_OK;
139 }
140 
_mpp_port_poll(const char * caller,MppPort port,MppPollType timeout)141 MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout)
142 {
143     MppPortImpl *port_impl = (MppPortImpl *)port;
144     MppTaskQueueImpl *queue = port_impl->queue;
145 
146     AutoMutex auto_lock(queue->lock);
147     MppTaskStatusInfo *curr = NULL;
148     MPP_RET ret = MPP_NOK;
149 
150     mpp_task_dbg_func("enter port %p\n", port);
151     if (!queue->ready) {
152         mpp_err("try to query when %s queue is not ready\n",
153                 port_type_str[port_impl->type]);
154         goto RET;
155     }
156 
157     curr = &queue->info[port_impl->status_curr];
158     if (curr->count) {
159         mpp_assert(!list_empty(&curr->list));
160         ret = (MPP_RET)curr->count;
161         mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d count %d\n",
162                           queue->mpp, queue->name, caller,
163                           port_type_str[port_impl->type],
164                           timeout, curr->count);
165     } else {
166         mpp_assert(list_empty(&curr->list));
167 
168         /* timeout
169          * zero     - non-block
170          * negtive  - block
171          * positive - timeout value
172          */
173         if (timeout) {
174             mpp_assert(curr->cond);
175             Condition *cond = curr->cond;
176 
177             if (timeout < 0) {
178                 mpp_task_dbg_flow("mpp %p %s from %s poll %s port block wait start\n",
179                                   queue->mpp, queue->name, caller,
180                                   port_type_str[port_impl->type]);
181 
182                 ret = (MPP_RET)cond->wait(queue->lock);
183             } else {
184                 mpp_task_dbg_flow("mpp %p %s from %s poll %s port %d timeout wait start\n",
185                                   queue->mpp, queue->name, caller,
186                                   port_type_str[port_impl->type], timeout);
187                 ret = (MPP_RET)cond->timedwait(queue->lock, timeout);
188             }
189 
190             if (curr->count) {
191                 mpp_assert(!list_empty(&curr->list));
192                 ret = (MPP_RET)curr->count;
193             } else if (ret > 0)
194                 ret = MPP_NOK;
195         }
196 
197         mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d ret %d\n",
198                           queue->mpp, queue->name, caller,
199                           port_type_str[port_impl->type], timeout, ret);
200     }
201 RET:
202     mpp_task_dbg_func("leave\n");
203     return ret;
204 }
205 
_mpp_port_move(const char * caller,MppPort port,MppTask task,MppTaskStatus status)206 MPP_RET _mpp_port_move(const char *caller, MppPort port, MppTask task,
207                        MppTaskStatus status)
208 {
209     MppTaskImpl *task_impl = (MppTaskImpl *)task;
210     MppPortImpl *port_impl = (MppPortImpl *)port;
211     MppTaskQueueImpl *queue = port_impl->queue;
212     MppTaskStatusInfo *curr = NULL;
213     MppTaskStatusInfo *next = NULL;
214 
215     AutoMutex auto_lock(queue->lock);
216     MPP_RET ret = MPP_NOK;
217 
218     mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
219 
220     if (!queue->ready) {
221         mpp_err("try to move task when %s queue is not ready\n",
222                 port_type_str[port_impl->type]);
223         goto RET;
224     }
225 
226     check_mpp_task_name(task);
227 
228     mpp_assert(task_impl->queue == (MppTaskQueue)queue);
229 
230     curr = &queue->info[task_impl->status];
231     next = &queue->info[status];
232 
233     list_del_init(&task_impl->list);
234     curr->count--;
235     list_add_tail(&task_impl->list, &next->list);
236     next->count++;
237 
238     mpp_task_dbg_flow("mpp %p %s from %s move %s port task %p %s -> %s done\n",
239                       queue->mpp, queue->name, caller,
240                       port_type_str[port_impl->type], task_impl,
241                       task_status_str[task_impl->status],
242                       task_status_str[status]);
243     task_impl->status = status;
244 
245     next->cond->signal();
246     mpp_task_dbg_func("signal port %p\n", next);
247     ret = MPP_OK;
248 RET:
249     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
250 
251     return ret;
252 }
253 
_mpp_port_dequeue(const char * caller,MppPort port,MppTask * task)254 MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task)
255 {
256     MppPortImpl *port_impl = (MppPortImpl *)port;
257     MppTaskQueueImpl *queue = port_impl->queue;
258     MppTaskStatusInfo *curr = NULL;
259     MppTaskStatusInfo *next = NULL;
260     MppTaskImpl *task_impl = NULL;
261     MppTask p = NULL;
262 
263     AutoMutex auto_lock(queue->lock);
264     MPP_RET ret = MPP_NOK;
265 
266     mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
267 
268     if (!queue->ready) {
269         mpp_err("try to dequeue when %s queue is not ready\n",
270                 port_type_str[port_impl->type]);
271         goto RET;
272     }
273 
274     curr = &queue->info[port_impl->status_curr];
275     next = &queue->info[port_impl->next_on_dequeue];
276 
277     *task = NULL;
278     if (curr->count == 0) {
279         mpp_assert(list_empty(&curr->list));
280         mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %s -> %s failed\n",
281                           queue->mpp, queue->name, caller,
282                           port_type_str[port_impl->type],
283                           task_status_str[port_impl->status_curr],
284                           task_status_str[port_impl->next_on_dequeue]);
285         goto RET;
286     }
287 
288     mpp_assert(!list_empty(&curr->list));
289     task_impl = list_entry(curr->list.next, MppTaskImpl, list);
290     p = (MppTask)task_impl;
291     check_mpp_task_name(p);
292     list_del_init(&task_impl->list);
293     curr->count--;
294     mpp_assert(curr->count >= 0);
295 
296     list_add_tail(&task_impl->list, &next->list);
297     next->count++;
298     task_impl->status = next->status;
299 
300     mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %p %s -> %s done\n",
301                       queue->mpp, queue->name, caller,
302                       port_type_str[port_impl->type], task_impl,
303                       task_status_str[port_impl->status_curr],
304                       task_status_str[port_impl->next_on_dequeue]);
305 
306     *task = p;
307     ret = MPP_OK;
308 RET:
309     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, *task, ret);
310 
311     return ret;
312 }
313 
_mpp_port_enqueue(const char * caller,MppPort port,MppTask task)314 MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task)
315 {
316     MppTaskImpl *task_impl = (MppTaskImpl *)task;
317     MppPortImpl *port_impl = (MppPortImpl *)port;
318     MppTaskQueueImpl *queue = port_impl->queue;
319     MppTaskStatusInfo *curr = NULL;
320     MppTaskStatusInfo *next = NULL;
321 
322     AutoMutex auto_lock(queue->lock);
323     MPP_RET ret = MPP_NOK;
324 
325     mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
326 
327     if (!queue->ready) {
328         mpp_err("try to enqueue when %s queue is not ready\n",
329                 port_type_str[port_impl->type]);
330         goto RET;
331     }
332 
333     check_mpp_task_name(task);
334 
335     mpp_assert(task_impl->queue  == (MppTaskQueue)queue);
336     mpp_assert(task_impl->status == port_impl->next_on_dequeue);
337 
338     curr = &queue->info[task_impl->status];
339     next = &queue->info[port_impl->next_on_enqueue];
340 
341     list_del_init(&task_impl->list);
342     curr->count--;
343     list_add_tail(&task_impl->list, &next->list);
344     next->count++;
345     task_impl->status = next->status;
346 
347     mpp_task_dbg_flow("mpp %p %s from %s enqueue %s port task %p %s -> %s done\n",
348                       queue->mpp, queue->name, caller,
349                       port_type_str[port_impl->type], task_impl,
350                       task_status_str[port_impl->next_on_dequeue],
351                       task_status_str[port_impl->next_on_enqueue]);
352 
353     next->cond->signal();
354     mpp_task_dbg_func("signal port %p\n", next);
355     ret = MPP_OK;
356 RET:
357     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
358 
359     return ret;
360 }
361 
_mpp_port_awake(const char * caller,MppPort port)362 MPP_RET _mpp_port_awake(const char *caller, MppPort port)
363 {
364     if (port == NULL)
365         return MPP_NOK;
366 
367     mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
368     MppPortImpl *port_impl = (MppPortImpl *)port;
369     MppTaskQueueImpl *queue = port_impl->queue;
370     MppTaskStatusInfo *curr = NULL;
371     if (queue) {
372         AutoMutex auto_lock(queue->lock);
373         curr = &queue->info[port_impl->status_curr];
374         if (curr) {
375             curr->cond->signal();
376         }
377     }
378 
379     mpp_task_dbg_func("caller %s leave port %p\n", caller, port);
380     return MPP_OK;
381 }
382 
mpp_task_queue_init(MppTaskQueue * queue,void * mpp,const char * name)383 MPP_RET mpp_task_queue_init(MppTaskQueue *queue, void *mpp, const char *name)
384 {
385     if (NULL == queue) {
386         mpp_err_f("invalid NULL input\n");
387         return MPP_ERR_NULL_PTR;
388     }
389 
390     MPP_RET ret = MPP_NOK;
391     MppTaskQueueImpl *p = NULL;
392     Mutex *lock = NULL;
393     Condition *cond[MPP_TASK_STATUS_BUTT] = { NULL };
394     RK_S32 i;
395 
396     mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0);
397     mpp_task_dbg_func("enter\n");
398 
399     *queue = NULL;
400 
401     p = mpp_calloc(MppTaskQueueImpl, 1);
402     if (NULL == p) {
403         mpp_err_f("malloc queue failed\n");
404         goto RET;
405     }
406 
407     cond[MPP_INPUT_PORT] = new Condition();
408     cond[MPP_INPUT_HOLD] = NULL;
409     cond[MPP_OUTPUT_PORT] = new Condition();
410     cond[MPP_OUTPUT_HOLD] = NULL;
411 
412     if (NULL == cond[MPP_INPUT_PORT] ||
413         NULL == cond[MPP_OUTPUT_PORT]) {
414         mpp_err_f("new condition failed\n");
415         goto RET;
416     }
417 
418     for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
419         INIT_LIST_HEAD(&p->info[i].list);
420         p->info[i].count  = 0;
421         p->info[i].status = (MppTaskStatus)i;
422         p->info[i].cond = cond[i];
423     }
424 
425     lock = new Mutex();
426     if (NULL == lock) {
427         mpp_err_f("new lock failed\n");
428         goto RET;
429     }
430 
431     p->lock = lock;
432 
433     if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
434         goto RET;
435 
436     if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
437         mpp_port_deinit(p->input);
438         goto RET;
439     }
440 
441     p->mpp = mpp;
442     if (name)
443         strncpy(p->name, name, sizeof(p->name) - 1);
444     else
445         strncpy(p->name, "none", sizeof(p->name) - 1);
446 
447     ret = MPP_OK;
448 RET:
449     if (ret) {
450         if (lock)
451             delete lock;
452         if (cond[MPP_INPUT_PORT])
453             delete cond[MPP_INPUT_PORT];
454         if (cond[MPP_OUTPUT_PORT])
455             delete cond[MPP_OUTPUT_PORT];
456         MPP_FREE(p);
457     }
458 
459     *queue = p;
460 
461     mpp_task_dbg_func("leave ret %d queue %p\n", ret, p);
462     return ret;
463 }
464 
mpp_task_queue_setup(MppTaskQueue queue,RK_S32 task_count)465 MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
466 {
467     MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
468     AutoMutex auto_lock(impl->lock);
469 
470     // NOTE: queue can only be setup once
471     mpp_assert(impl->tasks == NULL);
472     mpp_assert(impl->task_count == 0);
473     MppTaskImpl *tasks = mpp_calloc(MppTaskImpl, task_count);
474     if (NULL == tasks) {
475         mpp_err_f("malloc tasks list failed\n");
476         return MPP_ERR_MALLOC;
477     }
478 
479     impl->tasks = tasks;
480     impl->task_count = task_count;
481 
482     MppTaskStatusInfo *info = &impl->info[MPP_INPUT_PORT];
483 
484     for (RK_S32 i = 0; i < task_count; i++) {
485         setup_mpp_task_name(&tasks[i]);
486         INIT_LIST_HEAD(&tasks[i].list);
487         tasks[i].index  = i;
488         tasks[i].queue  = queue;
489         tasks[i].status = MPP_INPUT_PORT;
490         mpp_meta_get(&tasks[i].meta);
491 
492         list_add_tail(&tasks[i].list, &info->list);
493         info->count++;
494     }
495     impl->ready = 1;
496     return MPP_OK;
497 }
498 
mpp_task_queue_deinit(MppTaskQueue queue)499 MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
500 {
501     if (NULL == queue) {
502         mpp_err_f("found NULL input queue\n");
503         return MPP_ERR_NULL_PTR;
504     }
505 
506     MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
507     p->lock->lock();
508 
509     p->ready = 0;
510     p->info[MPP_INPUT_PORT].cond->signal();
511     p->info[MPP_OUTPUT_PORT].cond->signal();
512     if (p->tasks) {
513         for (RK_S32 i = 0; i < p->task_count; i++) {
514             MppMeta meta = p->tasks[i].meta;
515 
516             /* we must ensure that all task return to init status */
517             if (mpp_meta_size(meta)) {
518                 mpp_err_f("%s queue idx %d task %p status %d meta size %d\n",
519                           p->name, i, &p->tasks[i], p->tasks[i].status,
520                           mpp_meta_size(meta));
521                 mpp_meta_dump(meta);
522             }
523             mpp_meta_put(p->tasks[i].meta);
524         }
525         mpp_free(p->tasks);
526     }
527 
528     if (p->input) {
529         mpp_port_deinit(p->input);
530         p->input = NULL;
531     }
532     if (p->output) {
533         mpp_port_deinit(p->output);
534         p->output = NULL;
535     }
536     p->lock->unlock();
537     if (p->lock)
538         delete p->lock;
539     if (p->info[MPP_INPUT_PORT].cond) {
540         delete p->info[MPP_INPUT_PORT].cond;
541         p->info[MPP_INPUT_PORT].cond = NULL;
542     }
543     if (p->info[MPP_OUTPUT_PORT].cond) {
544         delete p->info[MPP_OUTPUT_PORT].cond;
545         p->info[MPP_OUTPUT_PORT].cond = NULL;
546     }
547     mpp_free(p);
548     return MPP_OK;
549 }
550 
mpp_task_queue_get_port(MppTaskQueue queue,MppPortType type)551 MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type)
552 {
553     if (NULL == queue || type >= MPP_PORT_BUTT) {
554         mpp_err_f("invalid input queue %p type %d\n", queue, type);
555         return NULL;
556     }
557 
558     MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
559     return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output);
560 }
561 
mpp_task_get_meta(MppTask task)562 MppMeta mpp_task_get_meta(MppTask task)
563 {
564     MppMeta meta = NULL;
565 
566     if (task)
567         meta = ((MppTaskImpl *)task)->meta;
568 
569     return meta;
570 }
571