xref: /rockchip-linux_mpp/mpp/base/mpp_cluster.c (revision 437bfbeb9567cca9cd9080e3f6954aa9d6a94f18)
1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3  * Copyright (c) 2021 Rockchip Electronics Co., Ltd.
4  */
5 
6 #define  MODULE_TAG "mpp_cluster"
7 
8 #include <string.h>
9 
10 #include "mpp_mem.h"
11 #include "mpp_env.h"
12 #include "mpp_lock.h"
13 #include "mpp_time.h"
14 #include "mpp_debug.h"
15 #include "mpp_common.h"
16 #include "mpp_singleton.h"
17 
18 #include "mpp_cluster.h"
19 #include "mpp_dev_defs.h"
20 
21 #define MPP_CLUSTER_DBG_FLOW            (0x00000001)
22 #define MPP_CLUSTER_DBG_LOCK            (0x00000002)
23 
24 #define cluster_dbg(flag, fmt, ...)     _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
25 #define cluster_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
26 
27 #define cluster_dbg_flow(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
28 #define cluster_dbg_lock(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)
29 
30 RK_U32 mpp_cluster_debug = 0;
31 RK_U32 mpp_cluster_thd_cnt = 1;
32 
33 typedef struct MppNodeProc_s    MppNodeProc;
34 typedef struct MppNodeTask_s    MppNodeTask;
35 typedef struct MppNodeImpl_s    MppNodeImpl;
36 
37 typedef struct ClusterQueue_s   ClusterQueue;
38 typedef struct ClusterWorker_s  ClusterWorker;
39 typedef struct MppCluster_s     MppCluster;
40 
41 #define NODE_VALID              (0x00000001)
42 #define NODE_IDLE               (0x00000002)
43 #define NODE_SIGNAL             (0x00000004)
44 #define NODE_WAIT               (0x00000008)
45 #define NODE_RUN                (0x00000010)
46 
47 #define NODE_ACT_NONE           (0x00000000)
48 #define NODE_ACT_IDLE_TO_WAIT   (0x00000001)
49 #define NODE_ACT_RUN_TO_SIGNAL  (0x00000002)
50 
/* Run state of a cluster worker thread, stored in ClusterWorker.state. */
typedef enum MppWorkerState_e {
    /* initial state; also set right before the worker waits for a signal */
    WORKER_IDLE         = 0,
    /* set once the worker returns from its wait */
    WORKER_RUNNING      = 1,

    /* number of states, not a valid state itself */
    WORKER_STATE_BUTT,
} MppWorkerState;
57 
58 struct MppNodeProc_s {
59     TaskProc                proc;
60     void                    *param;
61 
62     /* timing statistic */
63     RK_U32                  run_count;
64     RK_S64                  run_time;
65 };
66 
67 struct MppNodeTask_s {
68     struct list_head        list_sched;
69     MppNodeImpl             *node;
70     const char              *node_name;
71 
72     /* lock ptr to cluster queue lock */
73     ClusterQueue            *queue;
74 
75     MppNodeProc             *proc;
76 };
77 
78 /* MppNode will be embeded in MppCtx */
79 struct MppNodeImpl_s {
80     char                    name[32];
81     /* list linked to scheduler */
82     RK_S32                  node_id;
83     RK_U32                  state;
84 
85     MppNodeProc             work;
86 
87     RK_U32                  priority;
88     RK_S32                  attached;
89     sem_t                   sem_detach;
90 
91     /* for cluster schedule */
92     MppNodeTask             task;
93 };
94 
95 struct ClusterQueue_s {
96     MppCluster              *cluster;
97 
98     pthread_mutex_t         lock;
99     struct list_head        list;
100     RK_S32                  count;
101 };
102 
103 struct ClusterWorker_s {
104     char                    name[32];
105     MppCluster              *cluster;
106     RK_S32                  worker_id;
107 
108     MppThread               *thd;
109     MppWorkerState          state;
110 
111     RK_S32                  batch_count;
112     RK_S32                  work_count;
113     struct list_head        list_task;
114 };
115 
116 struct MppCluster_s {
117     char                    name[16];
118     pid_t                   pid;
119     RK_S32                  client_type;
120     RK_S32                  node_id;
121     RK_S32                  worker_id;
122 
123     ClusterQueue            queue[MAX_PRIORITY];
124     RK_S32                  node_count;
125 
126     /* multi-worker info */
127     RK_S32                  worker_count;
128     ClusterWorker           *worker;
129     MppThreadFunc           worker_func;
130 };
131 
132 typedef struct MppClusterServer_s {
133     MppMutex                mutex;
134     MppCluster              *clusters[VPU_CLIENT_BUTT];
135 } MppClusterServer;
136 
137 static MppClusterServer *srv_cluster = NULL;
138 
139 static MppCluster *cluster_server_get(MppClientType client_type);
140 static MPP_RET cluster_server_put(MppClientType client_type);
141 
142 #define mpp_node_task_schedule(task) \
143     mpp_node_task_schedule_f(__FUNCTION__, task)
144 
145 #define mpp_node_task_schedule_from(caller, task) \
146     mpp_node_task_schedule_f(caller, task)
147 
148 #define cluster_queue_lock(queue)   cluster_queue_lock_f(__FUNCTION__, queue)
149 #define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)
150 
/*
 * Evaluate to the cluster server pointer. When the server does not exist yet
 * mpp_cluster_srv_init() is called once; an error is logged and NULL is
 * produced when the server is still missing afterwards.
 */
