1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3 * Copyright (c) 2021 Rockchip Electronics Co., Ltd.
4 */
5
6 #define MODULE_TAG "mpp_cluster"
7
8 #include <string.h>
9
10 #include "mpp_mem.h"
11 #include "mpp_env.h"
12 #include "mpp_lock.h"
13 #include "mpp_time.h"
14 #include "mpp_debug.h"
15 #include "mpp_common.h"
16 #include "mpp_singleton.h"
17
18 #include "mpp_cluster.h"
19 #include "mpp_dev_defs.h"
20
/* debug flag bits selected via the "mpp_cluster_debug" environment variable */
#define MPP_CLUSTER_DBG_FLOW (0x00000001)
#define MPP_CLUSTER_DBG_LOCK (0x00000002)

#define cluster_dbg(flag, fmt, ...) _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
#define cluster_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)

/* scheduling-flow and queue-lock tracing helpers */
#define cluster_dbg_flow(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
#define cluster_dbg_lock(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)

/* runtime knobs, refreshed from the environment in mpp_cluster_srv_init() */
RK_U32 mpp_cluster_debug = 0;
RK_U32 mpp_cluster_thd_cnt = 1;
32
33 typedef struct MppNodeProc_s MppNodeProc;
34 typedef struct MppNodeTask_s MppNodeTask;
35 typedef struct MppNodeImpl_s MppNodeImpl;
36
37 typedef struct ClusterQueue_s ClusterQueue;
38 typedef struct ClusterWorker_s ClusterWorker;
39 typedef struct MppCluster_s MppCluster;
40
/*
 * Node state bits, updated lock-free with compare-and-swap:
 *   VALID  - node is attached (cleared when a detach is requested)
 *   IDLE   - not queued and not running
 *   SIGNAL - re-schedule was requested while the node was running
 *   WAIT   - linked on a cluster priority queue, waiting for a worker
 *   RUN    - currently owned and executed by a worker
 */
#define NODE_VALID (0x00000001)
#define NODE_IDLE (0x00000002)
#define NODE_SIGNAL (0x00000004)
#define NODE_WAIT (0x00000008)
#define NODE_RUN (0x00000010)

/* action derived from a successful state transition in schedule */
#define NODE_ACT_NONE (0x00000000)
#define NODE_ACT_IDLE_TO_WAIT (0x00000001)
#define NODE_ACT_RUN_TO_SIGNAL (0x00000002)

/* worker thread state, used by cluster_signal_f to pick an idle worker */
typedef enum MppWorkerState_e {
    WORKER_IDLE,
    WORKER_RUNNING,

    WORKER_STATE_BUTT,
} MppWorkerState;
57
/* per-node work callback plus accumulated run statistics */
struct MppNodeProc_s {
    TaskProc proc;      /* user callback executed by a worker */
    void *param;        /* opaque argument passed to proc */

    /* timing statistic */
    RK_U32 run_count;   /* number of completed proc invocations */
    RK_S64 run_time;    /* total time spent in proc (mpp_time units) */
};

/* schedulable unit linking a node onto one cluster priority queue */
struct MppNodeTask_s {
    struct list_head list_sched;    /* linked on queue->list or a worker's list_task */
    MppNodeImpl *node;
    const char *node_name;          /* cached node->name for tracing */

    /* lock ptr to cluster queue lock */
    ClusterQueue *queue;

    MppNodeProc *proc;
};
77
78 /* MppNode will be embeded in MppCtx */
/* MppNode will be embeded in MppCtx */
struct MppNodeImpl_s {
    char name[32];      /* "<cluster>:<node_id>" label for traces */
    /* list linked to scheduler */
    RK_S32 node_id;     /* id assigned at attach time */
    RK_U32 state;       /* NODE_* bits, modified only via CAS / atomics */

    MppNodeProc work;   /* callback plus its statistics */

