xref: /rockchip-linux_mpp/mpp/base/mpp_cluster.c (revision 437bfbeb9567cca9cd9080e3f6954aa9d6a94f18)
1*437bfbebSnyanmisaka /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2*437bfbebSnyanmisaka /*
3*437bfbebSnyanmisaka  * Copyright (c) 2021 Rockchip Electronics Co., Ltd.
4*437bfbebSnyanmisaka  */
5*437bfbebSnyanmisaka 
6*437bfbebSnyanmisaka #define  MODULE_TAG "mpp_cluster"
7*437bfbebSnyanmisaka 
#include <errno.h>
#include <string.h>

#include "mpp_mem.h"
#include "mpp_env.h"
#include "mpp_lock.h"
#include "mpp_time.h"
#include "mpp_debug.h"
#include "mpp_common.h"
#include "mpp_singleton.h"

#include "mpp_cluster.h"
#include "mpp_dev_defs.h"
20*437bfbebSnyanmisaka 
/* debug flag bits tested against mpp_cluster_debug */
#define MPP_CLUSTER_DBG_FLOW            (0x00000001)
#define MPP_CLUSTER_DBG_LOCK            (0x00000002)

/* generic wrappers: forward to _mpp_dbg/_mpp_dbg_f with this module's debug word */
#define cluster_dbg(flag, fmt, ...)     _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
#define cluster_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)

/* per-topic shortcuts: scheduling flow trace and lock trace */
#define cluster_dbg_flow(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
#define cluster_dbg_lock(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)

/* both values are (re)loaded from the environment in mpp_cluster_srv_init() */
RK_U32 mpp_cluster_debug = 0;
RK_U32 mpp_cluster_thd_cnt = 1;
32*437bfbebSnyanmisaka 
/* forward typedefs: the node side structures reference each other */
typedef struct MppNodeProc_s    MppNodeProc;
typedef struct MppNodeTask_s    MppNodeTask;
typedef struct MppNodeImpl_s    MppNodeImpl;

/* forward typedefs: the cluster side structures reference each other */
typedef struct ClusterQueue_s   ClusterQueue;
typedef struct ClusterWorker_s  ClusterWorker;
typedef struct MppCluster_s     MppCluster;
40*437bfbebSnyanmisaka 
/*
 * Bit flags stored in MppNodeImpl.state and updated with compare-and-swap.
 *
 * NODE_VALID  - set on attach, cleared by mpp_node_task_detach()
 * NODE_IDLE   - attached but neither queued nor running
 * NODE_SIGNAL - a schedule request arrived while the node was running
 * NODE_WAIT   - task is (about to be) linked on a cluster queue
 * NODE_RUN    - task has been picked by a worker
 */
#define NODE_VALID              (0x00000001)
#define NODE_IDLE               (0x00000002)
#define NODE_SIGNAL             (0x00000004)
#define NODE_WAIT               (0x00000008)
#define NODE_RUN                (0x00000010)

/* follow-up action chosen by mpp_node_task_schedule_f() after its state CAS */
#define NODE_ACT_NONE           (0x00000000)
#define NODE_ACT_IDLE_TO_WAIT   (0x00000001)
#define NODE_ACT_RUN_TO_SIGNAL  (0x00000002)
50*437bfbebSnyanmisaka 
/*
 * Worker thread state. cluster_signal_f() only wakes a worker that is
 * WORKER_IDLE; cluster_worker() sets WORKER_IDLE around its wait.
 */
typedef enum MppWorkerState_e {
    WORKER_IDLE,
    WORKER_RUNNING,

    WORKER_STATE_BUTT,
} MppWorkerState;
57*437bfbebSnyanmisaka 
/* Work callback of a node plus its run statistics. */
struct MppNodeProc_s {
    /* callback invoked by the worker as proc(param) */
    TaskProc                proc;
    void                    *param;

    /* timing statistic */
    /* number of completed proc() calls */
    RK_U32                  run_count;
    /* accumulated mpp_time() delta spent inside proc() */
    RK_S64                  run_time;
};
66*437bfbebSnyanmisaka 
/* Schedulable unit of a node; moves between a cluster queue and a worker list. */
struct MppNodeTask_s {
    /* link on ClusterQueue.list (waiting) or ClusterWorker.list_task (picked) */
    struct list_head        list_sched;
    /* owning node; node_name points at node->name */
    MppNodeImpl             *node;
    const char              *node_name;

    /* queue this task is put on when scheduled; its lock guards the list linkage */
    ClusterQueue            *queue;

    /* work callback executed when the task runs */
    MppNodeProc             *proc;
};
77*437bfbebSnyanmisaka 
/* MppNode will be embedded in MppCtx */
struct MppNodeImpl_s {
    char                    name[32];
    RK_S32                  node_id;
    /* bitmask of NODE_* flags, modified atomically */
    RK_U32                  state;

    /* callback and statistics installed by mpp_node_set_func() */
    MppNodeProc             work;

    RK_U32                  priority;
    /* set to 1 on attach; cleared by the worker once a detach request is seen */
    RK_S32                  attached;
    /* posted by the worker to release mpp_node_task_detach() */
    sem_t                   sem_detach;

    /* for cluster schedule */
    MppNodeTask             task;
};
94*437bfbebSnyanmisaka 
/* One wait queue of a cluster; MppCluster holds MAX_PRIORITY of them. */
struct ClusterQueue_s {
    /* back pointer to the owning cluster */
    MppCluster              *cluster;

    /* recursive mutex (see mpp_cluster_queue_init) guarding list and count */
    pthread_mutex_t         lock;
    /* waiting MppNodeTask entries linked through list_sched */
    struct list_head        list;
    /* number of entries currently on list */
    RK_S32                  count;
};
102*437bfbebSnyanmisaka 
/* One worker thread of a cluster and the tasks it has picked. */
struct ClusterWorker_s {
    /* thread name, formatted as "<pid>:W<worker_id>" */
    char                    name[32];
    MppCluster              *cluster;
    RK_S32                  worker_id;