#define get_srv_cluster_f() \
    ({ \
        MppClusterServer *__tmp = srv_cluster; \
        if (!__tmp) { \
            mpp_cluster_srv_init(); \
            __tmp = srv_cluster; \
            if (!__tmp) \
                mpp_err("mpp cluster srv not init at %s\n", __FUNCTION__); \
        } \
        __tmp; \
    })
164 
cluster_queue_lock_f(const char * caller,ClusterQueue * queue)165 static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
166 {
167     MppCluster *cluster = queue->cluster;
168     RK_S32 ret;
169 
170     cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
171 
172     ret = pthread_mutex_lock(&queue->lock);
173 
174     cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
175 
176     return (ret) ? MPP_NOK : MPP_OK;
177 }
178 
cluster_queue_unlock_f(const char * caller,ClusterQueue * queue)179 static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
180 {
181     MppCluster *cluster = queue->cluster;
182     RK_S32 ret;
183 
184     cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
185 
186     ret = pthread_mutex_unlock(&queue->lock);
187 
188     cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
189 
190     return (ret) ? MPP_NOK : MPP_OK;
191 }
192 
193 void cluster_signal_f(const char *caller, MppCluster *p);
194 
mpp_cluster_queue_init(ClusterQueue * queue,MppCluster * cluster)195 MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
196 {
197     pthread_mutexattr_t attr;
198 
199     pthread_mutexattr_init(&attr);
200     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
201     pthread_mutex_init(&queue->lock, &attr);
202     pthread_mutexattr_destroy(&attr);
203 
204     queue->cluster = cluster;
205     INIT_LIST_HEAD(&queue->list);
206     queue->count = 0;
207 
208     return MPP_OK;
209 }
210 
mpp_cluster_queue_deinit(ClusterQueue * queue)211 MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
212 {
213     mpp_assert(!queue->count);
214     mpp_assert(list_empty(&queue->list));
215 
216     pthread_mutex_destroy(&queue->lock);
217 
218     return MPP_OK;
219 }
220 
mpp_node_task_attach(MppNodeTask * task,MppNodeImpl * node,ClusterQueue * queue,MppNodeProc * proc)221 MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
222                              ClusterQueue *queue, MppNodeProc *proc)
223 {
224     INIT_LIST_HEAD(&task->list_sched);
225 
226     task->node = node;
227     task->node_name = node->name;
228 
229     task->queue = queue;
230     task->proc = proc;
231 
232     node->state = NODE_VALID | NODE_IDLE;
233     node->attached = 1;
234 
235     return MPP_OK;
236 }
237 
mpp_node_task_schedule_f(const char * caller,MppNodeTask * task)238 MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
239 {
240     ClusterQueue *queue = task->queue;
241     MppCluster *cluster = queue->cluster;
242     MppNodeImpl *node = task->node;
243     MppNodeProc *proc = task->proc;
244     const char *node_name = task->node_name;
245     RK_U32 new_st;
246     RK_U32 action = NODE_ACT_NONE;
247     bool ret = false;
248 
249     cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
250                      node_name, caller, node->state, proc->run_count, queue->count);
251 
252     do {
253         RK_U32 old_st = node->state;
254 
255         action = NODE_ACT_NONE;
256         new_st = 0;
257 
258         if (old_st & NODE_WAIT) {
259             cluster_dbg_flow("%s sched task %x stay  wait\n", node_name, old_st);
260             break;
261         }
262 
263         if (old_st & NODE_IDLE) {
264             new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
265             cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
266             action = NODE_ACT_IDLE_TO_WAIT;
267         } else if (old_st & NODE_RUN) {
268             new_st = old_st | NODE_SIGNAL;
269             action = NODE_ACT_RUN_TO_SIGNAL;
270             cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
271         } else {
272             cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
273         }
274 
275         ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
276         cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
277                          node_name, old_st, new_st, ret, action);
278     } while (!ret);
279 
280     switch (action) {
281     case NODE_ACT_IDLE_TO_WAIT : {
282         cluster_queue_lock(queue);
283         mpp_assert(list_empty(&task->list_sched));
284         list_add_tail(&task->list_sched, &queue->list);
285         queue->count++;
286         cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
287         cluster_queue_unlock(queue);
288 
289         cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
290         cluster_signal_f(caller, cluster);
291     } break;
292     case NODE_ACT_RUN_TO_SIGNAL : {
293         cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
294         cluster_signal_f(caller, cluster);
295     } break;
296     }
297 
298     cluster_dbg_flow("%s sched from %s after  [%d:%d] queue %d\n",
299                      node_name, caller, node->state, proc->run_count, queue->count);
300 
301     return MPP_OK;
302 }
303 
mpp_node_task_detach(MppNodeTask * task)304 MPP_RET mpp_node_task_detach(MppNodeTask *task)
305 {
306     MppNodeImpl *node = task->node;
307     MPP_RET ret = MPP_OK;
308 
309     if (node->attached) {
310         const char *node_name = task->node_name;
311         MppNodeProc *proc = task->proc;
312 
313         MPP_FETCH_AND(&node->state, ~NODE_VALID);
314 
315         mpp_node_task_schedule(task);
316 
317         cluster_dbg_flow("%s state %x:%d wait detach start\n",
318                          node_name, node->state, proc->run_count);
319 
320         sem_wait(&node->sem_detach);
321         mpp_assert(node->attached == 0);
322 
323         cluster_dbg_flow("%s state %x:%d wait detach done\n",
324                          node_name, node->state, proc->run_count);
325     }
326 
327     return ret;
328 }
329 
mpp_node_init(MppNode * node)330 MPP_RET mpp_node_init(MppNode *node)
331 {
332     MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
333     if (p)
334         sem_init(&p->sem_detach, 0, 0);
335 
336     *node = p;
337 
338     return p ? MPP_OK : MPP_NOK;
339 }
340 
mpp_node_deinit(MppNode node)341 MPP_RET mpp_node_deinit(MppNode node)
342 {
343     MppNodeImpl *p = (MppNodeImpl *)node;
344 
345     if (p) {
346         if (p->attached)
347             mpp_node_task_detach(&p->task);
348 
349         mpp_assert(p->attached == 0);
350 
351         sem_destroy(&p->sem_detach);
352 
353         mpp_free(p);
354     }
355 
356     return MPP_OK;
357 }
358 
mpp_node_set_func(MppNode node,TaskProc proc,void * param)359 MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
360 {
361     MppNodeImpl *p = (MppNodeImpl *)node;
362     if (!p)
363         return MPP_NOK;
364 
365     p->work.proc = proc;
366     p->work.param = param;
367 
368     return MPP_OK;
369 }
370 
cluster_worker_init(ClusterWorker * p,MppCluster * cluster)371 MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
372 {
373     MppThread *thd = NULL;
374     MPP_RET ret = MPP_NOK;
375 
376     INIT_LIST_HEAD(&p->list_task);
377     p->worker_id = cluster->worker_id++;
378 
379     p->batch_count = 1;
380     p->work_count = 0;
381     p->cluster = cluster;
382     p->state = WORKER_IDLE;
383     snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
384     thd = mpp_thread_create(cluster->worker_func, p, p->name);
385     if (thd) {
386         p->thd = thd;
387         mpp_thread_start(thd);
388         ret = MPP_OK;
389     }
390 
391     return ret;
392 }
393 
cluster_worker_deinit(ClusterWorker * p)394 MPP_RET cluster_worker_deinit(ClusterWorker *p)
395 {
396     if (p->thd) {
397         mpp_thread_stop(p->thd);
398         mpp_thread_destroy(p->thd);
399         p->thd = NULL;
400     }
401 
402     mpp_assert(list_empty(&p->list_task));
403     mpp_assert(p->work_count == 0);
404 
405     p->batch_count = 0;
406     p->cluster = NULL;
407 
408     return MPP_OK;
409 }
410 
cluster_worker_get_task(ClusterWorker * p)411 RK_S32 cluster_worker_get_task(ClusterWorker *p)
412 {
413     MppCluster *cluster = p->cluster;
414     RK_S32 batch_count = p->batch_count;
415     RK_S32 count = 0;
416     RK_U32 new_st;
417     RK_U32 old_st;
418     bool ret;
419     RK_S32 i;
420 
421     cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);
422 
423     for (i = 0; i < MAX_PRIORITY; i++) {
424         ClusterQueue *queue = &cluster->queue[i];
425         MppNodeTask *task = NULL;
426         MppNodeImpl *node = NULL;
427 
428         do {
429             cluster_queue_lock(queue);
430 
431             if (list_empty(&queue->list)) {
432                 mpp_assert(queue->count == 0);
433                 cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
434                 cluster_queue_unlock(queue);
435                 break;
436             }
437 
438             mpp_assert(queue->count);
439             task = list_first_entry(&queue->list, MppNodeTask, list_sched);
440             list_del_init(&task->list_sched);
441             node = task->node;
442 
443             queue->count--;
444 
445             do {
446                 old_st = node->state;
447                 new_st = old_st ^ (NODE_WAIT | NODE_RUN);
448 
449                 mpp_assert(old_st & NODE_WAIT);
450                 ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
451             } while (!ret);
452 
453             list_add_tail(&task->list_sched, &p->list_task);
454             p->work_count++;
455             count++;
456 
457             cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);
458 
459             cluster_queue_unlock(queue);
460 
461             if (count >= batch_count)
462                 break;
463         } while (1);
464 
465         if (count >= batch_count)
466             break;
467     }
468 
469     cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);
470 
471     return count;
472 }
473 
/*
 * Run every task on the worker run list once and decide what happens to
 * each of them afterwards:
 *   - NODE_VALID cleared : the node is being detached; mark it detached and
 *                          post sem_detach
 *   - NODE_SIGNAL set    : another run was requested while the task was
 *                          running; put it back on its cluster queue
 *   - otherwise          : return the node to NODE_IDLE
 *
 * The signal / idle decision is taken inside the compare-and-swap loop on a
 * freshly read state. This way a NODE_SIGNAL or a cleared NODE_VALID that
 * arrives while the loop runs can neither be lost nor make the loop spin on
 * a stale snapshot.
 */