    RK_U32 priority;    /* index into MppCluster::queue, must be < MAX_PRIORITY */
    RK_S32 attached;    /* non-zero while owned by a cluster */
    sem_t sem_detach;   /* posted by the worker when detach completes */

    /* for cluster schedule */
    MppNodeTask task;
};
94
/* one priority level: a mutex-protected list of waiting tasks */
struct ClusterQueue_s {
    MppCluster *cluster;        /* owning cluster (back pointer) */

    pthread_mutex_t lock;       /* recursive mutex guarding list and count */
    struct list_head list;      /* waiting MppNodeTask::list_sched entries */
    RK_S32 count;               /* number of tasks currently linked */
};

/* one worker thread pulling tasks from the cluster queues */
struct ClusterWorker_s {
    char name[32];              /* "<pid>:W<worker_id>" thread name */
    MppCluster *cluster;
    RK_S32 worker_id;

    MppThread *thd;
    MppWorkerState state;       /* WORKER_IDLE / WORKER_RUNNING */

    RK_S32 batch_count;         /* max tasks fetched per scheduling round */
    RK_S32 work_count;          /* tasks currently on list_task */
    struct list_head list_task; /* private run list of fetched tasks */
};
115
/* per-client-type scheduler: priority queues plus a worker pool */
struct MppCluster_s {
    char name[16];      /* "<pid>:<client_type>" label */
    pid_t pid;
    RK_S32 client_type;
    RK_S32 node_id;     /* next node id, bumped with MPP_FETCH_ADD */
    RK_S32 worker_id;   /* next worker id */

    ClusterQueue queue[MAX_PRIORITY];
    RK_S32 node_count;  /* attached node counter (statistics) */

    /* multi-worker info */
    RK_S32 worker_count;
    ClusterWorker *worker;      /* array of worker_count workers */
    MppThreadFunc worker_func;  /* thread entry (cluster_worker) */
};

/* process-global registry holding one cluster per client type */
typedef struct MppClusterServer_s {
    MppMutex mutex;     /* guards the clusters[] table */
    MppCluster *clusters[VPU_CLIENT_BUTT];
} MppClusterServer;
136
/* lazily-created singleton instance, see mpp_cluster_srv_init() */
static MppClusterServer *srv_cluster = NULL;

static MppCluster *cluster_server_get(MppClientType client_type);
static MPP_RET cluster_server_put(MppClientType client_type);

/* schedule wrappers that record the calling function name for tracing */
#define mpp_node_task_schedule(task) \
    mpp_node_task_schedule_f(__FUNCTION__, task)

#define mpp_node_task_schedule_from(caller, task) \
    mpp_node_task_schedule_f(caller, task)

#define cluster_queue_lock(queue) cluster_queue_lock_f(__FUNCTION__, queue)
#define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)

/* fetch the singleton, initializing it on first use (GNU statement expression) */
#define get_srv_cluster_f() \
    ({ \
        MppClusterServer *__tmp; \
        if (srv_cluster) { \
            __tmp = srv_cluster; \
        } else { \
            mpp_cluster_srv_init(); \
            __tmp = srv_cluster; \
            if (!__tmp) \
                mpp_err("mpp cluster srv not init at %s\n", __FUNCTION__); \
        } \
        __tmp; \
    })
