xref: /rockchip-linux_mpp/mpp/base/mpp_task_impl.c (revision 437bfbeb9567cca9cd9080e3f6954aa9d6a94f18)
1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3  * Copyright (c) 2015 Rockchip Electronics Co., Ltd.
4  */
5 
6 #define MODULE_TAG "mpp_task_impl"
7 
8 #include <string.h>
9 
10 #include "mpp_env.h"
11 #include "mpp_mem.h"
12 #include "mpp_debug.h"
13 
14 #include "mpp_task_impl.h"
15 #include "mpp_meta_impl.h"
16 
/* NOTE(review): presumably an upper bound on tasks per queue; not referenced in the visible code - confirm. */
#define MAX_TASK_COUNT      8

/* Bit flags tested against mpp_task_debug to select a log category. */
#define MPP_TASK_DBG_FUNCTION       (0x00000001)
#define MPP_TASK_DBG_FLOW           (0x00000002)

/* Flag-gated log helpers; both forward the mpp_task_debug mask to the generic debug macros. */
#define mpp_task_dbg(flag, fmt, ...)     _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__)

/* Shorthands: function enter/leave tracing and task flow tracing. */
#define mpp_task_dbg_func(fmt, ...)      mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_flow(fmt, ...)      mpp_task_dbg(MPP_TASK_DBG_FLOW, fmt, ## __VA_ARGS__)
27 
/*
 * Bookkeeping for one task status inside a task queue. The queue keeps one
 * instance per MppTaskStatus value and moves tasks between their lists.
 */
typedef struct MppTaskStatusInfo_t {
    struct list_head    list;       /* tasks currently held in this status */
    RK_S32              count;      /* number of tasks linked on list */
    MppTaskStatus       status;     /* status value this entry stands for */
    MppCond             cond;       /* signalled when a task is moved into this status;
                                     * only initialized for the two port statuses */
} MppTaskStatusInfo;
34 
/*
 * Task queue implementation: a fixed array of tasks that circulate between
 * per-status lists, plus an input and an output port operating on them.
 */
typedef struct MppTaskQueueImpl_t {
    char                name[32];       /* queue name used in logs, "none" when not given */
    void                *mpp;           /* opaque owner pointer, only printed in logs here */
    MppMutex            lock;           /* guards every field below and the status lists */
    RK_S32              task_count;     /* number of entries in tasks */
    RK_S32              ready;          /* set by setup, cleared by deinit; ports refuse to work when 0 */

    // two ports inside of task queue
    MppPort             input;
    MppPort             output;

    MppTaskImpl         *tasks;         /* task array allocated by mpp_task_queue_setup */

    MppTaskStatusInfo   info[MPP_TASK_STATUS_BUTT];    /* one list/counter per task status */
} MppTaskQueueImpl;
50 
/*
 * One end (input or output) of a task queue. The three status fields
 * describe which status list the port takes tasks from and where a task
 * goes on dequeue and on enqueue.
 */
typedef struct MppPortImpl_t {
    MppPortType         type;               /* MPP_PORT_INPUT or MPP_PORT_OUTPUT */
    MppTaskQueueImpl    *queue;             /* queue this port belongs to */

    MppTaskStatus       status_curr;        /* status list polled / dequeued from */
    MppTaskStatus       next_on_dequeue;    /* status a task gets after dequeue */
    MppTaskStatus       next_on_enqueue;    /* status a task gets after enqueue */
} MppPortImpl;
59 
/*
 * The address of this string is stored in every task as a type tag and is
 * compared by pointer (not by content) in check_mpp_task_name().
 */
static const char *module_name = MODULE_TAG;

/* Log names indexed by MppPortType. NOTE(review): last entry presumably covers MPP_PORT_BUTT - confirm. */
static const char *port_type_str[] = {
    "input",
    "output",
    "NULL",
};

/* Log names indexed by MppTaskStatus. NOTE(review): last entry presumably covers MPP_TASK_STATUS_BUTT - confirm. */
static const char *task_status_str[] = {
    "input_port",
    "input_hold",
    "output_port",
    "output_hold",
    "NULL",
};

/* Debug flag mask; refreshed from the "mpp_task_debug" environment entry in mpp_task_queue_init(). */
RK_U32 mpp_task_debug = 0;
76 
/*
 * Tag a task with this module's name pointer.
 * The tag is what check_mpp_task_name() compares against.
 */