    MppThread               *thd;
    MppWorkerState          state;

    /* upper bound of tasks fetched by one cluster_worker_get_task() call */
    RK_S32                  batch_count;
    /* number of tasks currently on list_task */
    RK_S32                  work_count;
    /* tasks picked from the cluster queues, waiting to be run by this worker */
    struct list_head        list_task;
};
115*437bfbebSnyanmisaka 
/* A cluster: a set of priority queues served by a pool of worker threads. */
struct MppCluster_s {
    char                    name[16];
    /* used as prefix of the worker thread names */
    pid_t                   pid;
    RK_S32                  client_type;
    RK_S32                  node_id;
    /* next worker id, post-incremented by cluster_worker_init() */
    RK_S32                  worker_id;

    /* wait queues; workers scan them starting from index 0 */
    ClusterQueue            queue[MAX_PRIORITY];
    RK_S32                  node_count;

    /* multi-worker info */
    RK_S32                  worker_count;
    ClusterWorker           *worker;
    /* thread entry handed to mpp_thread_create() for every worker */
    MppThreadFunc           worker_func;
};
131*437bfbebSnyanmisaka 
/* Global registry of clusters; the array has VPU_CLIENT_BUTT slots. */
typedef struct MppClusterServer_s {
    MppMutex                mutex;
    /* NOTE(review): presumably indexed by MppClientType - confirm in cluster_server_get() */
    MppCluster              *clusters[VPU_CLIENT_BUTT];
} MppClusterServer;
136*437bfbebSnyanmisaka 
/* cluster server instance; assigned by mpp_cluster_srv_init(), freed by mpp_cluster_srv_deinit() */
static MppClusterServer *srv_cluster = NULL;

static MppCluster *cluster_server_get(MppClientType client_type);
static MPP_RET cluster_server_put(MppClientType client_type);

/* schedule a task and record the calling function as the request origin */
#define mpp_node_task_schedule(task) \
    mpp_node_task_schedule_f(__FUNCTION__, task)

/* schedule a task on behalf of an explicitly named caller */
#define mpp_node_task_schedule_from(caller, task) \
    mpp_node_task_schedule_f(caller, task)

/* queue lock helpers that pass the calling function name for lock tracing */
#define cluster_queue_lock(queue)   cluster_queue_lock_f(__FUNCTION__, queue)
#define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)

/*
 * Statement expression yielding the cluster server pointer. When the server
 * does not exist yet mpp_cluster_srv_init() is called once; if it is still
 * NULL afterwards an error is logged and NULL is the result.
 */
#define get_srv_cluster_f() \
    ({ \
        MppClusterServer *__tmp; \
        if (srv_cluster) { \
            __tmp = srv_cluster; \
        } else { \
            mpp_cluster_srv_init(); \
            __tmp = srv_cluster; \
            if (!__tmp) \
                mpp_err("mpp cluster srv not init at %s\n", __FUNCTION__); \
        } \
        __tmp; \
    })