164
cluster_queue_lock_f(const char * caller,ClusterQueue * queue)165 static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
166 {
167 MppCluster *cluster = queue->cluster;
168 RK_S32 ret;
169
170 cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
171
172 ret = pthread_mutex_lock(&queue->lock);
173
174 cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
175
176 return (ret) ? MPP_NOK : MPP_OK;
177 }
178
cluster_queue_unlock_f(const char * caller,ClusterQueue * queue)179 static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
180 {
181 MppCluster *cluster = queue->cluster;
182 RK_S32 ret;
183
184 cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
185
186 ret = pthread_mutex_unlock(&queue->lock);
187
188 cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
189
190 return (ret) ? MPP_NOK : MPP_OK;
191 }
192
193 void cluster_signal_f(const char *caller, MppCluster *p);
194
mpp_cluster_queue_init(ClusterQueue * queue,MppCluster * cluster)195 MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
196 {
197 pthread_mutexattr_t attr;
198
199 pthread_mutexattr_init(&attr);
200 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
201 pthread_mutex_init(&queue->lock, &attr);
202 pthread_mutexattr_destroy(&attr);
203
204 queue->cluster = cluster;
205 INIT_LIST_HEAD(&queue->list);
206 queue->count = 0;
207
208 return MPP_OK;
209 }
210
mpp_cluster_queue_deinit(ClusterQueue * queue)211 MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
212 {
213 mpp_assert(!queue->count);
214 mpp_assert(list_empty(&queue->list));
215
216 pthread_mutex_destroy(&queue->lock);
217
218 return MPP_OK;
219 }
220
mpp_node_task_attach(MppNodeTask * task,MppNodeImpl * node,ClusterQueue * queue,MppNodeProc * proc)221 MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
222 ClusterQueue *queue, MppNodeProc *proc)
223 {
224 INIT_LIST_HEAD(&task->list_sched);
225
226 task->node = node;
227 task->node_name = node->name;
228
229 task->queue = queue;
230 task->proc = proc;
231
232 node->state = NODE_VALID | NODE_IDLE;
233 node->attached = 1;
234
235 return MPP_OK;
236 }
237
mpp_node_task_schedule_f(const char * caller,MppNodeTask * task)238 MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
239 {
240 ClusterQueue *queue = task->queue;
241 MppCluster *cluster = queue->cluster;
242 MppNodeImpl *node = task->node;
243 MppNodeProc *proc = task->proc;
244 const char *node_name = task->node_name;
245 RK_U32 new_st;
246 RK_U32 action = NODE_ACT_NONE;
247 bool ret = false;
248
249 cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
250 node_name, caller, node->state, proc->run_count, queue->count);
251
252 do {
253 RK_U32 old_st = node->state;
254
255 action = NODE_ACT_NONE;
256 new_st = 0;
257
258 if (old_st & NODE_WAIT) {
259 cluster_dbg_flow("%s sched task %x stay wait\n", node_name, old_st);
260 break;
261 }
262
263 if (old_st & NODE_IDLE) {
264 new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
265 cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
266 action = NODE_ACT_IDLE_TO_WAIT;
267 } else if (old_st & NODE_RUN) {
268 new_st = old_st | NODE_SIGNAL;
269 action = NODE_ACT_RUN_TO_SIGNAL;
270 cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
271 } else {
272 cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
273 }
274
275 ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
276 cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
277 node_name, old_st, new_st, ret, action);
278 } while (!ret);
279
280 switch (action) {
281 case NODE_ACT_IDLE_TO_WAIT : {
282 cluster_queue_lock(queue);
283 mpp_assert(list_empty(&task->list_sched));
284 list_add_tail(&task->list_sched, &queue->list);
285 queue->count++;
286 cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
287 cluster_queue_unlock(queue);
288
289 cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
290 cluster_signal_f(caller, cluster);
291 } break;
292 case NODE_ACT_RUN_TO_SIGNAL : {
293 cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
294 cluster_signal_f(caller, cluster);
295 } break;
296 }
297
298 cluster_dbg_flow("%s sched from %s after [%d:%d] queue %d\n",
299 node_name, caller, node->state, proc->run_count, queue->count);
300
301 return MPP_OK;
302 }
303
/*
 * Detach a node from its cluster and block until the worker releases it.
 *
 * Order matters: NODE_VALID is cleared first so the worker sees the node
 * as detaching, then the task is scheduled once more so an idle node
 * reaches the worker loop, and finally this thread blocks on sem_detach,
 * which cluster_worker_run_task posts after unlinking the task.
 */