static inline void setup_mpp_task_name(MppTaskImpl *task)
{
    task->name = module_name;
}
81 
check_mpp_task_name(MppTask task)82 MPP_RET check_mpp_task_name(MppTask task)
83 {
84     if (task && ((MppTaskImpl *)task)->name == module_name)
85         return MPP_OK;
86 
87     mpp_err_f("pointer %p failed on check\n", task);
88     mpp_abort();
89     return MPP_NOK;
90 }
91 
mpp_port_init(MppTaskQueueImpl * queue,MppPortType type,MppPort * port)92 static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port)
93 {
94     MppPortImpl *impl = mpp_malloc(MppPortImpl, 1);
95     if (!impl) {
96         mpp_err_f("failed to malloc MppPort type %d\n", type);
97         return MPP_ERR_MALLOC;
98     }
99 
100     mpp_task_dbg_func("enter queue %p type %d\n", queue, type);
101 
102     impl->type  = type;
103     impl->queue = queue;
104 
105     if (MPP_PORT_INPUT == type) {
106         impl->status_curr     = MPP_INPUT_PORT;
107         impl->next_on_dequeue = MPP_INPUT_HOLD;
108         impl->next_on_enqueue = MPP_OUTPUT_PORT;
109     } else {
110         impl->status_curr     = MPP_OUTPUT_PORT;
111         impl->next_on_dequeue = MPP_OUTPUT_HOLD;
112         impl->next_on_enqueue = MPP_INPUT_PORT;
113     }
114 
115     *port = (MppPort *)impl;
116 
117     mpp_task_dbg_func("leave queue %p port %p\n", queue, impl);
118 
119     return MPP_OK;
120 }
121 
mpp_port_deinit(MppPort port)122 static MPP_RET mpp_port_deinit(MppPort port)
123 {
124     mpp_task_dbg_func("enter port %p\n", port);
125     mpp_free(port);
126     mpp_task_dbg_func("leave\n");
127     return MPP_OK;
128 }
129 
_mpp_port_poll(const char * caller,MppPort port,MppPollType timeout)130 MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout)
131 {
132     MppPortImpl *port_impl = (MppPortImpl *)port;
133     MppTaskQueueImpl *queue = port_impl->queue;
134     MppTaskStatusInfo *curr = NULL;
135     MPP_RET ret = MPP_NOK;
136 
137     mpp_mutex_lock(&queue->lock);
138 
139     mpp_task_dbg_func("enter port %p\n", port);
140     if (!queue->ready) {
141         mpp_err("try to query when %s queue is not ready\n",
142                 port_type_str[port_impl->type]);
143         goto RET;
144     }
145 
146     curr = &queue->info[port_impl->status_curr];
147     if (curr->count) {
148         mpp_assert(!list_empty(&curr->list));
149         ret = (MPP_RET)curr->count;
150         mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d count %d\n",
151                           queue->mpp, queue->name, caller,
152                           port_type_str[port_impl->type],
153                           timeout, curr->count);
154     } else {
155         mpp_assert(list_empty(&curr->list));
156 
157         /* timeout
158          * zero     - non-block
159          * negtive  - block
160          * positive - timeout value
161          */
162         if (timeout) {
163             MppCond *cond = &curr->cond;
164 
165             if (timeout < 0) {
166                 mpp_task_dbg_flow("mpp %p %s from %s poll %s port block wait start\n",
167                                   queue->mpp, queue->name, caller,
168                                   port_type_str[port_impl->type]);
169 
170                 ret = (MPP_RET)mpp_cond_wait(cond, &queue->lock);
171             } else {
172                 mpp_task_dbg_flow("mpp %p %s from %s poll %s port %d timeout wait start\n",
173                                   queue->mpp, queue->name, caller,
174                                   port_type_str[port_impl->type], timeout);
175                 ret = (MPP_RET)mpp_cond_timedwait(cond, &queue->lock, timeout);
176             }
177 
178             if (curr->count) {
179                 mpp_assert(!list_empty(&curr->list));
180                 ret = (MPP_RET)curr->count;
181             } else if (ret > 0)
182                 ret = MPP_NOK;
183         }
184 
185         mpp_task_dbg_flow("mpp %p %s from %s poll %s port timeout %d ret %d\n",
186                           queue->mpp, queue->name, caller,
187                           port_type_str[port_impl->type], timeout, ret);
188     }
189 RET:
190     mpp_mutex_unlock(&queue->lock);
191     mpp_task_dbg_func("leave\n");
192     return ret;
193 }
194 
_mpp_port_move(const char * caller,MppPort port,MppTask task,MppTaskStatus status)195 MPP_RET _mpp_port_move(const char *caller, MppPort port, MppTask task,
196                        MppTaskStatus status)
197 {
198     MppTaskImpl *task_impl = (MppTaskImpl *)task;
199     MppPortImpl *port_impl = (MppPortImpl *)port;
200     MppTaskQueueImpl *queue = port_impl->queue;
201     MppTaskStatusInfo *curr = NULL;
202     MppTaskStatusInfo *next = NULL;
203     MPP_RET ret = MPP_NOK;
204 
205     mpp_mutex_lock(&queue->lock);
206 
207     mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
208 
209     if (!queue->ready) {
210         mpp_err("try to move task when %s queue is not ready\n",
211                 port_type_str[port_impl->type]);
212         goto RET;
213     }
214 
215     check_mpp_task_name(task);
216 
217     mpp_assert(task_impl->queue == (MppTaskQueue)queue);
218 
219     curr = &queue->info[task_impl->status];
220     next = &queue->info[status];
221 
222     list_del_init(&task_impl->list);
223     curr->count--;
224     list_add_tail(&task_impl->list, &next->list);
225     next->count++;
226 
227     mpp_task_dbg_flow("mpp %p %s from %s move %s port task %p %s -> %s done\n",
228                       queue->mpp, queue->name, caller,
229                       port_type_str[port_impl->type], task_impl,
230                       task_status_str[task_impl->status],
231                       task_status_str[status]);
232     task_impl->status = status;
233 
234     mpp_cond_signal(&next->cond);
235     mpp_task_dbg_func("signal port %p\n", next);
236     ret = MPP_OK;
237 RET:
238     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
239     mpp_mutex_unlock(&queue->lock);
240 
241     return ret;
242 }
243 
_mpp_port_dequeue(const char * caller,MppPort port,MppTask * task)244 MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task)
245 {
246     MppPortImpl *port_impl = (MppPortImpl *)port;
247     MppTaskQueueImpl *queue = port_impl->queue;
248     MppTaskStatusInfo *curr = NULL;
249     MppTaskStatusInfo *next = NULL;
250     MppTaskImpl *task_impl = NULL;
251     MppTask p = NULL;
252     MPP_RET ret = MPP_NOK;
253 
254     mpp_mutex_lock(&queue->lock);
255 
256     mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
257 
258     if (!queue->ready) {
259         mpp_err("try to dequeue when %s queue is not ready\n",
260                 port_type_str[port_impl->type]);
261         goto RET;
262     }
263 
264     curr = &queue->info[port_impl->status_curr];
265     next = &queue->info[port_impl->next_on_dequeue];
266 
267     *task = NULL;
268     if (curr->count == 0) {
269         mpp_assert(list_empty(&curr->list));
270         mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %s -> %s failed\n",
271                           queue->mpp, queue->name, caller,
272                           port_type_str[port_impl->type],
273                           task_status_str[port_impl->status_curr],
274                           task_status_str[port_impl->next_on_dequeue]);
275         goto RET;
276     }
277 
278     mpp_assert(!list_empty(&curr->list));
279     task_impl = list_entry(curr->list.next, MppTaskImpl, list);
280     p = (MppTask)task_impl;
281     check_mpp_task_name(p);
282     list_del_init(&task_impl->list);
283     curr->count--;
284     mpp_assert(curr->count >= 0);
285 
286     list_add_tail(&task_impl->list, &next->list);
287     next->count++;
288     task_impl->status = next->status;
289 
290     mpp_task_dbg_flow("mpp %p %s from %s dequeue %s port task %p %s -> %s done\n",
291                       queue->mpp, queue->name, caller,
292                       port_type_str[port_impl->type], task_impl,
293                       task_status_str[port_impl->status_curr],
294                       task_status_str[port_impl->next_on_dequeue]);
295 
296     *task = p;
297     ret = MPP_OK;
298 RET:
299     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, *task, ret);
300     mpp_mutex_unlock(&queue->lock);
301 
302     return ret;
303 }
304 
_mpp_port_enqueue(const char * caller,MppPort port,MppTask task)305 MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task)
306 {
307     MppTaskImpl *task_impl = (MppTaskImpl *)task;
308     MppPortImpl *port_impl = (MppPortImpl *)port;
309     MppTaskQueueImpl *queue = port_impl->queue;
310     MppTaskStatusInfo *curr = NULL;
311     MppTaskStatusInfo *next = NULL;
312     MPP_RET ret = MPP_NOK;
313 
314     mpp_mutex_lock(&queue->lock);
315 
316     mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task);
317 
318     if (!queue->ready) {
319         mpp_err("try to enqueue when %s queue is not ready\n",
320                 port_type_str[port_impl->type]);
321         mpp_mutex_unlock(&queue->lock);
322         goto RET;
323     }
324 
325     check_mpp_task_name(task);
326 
327     mpp_assert(task_impl->queue  == (MppTaskQueue)queue);
328     mpp_assert(task_impl->status == port_impl->next_on_dequeue);
329 
330     curr = &queue->info[task_impl->status];
331     next = &queue->info[port_impl->next_on_enqueue];
332 
333     list_del_init(&task_impl->list);
334     curr->count--;
335     list_add_tail(&task_impl->list, &next->list);
336     next->count++;
337     task_impl->status = next->status;
338 
339     mpp_task_dbg_flow("mpp %p %s from %s enqueue %s port task %p %s -> %s done\n",
340                       queue->mpp, queue->name, caller,
341                       port_type_str[port_impl->type], task_impl,
342                       task_status_str[port_impl->next_on_dequeue],
343                       task_status_str[port_impl->next_on_enqueue]);
344 
345     mpp_cond_signal(&next->cond);
346     mpp_task_dbg_func("signal port %p\n", next);
347     ret = MPP_OK;
348 RET:
349     mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret);
350     mpp_mutex_unlock(&queue->lock);
351 
352     return ret;
353 }
354 
_mpp_port_awake(const char * caller,MppPort port)355 MPP_RET _mpp_port_awake(const char *caller, MppPort port)
356 {
357     if (port == NULL)
358         return MPP_NOK;
359 
360     mpp_task_dbg_func("caller %s enter port %p\n", caller, port);
361     MppPortImpl *port_impl = (MppPortImpl *)port;
362     MppTaskQueueImpl *queue = port_impl->queue;
363     MppTaskStatusInfo *curr = NULL;
364     if (queue) {
365         mpp_mutex_lock(&queue->lock);
366         curr = &queue->info[port_impl->status_curr];
367         if (curr) {
368             mpp_cond_signal(&curr->cond);
369         }
370         mpp_mutex_unlock(&queue->lock);
371     }
372 
373     mpp_task_dbg_func("caller %s leave port %p\n", caller, port);
374     return MPP_OK;
375 }
376 
mpp_task_queue_init(MppTaskQueue * queue,void * mpp,const char * name)377 MPP_RET mpp_task_queue_init(MppTaskQueue *queue, void *mpp, const char *name)
378 {
379     if (!queue) {
380         mpp_err_f("invalid NULL input\n");
381         return MPP_ERR_NULL_PTR;
382     }
383 
384     MPP_RET ret = MPP_NOK;
385     MppTaskQueueImpl *p = NULL;
386     RK_S32 i;
387 
388     mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0);
389     mpp_task_dbg_func("enter\n");
390 
391     *queue = NULL;
392 
393     p = mpp_calloc(MppTaskQueueImpl, 1);
394     if (!p) {
395         mpp_err_f("malloc queue failed\n");
396         goto RET;
397     }
398 
399     for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
400         INIT_LIST_HEAD(&p->info[i].list);
401         p->info[i].count  = 0;
402         p->info[i].status = (MppTaskStatus)i;
403         if (i == MPP_INPUT_PORT || i == MPP_OUTPUT_PORT)
404             mpp_cond_init(&p->info[i].cond);
405     }
406 
407     mpp_mutex_init(&p->lock);
408 
409     if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
410         goto RET;
411 
412     if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
413         mpp_port_deinit(p->input);
414         goto RET;
415     }
416 
417     p->mpp = mpp;
418     if (name)
419         strncpy(p->name, name, sizeof(p->name) - 1);
420     else
421         strncpy(p->name, "none", sizeof(p->name) - 1);
422 
423     ret = MPP_OK;
424 RET:
425     if (ret) {
426         mpp_mutex_destroy(&p->lock);
427         mpp_cond_destroy(&p->info[MPP_INPUT_PORT].cond);
428         mpp_cond_destroy(&p->info[MPP_OUTPUT_PORT].cond);
429         MPP_FREE(p);
430     }
431 
432     *queue = p;
433 
434     mpp_task_dbg_func("leave ret %d queue %p\n", ret, p);
435     return ret;
436 }
437 
mpp_task_queue_setup(MppTaskQueue queue,RK_S32 task_count)438 MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
439 {
440     MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
441     MppTaskStatusInfo *info;
442     MppTaskImpl *tasks;
443     RK_S32 i;
444 
445     mpp_mutex_lock(&impl->lock);
446 
447     // NOTE: queue can only be setup once
448     mpp_assert(impl->tasks == NULL);
449     mpp_assert(impl->task_count == 0);
450     tasks = mpp_calloc(MppTaskImpl, task_count);
451     if (!tasks) {
452         mpp_err_f("malloc tasks list failed\n");
453         mpp_mutex_unlock(&impl->lock);
454         return MPP_ERR_MALLOC;
455     }
456 
457     impl->tasks = tasks;
458     impl->task_count = task_count;
459 
460     info = &impl->info[MPP_INPUT_PORT];
461 
462     for (i = 0; i < task_count; i++) {
463         setup_mpp_task_name(&tasks[i]);
464         INIT_LIST_HEAD(&tasks[i].list);
465         tasks[i].index  = i;
466         tasks[i].queue  = queue;
467         tasks[i].status = MPP_INPUT_PORT;
468         mpp_meta_get(&tasks[i].meta);
469 
470         list_add_tail(&tasks[i].list, &info->list);
471         info->count++;
472     }
473     impl->ready = 1;
474 
475     mpp_mutex_unlock(&impl->lock);
476 
477     return MPP_OK;
478 }
479 
mpp_task_queue_deinit(MppTaskQueue queue)480 MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
481 {
482     MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
483     RK_S32 i;
484 
485     if (!p) {
486         mpp_err_f("found NULL input queue\n");
487         return MPP_ERR_NULL_PTR;
488     }
489 
490     mpp_mutex_lock(&p->lock);
491 
492     p->ready = 0;
493     mpp_cond_signal(&p->info[MPP_INPUT_PORT].cond);
494     mpp_cond_signal(&p->info[MPP_OUTPUT_PORT].cond);
495 
496     if (p->tasks) {
497         for (i = 0; i < p->task_count; i++) {
498             MppMeta meta = p->tasks[i].meta;
499 
500             /* we must ensure that all task return to init status */
501             if (mpp_meta_size(meta)) {
502                 mpp_err_f("%s queue idx %d task %p status %d meta size %d\n",
503                           p->name, i, &p->tasks[i], p->tasks[i].status,
504                           mpp_meta_size(meta));
505                 mpp_meta_dump(meta);
506             }
507             mpp_meta_put(p->tasks[i].meta);
508         }
509         mpp_free(p->tasks);
510     }
511 
512     if (p->input) {
513         mpp_port_deinit(p->input);
514         p->input = NULL;
515     }
516     if (p->output) {
517         mpp_port_deinit(p->output);
518         p->output = NULL;
519     }
520     mpp_mutex_unlock(&p->lock);
521     mpp_mutex_destroy(&p->lock);
522 
523     mpp_cond_destroy(&p->info[MPP_INPUT_PORT].cond);
524     mpp_cond_destroy(&p->info[MPP_OUTPUT_PORT].cond);
525 
526     mpp_free(p);
527     return MPP_OK;
528 }
529 
mpp_task_queue_get_port(MppTaskQueue queue,MppPortType type)530 MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type)
531 {
532     if (!queue || type >= MPP_PORT_BUTT) {
533         mpp_err_f("invalid input queue %p type %d\n", queue, type);
534         return NULL;
535     }
536 
537     MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
538     return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output);
539 }
540 
mpp_task_get_meta(MppTask task)541 MppMeta mpp_task_get_meta(MppTask task)
542 {
543     MppMeta meta = NULL;
544 
545     if (task)
546         meta = ((MppTaskImpl *)task)->meta;
547 
548     return meta;
549 }
550