164*437bfbebSnyanmisaka 
cluster_queue_lock_f(const char * caller,ClusterQueue * queue)165*437bfbebSnyanmisaka static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
166*437bfbebSnyanmisaka {
167*437bfbebSnyanmisaka     MppCluster *cluster = queue->cluster;
168*437bfbebSnyanmisaka     RK_S32 ret;
169*437bfbebSnyanmisaka 
170*437bfbebSnyanmisaka     cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
171*437bfbebSnyanmisaka 
172*437bfbebSnyanmisaka     ret = pthread_mutex_lock(&queue->lock);
173*437bfbebSnyanmisaka 
174*437bfbebSnyanmisaka     cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
175*437bfbebSnyanmisaka 
176*437bfbebSnyanmisaka     return (ret) ? MPP_NOK : MPP_OK;
177*437bfbebSnyanmisaka }
178*437bfbebSnyanmisaka 
cluster_queue_unlock_f(const char * caller,ClusterQueue * queue)179*437bfbebSnyanmisaka static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
180*437bfbebSnyanmisaka {
181*437bfbebSnyanmisaka     MppCluster *cluster = queue->cluster;
182*437bfbebSnyanmisaka     RK_S32 ret;
183*437bfbebSnyanmisaka 
184*437bfbebSnyanmisaka     cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
185*437bfbebSnyanmisaka 
186*437bfbebSnyanmisaka     ret = pthread_mutex_unlock(&queue->lock);
187*437bfbebSnyanmisaka 
188*437bfbebSnyanmisaka     cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
189*437bfbebSnyanmisaka 
190*437bfbebSnyanmisaka     return (ret) ? MPP_NOK : MPP_OK;
191*437bfbebSnyanmisaka }
192*437bfbebSnyanmisaka 
/* forward declaration: mpp_node_task_schedule_f() calls this before its definition */
void cluster_signal_f(const char *caller, MppCluster *p);
194*437bfbebSnyanmisaka 
mpp_cluster_queue_init(ClusterQueue * queue,MppCluster * cluster)195*437bfbebSnyanmisaka MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
196*437bfbebSnyanmisaka {
197*437bfbebSnyanmisaka     pthread_mutexattr_t attr;
198*437bfbebSnyanmisaka 
199*437bfbebSnyanmisaka     pthread_mutexattr_init(&attr);
200*437bfbebSnyanmisaka     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
201*437bfbebSnyanmisaka     pthread_mutex_init(&queue->lock, &attr);
202*437bfbebSnyanmisaka     pthread_mutexattr_destroy(&attr);
203*437bfbebSnyanmisaka 
204*437bfbebSnyanmisaka     queue->cluster = cluster;
205*437bfbebSnyanmisaka     INIT_LIST_HEAD(&queue->list);
206*437bfbebSnyanmisaka     queue->count = 0;
207*437bfbebSnyanmisaka 
208*437bfbebSnyanmisaka     return MPP_OK;
209*437bfbebSnyanmisaka }
210*437bfbebSnyanmisaka 
mpp_cluster_queue_deinit(ClusterQueue * queue)211*437bfbebSnyanmisaka MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
212*437bfbebSnyanmisaka {
213*437bfbebSnyanmisaka     mpp_assert(!queue->count);
214*437bfbebSnyanmisaka     mpp_assert(list_empty(&queue->list));
215*437bfbebSnyanmisaka 
216*437bfbebSnyanmisaka     pthread_mutex_destroy(&queue->lock);
217*437bfbebSnyanmisaka 
218*437bfbebSnyanmisaka     return MPP_OK;
219*437bfbebSnyanmisaka }
220*437bfbebSnyanmisaka 
mpp_node_task_attach(MppNodeTask * task,MppNodeImpl * node,ClusterQueue * queue,MppNodeProc * proc)221*437bfbebSnyanmisaka MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
222*437bfbebSnyanmisaka                              ClusterQueue *queue, MppNodeProc *proc)
223*437bfbebSnyanmisaka {
224*437bfbebSnyanmisaka     INIT_LIST_HEAD(&task->list_sched);
225*437bfbebSnyanmisaka 
226*437bfbebSnyanmisaka     task->node = node;
227*437bfbebSnyanmisaka     task->node_name = node->name;
228*437bfbebSnyanmisaka 
229*437bfbebSnyanmisaka     task->queue = queue;
230*437bfbebSnyanmisaka     task->proc = proc;
231*437bfbebSnyanmisaka 
232*437bfbebSnyanmisaka     node->state = NODE_VALID | NODE_IDLE;
233*437bfbebSnyanmisaka     node->attached = 1;
234*437bfbebSnyanmisaka 
235*437bfbebSnyanmisaka     return MPP_OK;
236*437bfbebSnyanmisaka }
237*437bfbebSnyanmisaka 
mpp_node_task_schedule_f(const char * caller,MppNodeTask * task)238*437bfbebSnyanmisaka MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
239*437bfbebSnyanmisaka {
240*437bfbebSnyanmisaka     ClusterQueue *queue = task->queue;
241*437bfbebSnyanmisaka     MppCluster *cluster = queue->cluster;
242*437bfbebSnyanmisaka     MppNodeImpl *node = task->node;
243*437bfbebSnyanmisaka     MppNodeProc *proc = task->proc;
244*437bfbebSnyanmisaka     const char *node_name = task->node_name;
245*437bfbebSnyanmisaka     RK_U32 new_st;
246*437bfbebSnyanmisaka     RK_U32 action = NODE_ACT_NONE;
247*437bfbebSnyanmisaka     bool ret = false;
248*437bfbebSnyanmisaka 
249*437bfbebSnyanmisaka     cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
250*437bfbebSnyanmisaka                      node_name, caller, node->state, proc->run_count, queue->count);
251*437bfbebSnyanmisaka 
252*437bfbebSnyanmisaka     do {
253*437bfbebSnyanmisaka         RK_U32 old_st = node->state;
254*437bfbebSnyanmisaka 
255*437bfbebSnyanmisaka         action = NODE_ACT_NONE;
256*437bfbebSnyanmisaka         new_st = 0;
257*437bfbebSnyanmisaka 
258*437bfbebSnyanmisaka         if (old_st & NODE_WAIT) {
259*437bfbebSnyanmisaka             cluster_dbg_flow("%s sched task %x stay  wait\n", node_name, old_st);
260*437bfbebSnyanmisaka             break;
261*437bfbebSnyanmisaka         }
262*437bfbebSnyanmisaka 
263*437bfbebSnyanmisaka         if (old_st & NODE_IDLE) {
264*437bfbebSnyanmisaka             new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
265*437bfbebSnyanmisaka             cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
266*437bfbebSnyanmisaka             action = NODE_ACT_IDLE_TO_WAIT;
267*437bfbebSnyanmisaka         } else if (old_st & NODE_RUN) {
268*437bfbebSnyanmisaka             new_st = old_st | NODE_SIGNAL;
269*437bfbebSnyanmisaka             action = NODE_ACT_RUN_TO_SIGNAL;
270*437bfbebSnyanmisaka             cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
271*437bfbebSnyanmisaka         } else {
272*437bfbebSnyanmisaka             cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
273*437bfbebSnyanmisaka         }
274*437bfbebSnyanmisaka 
275*437bfbebSnyanmisaka         ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
276*437bfbebSnyanmisaka         cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
277*437bfbebSnyanmisaka                          node_name, old_st, new_st, ret, action);
278*437bfbebSnyanmisaka     } while (!ret);
279*437bfbebSnyanmisaka 
280*437bfbebSnyanmisaka     switch (action) {
281*437bfbebSnyanmisaka     case NODE_ACT_IDLE_TO_WAIT : {
282*437bfbebSnyanmisaka         cluster_queue_lock(queue);
283*437bfbebSnyanmisaka         mpp_assert(list_empty(&task->list_sched));
284*437bfbebSnyanmisaka         list_add_tail(&task->list_sched, &queue->list);
285*437bfbebSnyanmisaka         queue->count++;
286*437bfbebSnyanmisaka         cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
287*437bfbebSnyanmisaka         cluster_queue_unlock(queue);
288*437bfbebSnyanmisaka 
289*437bfbebSnyanmisaka         cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
290*437bfbebSnyanmisaka         cluster_signal_f(caller, cluster);
291*437bfbebSnyanmisaka     } break;
292*437bfbebSnyanmisaka     case NODE_ACT_RUN_TO_SIGNAL : {
293*437bfbebSnyanmisaka         cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
294*437bfbebSnyanmisaka         cluster_signal_f(caller, cluster);
295*437bfbebSnyanmisaka     } break;
296*437bfbebSnyanmisaka     }
297*437bfbebSnyanmisaka 
298*437bfbebSnyanmisaka     cluster_dbg_flow("%s sched from %s after  [%d:%d] queue %d\n",
299*437bfbebSnyanmisaka                      node_name, caller, node->state, proc->run_count, queue->count);
300*437bfbebSnyanmisaka 
301*437bfbebSnyanmisaka     return MPP_OK;
302*437bfbebSnyanmisaka }
303*437bfbebSnyanmisaka 
mpp_node_task_detach(MppNodeTask * task)304*437bfbebSnyanmisaka MPP_RET mpp_node_task_detach(MppNodeTask *task)
305*437bfbebSnyanmisaka {
306*437bfbebSnyanmisaka     MppNodeImpl *node = task->node;
307*437bfbebSnyanmisaka     MPP_RET ret = MPP_OK;
308*437bfbebSnyanmisaka 
309*437bfbebSnyanmisaka     if (node->attached) {
310*437bfbebSnyanmisaka         const char *node_name = task->node_name;
311*437bfbebSnyanmisaka         MppNodeProc *proc = task->proc;
312*437bfbebSnyanmisaka 
313*437bfbebSnyanmisaka         MPP_FETCH_AND(&node->state, ~NODE_VALID);
314*437bfbebSnyanmisaka 
315*437bfbebSnyanmisaka         mpp_node_task_schedule(task);
316*437bfbebSnyanmisaka 
317*437bfbebSnyanmisaka         cluster_dbg_flow("%s state %x:%d wait detach start\n",
318*437bfbebSnyanmisaka                          node_name, node->state, proc->run_count);
319*437bfbebSnyanmisaka 
320*437bfbebSnyanmisaka         sem_wait(&node->sem_detach);
321*437bfbebSnyanmisaka         mpp_assert(node->attached == 0);
322*437bfbebSnyanmisaka 
323*437bfbebSnyanmisaka         cluster_dbg_flow("%s state %x:%d wait detach done\n",
324*437bfbebSnyanmisaka                          node_name, node->state, proc->run_count);
325*437bfbebSnyanmisaka     }
326*437bfbebSnyanmisaka 
327*437bfbebSnyanmisaka     return ret;
328*437bfbebSnyanmisaka }
329*437bfbebSnyanmisaka 
mpp_node_init(MppNode * node)330*437bfbebSnyanmisaka MPP_RET mpp_node_init(MppNode *node)
331*437bfbebSnyanmisaka {
332*437bfbebSnyanmisaka     MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
333*437bfbebSnyanmisaka     if (p)
334*437bfbebSnyanmisaka         sem_init(&p->sem_detach, 0, 0);
335*437bfbebSnyanmisaka 
336*437bfbebSnyanmisaka     *node = p;
337*437bfbebSnyanmisaka 
338*437bfbebSnyanmisaka     return p ? MPP_OK : MPP_NOK;
339*437bfbebSnyanmisaka }
340*437bfbebSnyanmisaka 
mpp_node_deinit(MppNode node)341*437bfbebSnyanmisaka MPP_RET mpp_node_deinit(MppNode node)
342*437bfbebSnyanmisaka {
343*437bfbebSnyanmisaka     MppNodeImpl *p = (MppNodeImpl *)node;
344*437bfbebSnyanmisaka 
345*437bfbebSnyanmisaka     if (p) {
346*437bfbebSnyanmisaka         if (p->attached)
347*437bfbebSnyanmisaka             mpp_node_task_detach(&p->task);
348*437bfbebSnyanmisaka 
349*437bfbebSnyanmisaka         mpp_assert(p->attached == 0);
350*437bfbebSnyanmisaka 
351*437bfbebSnyanmisaka         sem_destroy(&p->sem_detach);
352*437bfbebSnyanmisaka 
353*437bfbebSnyanmisaka         mpp_free(p);
354*437bfbebSnyanmisaka     }
355*437bfbebSnyanmisaka 
356*437bfbebSnyanmisaka     return MPP_OK;
357*437bfbebSnyanmisaka }
358*437bfbebSnyanmisaka 
mpp_node_set_func(MppNode node,TaskProc proc,void * param)359*437bfbebSnyanmisaka MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
360*437bfbebSnyanmisaka {
361*437bfbebSnyanmisaka     MppNodeImpl *p = (MppNodeImpl *)node;
362*437bfbebSnyanmisaka     if (!p)
363*437bfbebSnyanmisaka         return MPP_NOK;
364*437bfbebSnyanmisaka 
365*437bfbebSnyanmisaka     p->work.proc = proc;
366*437bfbebSnyanmisaka     p->work.param = param;
367*437bfbebSnyanmisaka 
368*437bfbebSnyanmisaka     return MPP_OK;
369*437bfbebSnyanmisaka }
370*437bfbebSnyanmisaka 
cluster_worker_init(ClusterWorker * p,MppCluster * cluster)371*437bfbebSnyanmisaka MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
372*437bfbebSnyanmisaka {
373*437bfbebSnyanmisaka     MppThread *thd = NULL;
374*437bfbebSnyanmisaka     MPP_RET ret = MPP_NOK;
375*437bfbebSnyanmisaka 
376*437bfbebSnyanmisaka     INIT_LIST_HEAD(&p->list_task);
377*437bfbebSnyanmisaka     p->worker_id = cluster->worker_id++;
378*437bfbebSnyanmisaka 
379*437bfbebSnyanmisaka     p->batch_count = 1;
380*437bfbebSnyanmisaka     p->work_count = 0;
381*437bfbebSnyanmisaka     p->cluster = cluster;
382*437bfbebSnyanmisaka     p->state = WORKER_IDLE;
383*437bfbebSnyanmisaka     snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
384*437bfbebSnyanmisaka     thd = mpp_thread_create(cluster->worker_func, p, p->name);
385*437bfbebSnyanmisaka     if (thd) {
386*437bfbebSnyanmisaka         p->thd = thd;
387*437bfbebSnyanmisaka         mpp_thread_start(thd);
388*437bfbebSnyanmisaka         ret = MPP_OK;
389*437bfbebSnyanmisaka     }
390*437bfbebSnyanmisaka 
391*437bfbebSnyanmisaka     return ret;
392*437bfbebSnyanmisaka }
393*437bfbebSnyanmisaka 
cluster_worker_deinit(ClusterWorker * p)394*437bfbebSnyanmisaka MPP_RET cluster_worker_deinit(ClusterWorker *p)
395*437bfbebSnyanmisaka {
396*437bfbebSnyanmisaka     if (p->thd) {
397*437bfbebSnyanmisaka         mpp_thread_stop(p->thd);
398*437bfbebSnyanmisaka         mpp_thread_destroy(p->thd);
399*437bfbebSnyanmisaka         p->thd = NULL;
400*437bfbebSnyanmisaka     }
401*437bfbebSnyanmisaka 
402*437bfbebSnyanmisaka     mpp_assert(list_empty(&p->list_task));
403*437bfbebSnyanmisaka     mpp_assert(p->work_count == 0);
404*437bfbebSnyanmisaka 
405*437bfbebSnyanmisaka     p->batch_count = 0;
406*437bfbebSnyanmisaka     p->cluster = NULL;
407*437bfbebSnyanmisaka 
408*437bfbebSnyanmisaka     return MPP_OK;
409*437bfbebSnyanmisaka }
410*437bfbebSnyanmisaka 
cluster_worker_get_task(ClusterWorker * p)411*437bfbebSnyanmisaka RK_S32 cluster_worker_get_task(ClusterWorker *p)
412*437bfbebSnyanmisaka {
413*437bfbebSnyanmisaka     MppCluster *cluster = p->cluster;
414*437bfbebSnyanmisaka     RK_S32 batch_count = p->batch_count;
415*437bfbebSnyanmisaka     RK_S32 count = 0;
416*437bfbebSnyanmisaka     RK_U32 new_st;
417*437bfbebSnyanmisaka     RK_U32 old_st;
418*437bfbebSnyanmisaka     bool ret;
419*437bfbebSnyanmisaka     RK_S32 i;
420*437bfbebSnyanmisaka 
421*437bfbebSnyanmisaka     cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);
422*437bfbebSnyanmisaka 
423*437bfbebSnyanmisaka     for (i = 0; i < MAX_PRIORITY; i++) {
424*437bfbebSnyanmisaka         ClusterQueue *queue = &cluster->queue[i];
425*437bfbebSnyanmisaka         MppNodeTask *task = NULL;
426*437bfbebSnyanmisaka         MppNodeImpl *node = NULL;
427*437bfbebSnyanmisaka 
428*437bfbebSnyanmisaka         do {
429*437bfbebSnyanmisaka             cluster_queue_lock(queue);
430*437bfbebSnyanmisaka 
431*437bfbebSnyanmisaka             if (list_empty(&queue->list)) {
432*437bfbebSnyanmisaka                 mpp_assert(queue->count == 0);
433*437bfbebSnyanmisaka                 cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
434*437bfbebSnyanmisaka                 cluster_queue_unlock(queue);
435*437bfbebSnyanmisaka                 break;
436*437bfbebSnyanmisaka             }
437*437bfbebSnyanmisaka 
438*437bfbebSnyanmisaka             mpp_assert(queue->count);
439*437bfbebSnyanmisaka             task = list_first_entry(&queue->list, MppNodeTask, list_sched);
440*437bfbebSnyanmisaka             list_del_init(&task->list_sched);
441*437bfbebSnyanmisaka             node = task->node;
442*437bfbebSnyanmisaka 
443*437bfbebSnyanmisaka             queue->count--;
444*437bfbebSnyanmisaka 
445*437bfbebSnyanmisaka             do {
446*437bfbebSnyanmisaka                 old_st = node->state;
447*437bfbebSnyanmisaka                 new_st = old_st ^ (NODE_WAIT | NODE_RUN);
448*437bfbebSnyanmisaka 
449*437bfbebSnyanmisaka                 mpp_assert(old_st & NODE_WAIT);
450*437bfbebSnyanmisaka                 ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
451*437bfbebSnyanmisaka             } while (!ret);
452*437bfbebSnyanmisaka 
453*437bfbebSnyanmisaka             list_add_tail(&task->list_sched, &p->list_task);
454*437bfbebSnyanmisaka             p->work_count++;
455*437bfbebSnyanmisaka             count++;
456*437bfbebSnyanmisaka 
457*437bfbebSnyanmisaka             cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);
458*437bfbebSnyanmisaka 
459*437bfbebSnyanmisaka             cluster_queue_unlock(queue);
460*437bfbebSnyanmisaka 
461*437bfbebSnyanmisaka             if (count >= batch_count)
462*437bfbebSnyanmisaka                 break;
463*437bfbebSnyanmisaka         } while (1);
464*437bfbebSnyanmisaka 
465*437bfbebSnyanmisaka         if (count >= batch_count)
466*437bfbebSnyanmisaka             break;
467*437bfbebSnyanmisaka     }
468*437bfbebSnyanmisaka 
469*437bfbebSnyanmisaka     cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);
470*437bfbebSnyanmisaka 
471*437bfbebSnyanmisaka     return count;
472*437bfbebSnyanmisaka }
473*437bfbebSnyanmisaka 
/*
 * Run every task on the worker's private list once and decide what happens
 * to each node afterwards.
 *
 * After the callback returns, the node state is re-read inside a
 * compare-and-swap loop so that a schedule or detach request arriving at
 * any time during or after the run is observed:
 *   - NODE_VALID cleared: the node is being detached; clear node->attached
 *     and post sem_detach
 *   - NODE_SIGNAL set   : another run was requested; switch to NODE_WAIT
 *     and put the task back on its queue
 *   - otherwise         : switch from NODE_RUN to NODE_IDLE
 *
 * Once the node has been published as idle or its semaphore has been
 * posted, another thread may run or free it, so the node is not touched
 * any more after those points.
 */