static void cluster_worker_run_task(ClusterWorker *p)
{
    cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);

    while (!list_empty(&p->list_task)) {
        MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
        MppNodeProc *proc = task->proc;
        MppNodeImpl *node = task->node;
        RK_S64 time_start;
        RK_S64 time_end;
        MPP_RET proc_ret;

        /* check trigger for re-add task */
        cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state);
        mpp_assert(node->state & NODE_RUN);
        if (!(node->state & NODE_RUN))
            mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);

        time_start = mpp_time();
        proc_ret = proc->proc(proc->param);
        time_end = mpp_time();

        cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
        proc->run_time += time_end - time_start;
        proc->run_count++;

        if (!(node->state & NODE_VALID)) {
            cluster_dbg_flow("%s run found destroy\n", p->name);
            list_del_init(&task->list_sched);
            node->attached = 0;

            /* the node may be freed by the detaching thread right after this */
            sem_post(&node->sem_detach);
            cluster_dbg_flow("%s run sem post done\n", p->name);
        } else {
            ClusterQueue *queue = task->queue;
            RK_U32 old_st;
            RK_U32 new_st;
            RK_S32 requeue;
            bool cas_ret;

            list_del_init(&task->list_sched);

            do {
                /* always decide on the current state, never on a snapshot */
                old_st = node->state;
                requeue = (old_st & NODE_SIGNAL) ? 1 : 0;

                if (requeue) {
                    // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
                    new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
                } else {
                    new_st = old_st ^ (NODE_IDLE | NODE_RUN);
                }

                cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!cas_ret);

            if (requeue) {
                cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);

                /*
                 * The node is in NODE_WAIT but not queued yet, so no other
                 * thread can pick it up or finish a detach before this point.
                 */
                cluster_queue_lock(queue);
                list_add_tail(&task->list_sched, &queue->list);
                queue->count++;
                cluster_queue_unlock(queue);
            } else {
                /*
                 * Check the value that was written: once idle, the node may
                 * be rescheduled or even freed by other threads, so
                 * node->state must not be read again here.
                 */
                mpp_assert(new_st & NODE_IDLE);
                mpp_assert(!(new_st & NODE_RUN));

                cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
            }
        }

        p->work_count--;
    }

    mpp_assert(p->work_count == 0);

    cluster_dbg_flow("%s run all done\n", p->name);
}
552 
cluster_worker(void * data)553 static void *cluster_worker(void *data)
554 {
555     ClusterWorker *p = (ClusterWorker *)data;
556     MppThread *thd = p->thd;
557 
558     while (1) {
559         {
560             RK_S32 task_count = 0;
561 
562             cluster_dbg_lock("%s lock start\n", p->name);
563             mpp_thread_lock(thd, THREAD_WORK);
564             cluster_dbg_lock("%s lock done\n", p->name);
565 
566             if (MPP_THREAD_RUNNING != mpp_thread_get_status(thd, THREAD_WORK)) {
567                 mpp_thread_unlock(thd, THREAD_WORK);
568                 break;
569             }
570 
571             task_count = cluster_worker_get_task(p);
572             if (!task_count) {
573                 p->state = WORKER_IDLE;
574                 mpp_thread_wait(thd, THREAD_WORK);
575                 p->state = WORKER_RUNNING;
576             }
577             mpp_thread_unlock(thd, THREAD_WORK);
578         }
579 
580         cluster_worker_run_task(p);
581     }
582 
583     return NULL;
584 }
585 
/*
 * Wake up the first idle worker of a cluster.
 *
 * Workers are checked in index order, each under its own THREAD_WORK lock.
 * The scan stops after the first worker found in WORKER_IDLE was signalled.
 *
 * caller - name of the requesting function, used for tracing only
 * p      - cluster whose workers are checked
 */