MPP_RET mpp_node_task_detach(MppNodeTask *task)
{
    MppNodeImpl *node = task->node;
    MPP_RET ret = MPP_OK;

    if (node->attached) {
        const char *node_name = task->node_name;
        MppNodeProc *proc = task->proc;

        /* atomically drop NODE_VALID; workers treat the node as dying */
        MPP_FETCH_AND(&node->state, ~NODE_VALID);

        /* kick the task so a worker runs it once and observes !NODE_VALID */
        mpp_node_task_schedule(task);

        cluster_dbg_flow("%s state %x:%d wait detach start\n",
                         node_name, node->state, proc->run_count);

        /* worker posts sem_detach after clearing node->attached */
        sem_wait(&node->sem_detach);
        mpp_assert(node->attached == 0);

        cluster_dbg_flow("%s state %x:%d wait detach done\n",
                         node_name, node->state, proc->run_count);
    }

    return ret;
}
329
mpp_node_init(MppNode * node)330 MPP_RET mpp_node_init(MppNode *node)
331 {
332 MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
333 if (p)
334 sem_init(&p->sem_detach, 0, 0);
335
336 *node = p;
337
338 return p ? MPP_OK : MPP_NOK;
339 }
340
mpp_node_deinit(MppNode node)341 MPP_RET mpp_node_deinit(MppNode node)
342 {
343 MppNodeImpl *p = (MppNodeImpl *)node;
344
345 if (p) {
346 if (p->attached)
347 mpp_node_task_detach(&p->task);
348
349 mpp_assert(p->attached == 0);
350
351 sem_destroy(&p->sem_detach);
352
353 mpp_free(p);
354 }
355
356 return MPP_OK;
357 }
358
mpp_node_set_func(MppNode node,TaskProc proc,void * param)359 MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
360 {
361 MppNodeImpl *p = (MppNodeImpl *)node;
362 if (!p)
363 return MPP_NOK;
364
365 p->work.proc = proc;
366 p->work.param = param;
367
368 return MPP_OK;
369 }
370
cluster_worker_init(ClusterWorker * p,MppCluster * cluster)371 MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
372 {
373 MppThread *thd = NULL;
374 MPP_RET ret = MPP_NOK;
375
376 INIT_LIST_HEAD(&p->list_task);
377 p->worker_id = cluster->worker_id++;
378
379 p->batch_count = 1;
380 p->work_count = 0;
381 p->cluster = cluster;
382 p->state = WORKER_IDLE;
383 snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
384 thd = mpp_thread_create(cluster->worker_func, p, p->name);
385 if (thd) {
386 p->thd = thd;
387 mpp_thread_start(thd);
388 ret = MPP_OK;
389 }
390
391 return ret;
392 }
393
cluster_worker_deinit(ClusterWorker * p)394 MPP_RET cluster_worker_deinit(ClusterWorker *p)
395 {
396 if (p->thd) {
397 mpp_thread_stop(p->thd);
398 mpp_thread_destroy(p->thd);
399 p->thd = NULL;
400 }
401
402 mpp_assert(list_empty(&p->list_task));
403 mpp_assert(p->work_count == 0);
404
405 p->batch_count = 0;
406 p->cluster = NULL;
407
408 return MPP_OK;
409 }
410
/*
 * Move up to batch_count waiting tasks from the cluster's priority queues
 * onto the worker's private run list, scanning priorities from index 0
 * upward. Each fetched node transitions WAIT -> RUN via CAS while the
 * queue lock is held. Returns the number of tasks fetched.
 */