static void cluster_worker_run_task(ClusterWorker *p)
{
    cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);

    while (!list_empty(&p->list_task)) {
        MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
        MppNodeProc *proc = task->proc;
        MppNodeImpl *node = task->node;
        ClusterQueue *queue = task->queue;
        RK_S64 time_start;
        RK_S64 time_end;
        RK_U32 old_st;
        RK_U32 new_st;
        MPP_RET proc_ret;

        /* check trigger for re-add task */
        cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state);
        mpp_assert(node->state & NODE_RUN);
        if (!(node->state & NODE_RUN))
            mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);

        time_start = mpp_time();
        proc_ret = proc->proc(proc->param);
        time_end = mpp_time();

        cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
        proc->run_time += time_end - time_start;
        proc->run_count++;

        /* the task leaves the run list in every case */
        list_del_init(&task->list_sched);

        for (;;) {
            /* always decide on a fresh value; a stale one would never CAS */
            old_st = node->state;

            if (!(old_st & NODE_VALID)) {
                cluster_dbg_flow("%s run found destroy\n", p->name);
                node->attached = 0;

                sem_post(&node->sem_detach);
                /* the detaching thread may free the node from here on */
                cluster_dbg_flow("%s run sem post done\n", p->name);
                break;
            }

            if (old_st & NODE_SIGNAL) {
                // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
                new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
            } else {
                new_st = old_st ^ (NODE_IDLE | NODE_RUN);
            }

            /* state changed underneath (signal or detach): re-evaluate */
            if (!MPP_BOOL_CAS(&node->state, old_st, new_st))
                continue;

            if (old_st & NODE_SIGNAL) {
                cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);

                cluster_queue_lock(queue);
                list_add_tail(&task->list_sched, &queue->list);
                queue->count++;
                cluster_queue_unlock(queue);
            } else {
                /* check the value just written, not the shared word */
                mpp_assert(new_st & NODE_IDLE);
                mpp_assert(!(new_st & NODE_RUN));

                cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
            }
            break;
        }

        p->work_count--;
    }

    mpp_assert(p->work_count == 0);

    cluster_dbg_flow("%s run all done\n", p->name);
}
552*437bfbebSnyanmisaka 
cluster_worker(void * data)553*437bfbebSnyanmisaka static void *cluster_worker(void *data)
554*437bfbebSnyanmisaka {
555*437bfbebSnyanmisaka     ClusterWorker *p = (ClusterWorker *)data;
556*437bfbebSnyanmisaka     MppThread *thd = p->thd;
557*437bfbebSnyanmisaka 
558*437bfbebSnyanmisaka     while (1) {
559*437bfbebSnyanmisaka         {
560*437bfbebSnyanmisaka             RK_S32 task_count = 0;
561*437bfbebSnyanmisaka 
562*437bfbebSnyanmisaka             cluster_dbg_lock("%s lock start\n", p->name);
563*437bfbebSnyanmisaka             mpp_thread_lock(thd, THREAD_WORK);
564*437bfbebSnyanmisaka             cluster_dbg_lock("%s lock done\n", p->name);
565*437bfbebSnyanmisaka 
566*437bfbebSnyanmisaka             if (MPP_THREAD_RUNNING != mpp_thread_get_status(thd, THREAD_WORK)) {
567*437bfbebSnyanmisaka                 mpp_thread_unlock(thd, THREAD_WORK);
568*437bfbebSnyanmisaka                 break;
569*437bfbebSnyanmisaka             }
570*437bfbebSnyanmisaka 
571*437bfbebSnyanmisaka             task_count = cluster_worker_get_task(p);
572*437bfbebSnyanmisaka             if (!task_count) {
573*437bfbebSnyanmisaka                 p->state = WORKER_IDLE;
574*437bfbebSnyanmisaka                 mpp_thread_wait(thd, THREAD_WORK);
575*437bfbebSnyanmisaka                 p->state = WORKER_RUNNING;
576*437bfbebSnyanmisaka             }
577*437bfbebSnyanmisaka             mpp_thread_unlock(thd, THREAD_WORK);
578*437bfbebSnyanmisaka         }
579*437bfbebSnyanmisaka 
580*437bfbebSnyanmisaka         cluster_worker_run_task(p);
581*437bfbebSnyanmisaka     }
582*437bfbebSnyanmisaka 
583*437bfbebSnyanmisaka     return NULL;
584*437bfbebSnyanmisaka }
585*437bfbebSnyanmisaka 
/*
 * Wake one idle worker of a cluster.
 *
 * Workers are checked in array order under their thread lock; the first
 * one found in WORKER_IDLE state is signalled and the search ends. When
 * no worker is idle nothing is signalled.
 *
 * caller - name of the requesting function, used for tracing
 * p      - cluster whose workers are examined
 */