void cluster_signal_f(const char *caller, MppCluster *p)
{
    RK_S32 idx;

    cluster_dbg_flow("%s signal from %s\n", p->name, caller);

    for (idx = 0; idx < p->worker_count; idx++) {
        ClusterWorker *worker = &p->worker[idx];
        MppThread *thd = worker->thd;
        RK_S32 woken = 0;

        mpp_thread_lock(thd, THREAD_WORK);

        if (worker->state == WORKER_IDLE) {
            mpp_thread_signal(thd, THREAD_WORK);
            cluster_dbg_flow("%s signal\n", p->name);
            woken = 1;
        }

        mpp_thread_unlock(thd, THREAD_WORK);

        /* one woken worker is enough */
        if (woken)
            break;
    }
}
608 
mpp_cluster_srv_init(void)609 static void mpp_cluster_srv_init(void)
610 {
611     MppClusterServer *srv = srv_cluster;
612 
613     mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
614     mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
615 
616     if (srv)
617         return;
618 
619     srv = mpp_calloc(MppClusterServer, 1);
620     if (!srv) {
621         mpp_err_f("failed to allocate cluster server\n");
622         return;
623     }
624 
625     memset(srv->clusters, 0, sizeof(srv->clusters));
626     mpp_mutex_init(&srv->mutex);
627 
628     srv_cluster = srv;
629 }
630 
mpp_cluster_srv_deinit(void)631 static void mpp_cluster_srv_deinit(void)
632 {
633     MppClusterServer *srv = srv_cluster;
634     RK_S32 i;
635 
636     if (!srv)
637         return;
638 
639     for (i = 0; i < VPU_CLIENT_BUTT; i++)
640         cluster_server_put((MppClientType)i);
641 
642     mpp_mutex_destroy(&srv->mutex);
643     MPP_FREE(srv_cluster);
644 }
645 
MPP_SINGLETON(MPP_SGLN_CLUSTER,mpp_cluster,mpp_cluster_srv_init,mpp_cluster_srv_deinit)646 MPP_SINGLETON(MPP_SGLN_CLUSTER, mpp_cluster, mpp_cluster_srv_init, mpp_cluster_srv_deinit)
647 
648 static MppCluster *cluster_server_get(MppClientType client_type)
649 {
650     MppClusterServer *srv = get_srv_cluster_f();
651     RK_S32 i;
652     MppCluster *p = NULL;
653 
654     if (!srv || client_type >= VPU_CLIENT_BUTT)
655         goto done;
656 
657     {
658         mpp_mutex_lock(&srv->mutex);
659 
660         p = srv->clusters[client_type];
661         if (p) {
662             mpp_mutex_unlock(&srv->mutex);
663             goto done;
664         }
665 
666         p = mpp_malloc(MppCluster, 1);
667         if (p) {
668             for (i = 0; i < MAX_PRIORITY; i++)
669                 mpp_cluster_queue_init(&p->queue[i], p);
670 
671             p->pid  = getpid();
672             p->client_type = client_type;
673             snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
674             p->node_id = 0;
675             p->worker_id = 0;
676             p->worker_func = cluster_worker;
677             p->worker_count = mpp_cluster_thd_cnt;
678 
679             mpp_assert(p->worker_count > 0);
680 
681             p->worker = mpp_malloc(ClusterWorker, p->worker_count);
682 
683             for (i = 0; i < p->worker_count; i++)
684                 cluster_worker_init(&p->worker[i], p);
685 
686             srv->clusters[client_type] = p;
687             cluster_dbg_flow("%s created\n", p->name);
688         }
689 
690         mpp_mutex_unlock(&srv->mutex);
691     }
692 
693 done:
694     if (p)
695         cluster_dbg_flow("%s get\n", p->name);
696     else
697         cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
698 
699     return p;
700 }
701 
cluster_server_put(MppClientType client_type)702 static MPP_RET cluster_server_put(MppClientType client_type)
703 {
704     MppClusterServer *srv = get_srv_cluster_f();
705     MppCluster *p;
706     RK_S32 i;
707 
708     if (!srv || client_type >= VPU_CLIENT_BUTT)
709         return MPP_NOK;
710 
711     mpp_mutex_lock(&srv->mutex);
712 
713     p = srv->clusters[client_type];
714     if (!p) {
715         mpp_mutex_unlock(&srv->mutex);
716         return MPP_NOK;
717     }
718 
719     for (i = 0; i < p->worker_count; i++)
720         cluster_worker_deinit(&p->worker[i]);
721 
722     for (i = 0; i < MAX_PRIORITY; i++)
723         mpp_cluster_queue_deinit(&p->queue[i]);
724 
725     cluster_dbg_flow("put %s\n", p->name);
726 
727     MPP_FREE(p->worker);
728     mpp_free(p);
729     srv->clusters[client_type] = NULL;
730 
731     mpp_mutex_unlock(&srv->mutex);
732 
733     return MPP_OK;
734 }
735 
mpp_node_attach(MppNode node,MppClientType type)736 MPP_RET mpp_node_attach(MppNode node, MppClientType type)
737 {
738     MppNodeImpl *impl = (MppNodeImpl *)node;
739     MppCluster *p = cluster_server_get(type);
740     RK_U32 priority = impl->priority;
741     ClusterQueue *queue = &p->queue[priority];
742 
743     mpp_assert(priority < MAX_PRIORITY);
744     mpp_assert(p);
745 
746     impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);
747 
748     snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
749 
750     mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
751 
752     MPP_FETCH_ADD(&p->node_count, 1);
753 
754     cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
755 
756     /* attach and run once first */
757     mpp_node_task_schedule(&impl->task);
758     cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
759 
760     return MPP_OK;
761 }
762 
mpp_node_detach(MppNode node)763 MPP_RET mpp_node_detach(MppNode node)
764 {
765     MppNodeImpl *impl = (MppNodeImpl *)node;
766 
767     mpp_node_task_detach(&impl->task);
768     cluster_dbg_flow("%s detached\n", impl->name);
769 
770     return MPP_OK;
771 }
772 
mpp_node_trigger_f(const char * caller,MppNode node,RK_S32 trigger)773 MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
774 {
775     if (trigger) {
776         MppNodeImpl *impl = (MppNodeImpl *)node;
777 
778         mpp_node_task_schedule_from(caller, &impl->task);
779     }
780 
781     return MPP_OK;
782 }
783