RK_S32 cluster_worker_get_task(ClusterWorker *p)
{
    MppCluster *cluster = p->cluster;
    RK_S32 batch_count = p->batch_count;
    RK_S32 count = 0;
    RK_U32 new_st;
    RK_U32 old_st;
    bool ret;
    RK_S32 i;

    cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);

    for (i = 0; i < MAX_PRIORITY; i++) {
        ClusterQueue *queue = &cluster->queue[i];
        MppNodeTask *task = NULL;
        MppNodeImpl *node = NULL;

        do {
            cluster_queue_lock(queue);

            if (list_empty(&queue->list)) {
                /* count must agree with the list being empty */
                mpp_assert(queue->count == 0);
                cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
                cluster_queue_unlock(queue);
                break;
            }

            mpp_assert(queue->count);
            /* pop the oldest waiting task (FIFO within a priority) */
            task = list_first_entry(&queue->list, MppNodeTask, list_sched);
            list_del_init(&task->list_sched);
            node = task->node;

            queue->count--;

            /* flip WAIT -> RUN; retry on a concurrent state change */
            do {
                old_st = node->state;
                new_st = old_st ^ (NODE_WAIT | NODE_RUN);

                mpp_assert(old_st & NODE_WAIT);
                ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!ret);

            list_add_tail(&task->list_sched, &p->list_task);
            p->work_count++;
            count++;

            cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);

            cluster_queue_unlock(queue);

            if (count >= batch_count)
                break;
        } while (1);

        if (count >= batch_count)
            break;
    }

    cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);

    return count;
}
473
/*
 * Execute every task currently owned by the worker.
 *
 * After each proc() call the node state decides the next step:
 *   !NODE_VALID  -> node is detaching: unlink it and post sem_detach
 *   NODE_SIGNAL  -> re-schedule requested: RUN|SIGNAL -> WAIT, re-queue
 *   otherwise    -> RUN -> IDLE
 *
 * Fixes vs. the previous version:
 *   - the RUN|SIGNAL -> WAIT CAS loop now re-reads node->state on every
 *     retry; it used to retry forever with a stale snapshot if another
 *     thread (e.g. detach clearing NODE_VALID) changed the state
 *     concurrently. The RUN -> IDLE loop below already did this correctly.
 *   - typo "atate" -> "state" in the debug trace.
 */
static void cluster_worker_run_task(ClusterWorker *p)
{
    RK_U32 new_st;
    RK_U32 old_st;
    bool cas_ret;

    cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);

    while (!list_empty(&p->list_task)) {
        MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
        MppNodeProc *proc = task->proc;
        MppNodeImpl *node = task->node;
        RK_S64 time_start;
        RK_S64 time_end;
        RK_U32 state;
        MPP_RET proc_ret;

        /* check trigger for re-add task */
        cluster_dbg_flow("%s run %s start state %d\n", p->name, task->node_name, node->state);
        mpp_assert(node->state & NODE_RUN);
        if (!(node->state & NODE_RUN))
            mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);

        time_start = mpp_time();
        proc_ret = proc->proc(proc->param);
        time_end = mpp_time();

        cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
        proc->run_time += time_end - time_start;
        proc->run_count++;

        state = node->state;
        if (!(state & NODE_VALID)) {
            /* node is detaching: release it and wake the waiter */
            cluster_dbg_flow("%s run found destroy\n", p->name);
            list_del_init(&task->list_sched);
            node->attached = 0;

            sem_post(&node->sem_detach);
            cluster_dbg_flow("%s run sem post done\n", p->name);
        } else if (state & NODE_SIGNAL) {
            ClusterQueue *queue = task->queue;

            list_del_init(&task->list_sched);

            do {
                /* re-read each retry or a concurrent change livelocks us */
                old_st = node->state;
                // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
                new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
                cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!cas_ret);

            cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);

            cluster_queue_lock(queue);
            list_add_tail(&task->list_sched, &queue->list);
            queue->count++;
            cluster_queue_unlock(queue);
        } else {
            list_del_init(&task->list_sched);
            do {
                old_st = node->state;
                new_st = old_st ^ (NODE_IDLE | NODE_RUN);

                cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!cas_ret);
            mpp_assert(node->state & NODE_IDLE);
            mpp_assert(!(node->state & NODE_RUN));

            cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
        }

        p->work_count--;
    }

    mpp_assert(p->work_count == 0);

    cluster_dbg_flow("%s run all done\n", p->name);
}
552
/*
 * Worker thread entry: under the THREAD_WORK lock, fetch a batch of
 * tasks; when none are available, mark the worker idle and block on the
 * condition until cluster_signal_f wakes it. Loops until the thread
 * status leaves MPP_THREAD_RUNNING. Task execution happens outside the
 * lock so schedulers are not blocked while procs run.
 */