void cluster_signal_f(const char *caller, MppCluster *p)
{
    RK_S32 idx = 0;

    cluster_dbg_flow("%s signal from %s\n", p->name, caller);

    while (idx < p->worker_count) {
        ClusterWorker *worker = &p->worker[idx++];
        MppThread *thd = worker->thd;
        RK_S32 woken = 0;

        mpp_thread_lock(thd, THREAD_WORK);

        if (worker->state == WORKER_IDLE) {
            mpp_thread_signal(thd, THREAD_WORK);
            cluster_dbg_flow("%s signal\n", p->name);
            woken = 1;
        }

        mpp_thread_unlock(thd, THREAD_WORK);

        /* one woken worker is enough */
        if (woken)
            break;
    }
}
608*437bfbebSnyanmisaka 
mpp_cluster_srv_init(void)609*437bfbebSnyanmisaka static void mpp_cluster_srv_init(void)
610*437bfbebSnyanmisaka {
611*437bfbebSnyanmisaka     MppClusterServer *srv = srv_cluster;
612*437bfbebSnyanmisaka 
613*437bfbebSnyanmisaka     mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
614*437bfbebSnyanmisaka     mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
615*437bfbebSnyanmisaka 
616*437bfbebSnyanmisaka     if (srv)
617*437bfbebSnyanmisaka         return;
618*437bfbebSnyanmisaka 
619*437bfbebSnyanmisaka     srv = mpp_calloc(MppClusterServer, 1);
620*437bfbebSnyanmisaka     if (!srv) {
621*437bfbebSnyanmisaka         mpp_err_f("failed to allocate cluster server\n");
622*437bfbebSnyanmisaka         return;
623*437bfbebSnyanmisaka     }
624*437bfbebSnyanmisaka 
625*437bfbebSnyanmisaka     memset(srv->clusters, 0, sizeof(srv->clusters));
626*437bfbebSnyanmisaka     mpp_mutex_init(&srv->mutex);
627*437bfbebSnyanmisaka 
628*437bfbebSnyanmisaka     srv_cluster = srv;
629*437bfbebSnyanmisaka }
630*437bfbebSnyanmisaka 
mpp_cluster_srv_deinit(void)631*437bfbebSnyanmisaka static void mpp_cluster_srv_deinit(void)
632*437bfbebSnyanmisaka {
633*437bfbebSnyanmisaka     MppClusterServer *srv = srv_cluster;
634*437bfbebSnyanmisaka     RK_S32 i;
635*437bfbebSnyanmisaka 
636*437bfbebSnyanmisaka     if (!srv)
637*437bfbebSnyanmisaka         return;
638*437bfbebSnyanmisaka 
639*437bfbebSnyanmisaka     for (i = 0; i < VPU_CLIENT_BUTT; i++)
640*437bfbebSnyanmisaka         cluster_server_put((MppClientType)i);
641*437bfbebSnyanmisaka 
642*437bfbebSnyanmisaka     mpp_mutex_destroy(&srv->mutex);
643*437bfbebSnyanmisaka     MPP_FREE(srv_cluster);
644*437bfbebSnyanmisaka }
645*437bfbebSnyanmisaka 
/*
 * Pass the cluster server init / deinit hooks defined above to the
 * MPP_SINGLETON macro under the MPP_SGLN_CLUSTER id.
 * NOTE(review): the macro is not defined in this file (presumably it comes
 * from mpp_singleton.h); when and in which order the hooks are invoked is
 * not visible here - confirm against the macro definition.
 */