static void *cluster_worker(void *data)
{
    ClusterWorker *p = (ClusterWorker *)data;
    MppThread *thd = p->thd;

    while (1) {
        {
            RK_S32 task_count = 0;

            cluster_dbg_lock("%s lock start\n", p->name);
            mpp_thread_lock(thd, THREAD_WORK);
            cluster_dbg_lock("%s lock done\n", p->name);

            /* stop request: drop the lock and exit the loop */
            if (MPP_THREAD_RUNNING != mpp_thread_get_status(thd, THREAD_WORK)) {
                mpp_thread_unlock(thd, THREAD_WORK);
                break;
            }

            task_count = cluster_worker_get_task(p);
            if (!task_count) {
                /* nothing to do: advertise idle so signalers pick us */
                p->state = WORKER_IDLE;
                mpp_thread_wait(thd, THREAD_WORK);
                p->state = WORKER_RUNNING;
            }
            mpp_thread_unlock(thd, THREAD_WORK);
        }

        /* run fetched tasks without holding the work lock */
        cluster_worker_run_task(p);
    }

    return NULL;
}
585
/*
 * Wake at most one idle worker of the cluster; workers that are already
 * running will pick up new tasks on their next scheduling round.
 */
void cluster_signal_f(const char *caller, MppCluster *p)
{
    RK_S32 i;

    cluster_dbg_flow("%s signal from %s\n", p->name, caller);

    for (i = 0; i < p->worker_count; i++) {
        ClusterWorker *worker = &p->worker[i];
        MppThread *thd = worker->thd;
        RK_S32 was_idle;

        mpp_thread_lock(thd, THREAD_WORK);

        was_idle = (worker->state == WORKER_IDLE);
        if (was_idle) {
            /* signal while holding the lock, matching mpp_thread_wait */
            mpp_thread_signal(thd, THREAD_WORK);
            cluster_dbg_flow("%s signal\n", p->name);
        }

        mpp_thread_unlock(thd, THREAD_WORK);

        if (was_idle)
            break;
    }
}
608
mpp_cluster_srv_init(void)609 static void mpp_cluster_srv_init(void)
610 {
611 MppClusterServer *srv = srv_cluster;
612
613 mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
614 mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
615
616 if (srv)
617 return;
618
619 srv = mpp_calloc(MppClusterServer, 1);
620 if (!srv) {
621 mpp_err_f("failed to allocate cluster server\n");
622 return;
623 }
624
625 memset(srv->clusters, 0, sizeof(srv->clusters));
626 mpp_mutex_init(&srv->mutex);
627
628 srv_cluster = srv;
629 }
630
mpp_cluster_srv_deinit(void)631 static void mpp_cluster_srv_deinit(void)
632 {
633 MppClusterServer *srv = srv_cluster;
634 RK_S32 i;
635
636 if (!srv)
637 return;
638
639 for (i = 0; i < VPU_CLIENT_BUTT; i++)
640 cluster_server_put((MppClientType)i);
641
642 mpp_mutex_destroy(&srv->mutex);
643 MPP_FREE(srv_cluster);
644 }
645
/* register init/deinit with the mpp singleton framework */
MPP_SINGLETON(MPP_SGLN_CLUSTER, mpp_cluster, mpp_cluster_srv_init, mpp_cluster_srv_deinit)
647
648 static MppCluster *cluster_server_get(MppClientType client_type)
649 {
650 MppClusterServer *srv = get_srv_cluster_f();
651 RK_S32 i;
652 MppCluster *p = NULL;
653
654 if (!srv || client_type >= VPU_CLIENT_BUTT)
655 goto done;
656
657 {
658 mpp_mutex_lock(&srv->mutex);
659
660 p = srv->clusters[client_type];
661 if (p) {
662 mpp_mutex_unlock(&srv->mutex);
663 goto done;
664 }
665
666 p = mpp_malloc(MppCluster, 1);
667 if (p) {
668 for (i = 0; i < MAX_PRIORITY; i++)
669 mpp_cluster_queue_init(&p->queue[i], p);
670
671 p->pid = getpid();
672 p->client_type = client_type;
673 snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
674 p->node_id = 0;
675 p->worker_id = 0;
676 p->worker_func = cluster_worker;
677 p->worker_count = mpp_cluster_thd_cnt;
678
679 mpp_assert(p->worker_count > 0);
680
681 p->worker = mpp_malloc(ClusterWorker, p->worker_count);
682
683 for (i = 0; i < p->worker_count; i++)
684 cluster_worker_init(&p->worker[i], p);
685
686 srv->clusters[client_type] = p;
687 cluster_dbg_flow("%s created\n", p->name);
688 }
689
690 mpp_mutex_unlock(&srv->mutex);
691 }
692
693 done:
694 if (p)
695 cluster_dbg_flow("%s get\n", p->name);
696 else
697 cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
698
699 return p;
700 }
701
cluster_server_put(MppClientType client_type)702 static MPP_RET cluster_server_put(MppClientType client_type)
703 {
704 MppClusterServer *srv = get_srv_cluster_f();
705 MppCluster *p;
706 RK_S32 i;
707
708 if (!srv || client_type >= VPU_CLIENT_BUTT)
709 return MPP_NOK;
710
711 mpp_mutex_lock(&srv->mutex);
712
713 p = srv->clusters[client_type];
714 if (!p) {
715 mpp_mutex_unlock(&srv->mutex);
716 return MPP_NOK;
717 }
718
719 for (i = 0; i < p->worker_count; i++)
720 cluster_worker_deinit(&p->worker[i]);
721
722 for (i = 0; i < MAX_PRIORITY; i++)
723 mpp_cluster_queue_deinit(&p->queue[i]);
724
725 cluster_dbg_flow("put %s\n", p->name);
726
727 MPP_FREE(p->worker);
728 mpp_free(p);
729 srv->clusters[client_type] = NULL;
730
731 mpp_mutex_unlock(&srv->mutex);
732
733 return MPP_OK;
734 }
735
mpp_node_attach(MppNode node,MppClientType type)736 MPP_RET mpp_node_attach(MppNode node, MppClientType type)
737 {
738 MppNodeImpl *impl = (MppNodeImpl *)node;
739 MppCluster *p = cluster_server_get(type);
740 RK_U32 priority = impl->priority;
741 ClusterQueue *queue = &p->queue[priority];
742
743 mpp_assert(priority < MAX_PRIORITY);
744 mpp_assert(p);
745
746 impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);
747
748 snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
749
750 mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
751
752 MPP_FETCH_ADD(&p->node_count, 1);
753
754 cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
755
756 /* attach and run once first */
757 mpp_node_task_schedule(&impl->task);
758 cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
759
760 return MPP_OK;
761 }
762
/*
 * Detach a node from its cluster; blocks until the worker releases it
 * (see mpp_node_task_detach). Always returns MPP_OK.
 */
MPP_RET mpp_node_detach(MppNode node)
{
    MppNodeImpl *impl = (MppNodeImpl *)node;

    mpp_node_task_detach(&impl->task);
    cluster_dbg_flow("%s detached\n", impl->name);

    return MPP_OK;
}
772
mpp_node_trigger_f(const char * caller,MppNode node,RK_S32 trigger)773 MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
774 {
775 if (trigger) {
776 MppNodeImpl *impl = (MppNodeImpl *)node;
777
778 mpp_node_task_schedule_from(caller, &impl->task);
779 }
780
781 return MPP_OK;
782 }
783