MPP_SINGLETON(MPP_SGLN_CLUSTER,mpp_cluster,mpp_cluster_srv_init,mpp_cluster_srv_deinit)646*437bfbebSnyanmisaka MPP_SINGLETON(MPP_SGLN_CLUSTER, mpp_cluster, mpp_cluster_srv_init, mpp_cluster_srv_deinit)
647*437bfbebSnyanmisaka 
648*437bfbebSnyanmisaka static MppCluster *cluster_server_get(MppClientType client_type)
649*437bfbebSnyanmisaka {
650*437bfbebSnyanmisaka     MppClusterServer *srv = get_srv_cluster_f();
651*437bfbebSnyanmisaka     RK_S32 i;
652*437bfbebSnyanmisaka     MppCluster *p = NULL;
653*437bfbebSnyanmisaka 
654*437bfbebSnyanmisaka     if (!srv || client_type >= VPU_CLIENT_BUTT)
655*437bfbebSnyanmisaka         goto done;
656*437bfbebSnyanmisaka 
657*437bfbebSnyanmisaka     {
658*437bfbebSnyanmisaka         mpp_mutex_lock(&srv->mutex);
659*437bfbebSnyanmisaka 
660*437bfbebSnyanmisaka         p = srv->clusters[client_type];
661*437bfbebSnyanmisaka         if (p) {
662*437bfbebSnyanmisaka             mpp_mutex_unlock(&srv->mutex);
663*437bfbebSnyanmisaka             goto done;
664*437bfbebSnyanmisaka         }
665*437bfbebSnyanmisaka 
666*437bfbebSnyanmisaka         p = mpp_malloc(MppCluster, 1);
667*437bfbebSnyanmisaka         if (p) {
668*437bfbebSnyanmisaka             for (i = 0; i < MAX_PRIORITY; i++)
669*437bfbebSnyanmisaka                 mpp_cluster_queue_init(&p->queue[i], p);
670*437bfbebSnyanmisaka 
671*437bfbebSnyanmisaka             p->pid  = getpid();
672*437bfbebSnyanmisaka             p->client_type = client_type;
673*437bfbebSnyanmisaka             snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
674*437bfbebSnyanmisaka             p->node_id = 0;
675*437bfbebSnyanmisaka             p->worker_id = 0;
676*437bfbebSnyanmisaka             p->worker_func = cluster_worker;
677*437bfbebSnyanmisaka             p->worker_count = mpp_cluster_thd_cnt;
678*437bfbebSnyanmisaka 
679*437bfbebSnyanmisaka             mpp_assert(p->worker_count > 0);
680*437bfbebSnyanmisaka 
681*437bfbebSnyanmisaka             p->worker = mpp_malloc(ClusterWorker, p->worker_count);
682*437bfbebSnyanmisaka 
683*437bfbebSnyanmisaka             for (i = 0; i < p->worker_count; i++)
684*437bfbebSnyanmisaka                 cluster_worker_init(&p->worker[i], p);
685*437bfbebSnyanmisaka 
686*437bfbebSnyanmisaka             srv->clusters[client_type] = p;
687*437bfbebSnyanmisaka             cluster_dbg_flow("%s created\n", p->name);
688*437bfbebSnyanmisaka         }
689*437bfbebSnyanmisaka 
690*437bfbebSnyanmisaka         mpp_mutex_unlock(&srv->mutex);
691*437bfbebSnyanmisaka     }
692*437bfbebSnyanmisaka 
693*437bfbebSnyanmisaka done:
694*437bfbebSnyanmisaka     if (p)
695*437bfbebSnyanmisaka         cluster_dbg_flow("%s get\n", p->name);
696*437bfbebSnyanmisaka     else
697*437bfbebSnyanmisaka         cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
698*437bfbebSnyanmisaka 
699*437bfbebSnyanmisaka     return p;
700*437bfbebSnyanmisaka }
701*437bfbebSnyanmisaka 
cluster_server_put(MppClientType client_type)702*437bfbebSnyanmisaka static MPP_RET cluster_server_put(MppClientType client_type)
703*437bfbebSnyanmisaka {
704*437bfbebSnyanmisaka     MppClusterServer *srv = get_srv_cluster_f();
705*437bfbebSnyanmisaka     MppCluster *p;
706*437bfbebSnyanmisaka     RK_S32 i;
707*437bfbebSnyanmisaka 
708*437bfbebSnyanmisaka     if (!srv || client_type >= VPU_CLIENT_BUTT)
709*437bfbebSnyanmisaka         return MPP_NOK;
710*437bfbebSnyanmisaka 
711*437bfbebSnyanmisaka     mpp_mutex_lock(&srv->mutex);
712*437bfbebSnyanmisaka 
713*437bfbebSnyanmisaka     p = srv->clusters[client_type];
714*437bfbebSnyanmisaka     if (!p) {
715*437bfbebSnyanmisaka         mpp_mutex_unlock(&srv->mutex);
716*437bfbebSnyanmisaka         return MPP_NOK;
717*437bfbebSnyanmisaka     }
718*437bfbebSnyanmisaka 
719*437bfbebSnyanmisaka     for (i = 0; i < p->worker_count; i++)
720*437bfbebSnyanmisaka         cluster_worker_deinit(&p->worker[i]);
721*437bfbebSnyanmisaka 
722*437bfbebSnyanmisaka     for (i = 0; i < MAX_PRIORITY; i++)
723*437bfbebSnyanmisaka         mpp_cluster_queue_deinit(&p->queue[i]);
724*437bfbebSnyanmisaka 
725*437bfbebSnyanmisaka     cluster_dbg_flow("put %s\n", p->name);
726*437bfbebSnyanmisaka 
727*437bfbebSnyanmisaka     MPP_FREE(p->worker);
728*437bfbebSnyanmisaka     mpp_free(p);
729*437bfbebSnyanmisaka     srv->clusters[client_type] = NULL;
730*437bfbebSnyanmisaka 
731*437bfbebSnyanmisaka     mpp_mutex_unlock(&srv->mutex);
732*437bfbebSnyanmisaka 
733*437bfbebSnyanmisaka     return MPP_OK;
734*437bfbebSnyanmisaka }
735*437bfbebSnyanmisaka 
mpp_node_attach(MppNode node,MppClientType type)736*437bfbebSnyanmisaka MPP_RET mpp_node_attach(MppNode node, MppClientType type)
737*437bfbebSnyanmisaka {
738*437bfbebSnyanmisaka     MppNodeImpl *impl = (MppNodeImpl *)node;
739*437bfbebSnyanmisaka     MppCluster *p = cluster_server_get(type);
740*437bfbebSnyanmisaka     RK_U32 priority = impl->priority;
741*437bfbebSnyanmisaka     ClusterQueue *queue = &p->queue[priority];
742*437bfbebSnyanmisaka 
743*437bfbebSnyanmisaka     mpp_assert(priority < MAX_PRIORITY);
744*437bfbebSnyanmisaka     mpp_assert(p);
745*437bfbebSnyanmisaka 
746*437bfbebSnyanmisaka     impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);
747*437bfbebSnyanmisaka 
748*437bfbebSnyanmisaka     snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
749*437bfbebSnyanmisaka 
750*437bfbebSnyanmisaka     mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
751*437bfbebSnyanmisaka 
752*437bfbebSnyanmisaka     MPP_FETCH_ADD(&p->node_count, 1);
753*437bfbebSnyanmisaka 
754*437bfbebSnyanmisaka     cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
755*437bfbebSnyanmisaka 
756*437bfbebSnyanmisaka     /* attach and run once first */
757*437bfbebSnyanmisaka     mpp_node_task_schedule(&impl->task);
758*437bfbebSnyanmisaka     cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
759*437bfbebSnyanmisaka 
760*437bfbebSnyanmisaka     return MPP_OK;
761*437bfbebSnyanmisaka }
762*437bfbebSnyanmisaka 
mpp_node_detach(MppNode node)763*437bfbebSnyanmisaka MPP_RET mpp_node_detach(MppNode node)
764*437bfbebSnyanmisaka {
765*437bfbebSnyanmisaka     MppNodeImpl *impl = (MppNodeImpl *)node;
766*437bfbebSnyanmisaka 
767*437bfbebSnyanmisaka     mpp_node_task_detach(&impl->task);
768*437bfbebSnyanmisaka     cluster_dbg_flow("%s detached\n", impl->name);
769*437bfbebSnyanmisaka 
770*437bfbebSnyanmisaka     return MPP_OK;
771*437bfbebSnyanmisaka }
772*437bfbebSnyanmisaka 
mpp_node_trigger_f(const char * caller,MppNode node,RK_S32 trigger)773*437bfbebSnyanmisaka MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
774*437bfbebSnyanmisaka {
775*437bfbebSnyanmisaka     if (trigger) {
776*437bfbebSnyanmisaka         MppNodeImpl *impl = (MppNodeImpl *)node;
777*437bfbebSnyanmisaka 
778*437bfbebSnyanmisaka         mpp_node_task_schedule_from(caller, &impl->task);
779*437bfbebSnyanmisaka     }
780*437bfbebSnyanmisaka 
781*437bfbebSnyanmisaka     return MPP_OK;
782*437bfbebSnyanmisaka }
783