1*437bfbebSnyanmisaka /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2*437bfbebSnyanmisaka /*
3*437bfbebSnyanmisaka * Copyright (c) 2021 Rockchip Electronics Co., Ltd.
4*437bfbebSnyanmisaka */
5*437bfbebSnyanmisaka
6*437bfbebSnyanmisaka #define MODULE_TAG "mpp_cluster"
7*437bfbebSnyanmisaka
8*437bfbebSnyanmisaka #include <string.h>
9*437bfbebSnyanmisaka
10*437bfbebSnyanmisaka #include "mpp_mem.h"
11*437bfbebSnyanmisaka #include "mpp_env.h"
12*437bfbebSnyanmisaka #include "mpp_lock.h"
13*437bfbebSnyanmisaka #include "mpp_time.h"
14*437bfbebSnyanmisaka #include "mpp_debug.h"
15*437bfbebSnyanmisaka #include "mpp_common.h"
16*437bfbebSnyanmisaka #include "mpp_singleton.h"
17*437bfbebSnyanmisaka
18*437bfbebSnyanmisaka #include "mpp_cluster.h"
19*437bfbebSnyanmisaka #include "mpp_dev_defs.h"
20*437bfbebSnyanmisaka
21*437bfbebSnyanmisaka #define MPP_CLUSTER_DBG_FLOW (0x00000001)
22*437bfbebSnyanmisaka #define MPP_CLUSTER_DBG_LOCK (0x00000002)
23*437bfbebSnyanmisaka
24*437bfbebSnyanmisaka #define cluster_dbg(flag, fmt, ...) _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
25*437bfbebSnyanmisaka #define cluster_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
26*437bfbebSnyanmisaka
27*437bfbebSnyanmisaka #define cluster_dbg_flow(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
28*437bfbebSnyanmisaka #define cluster_dbg_lock(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)
29*437bfbebSnyanmisaka
30*437bfbebSnyanmisaka RK_U32 mpp_cluster_debug = 0;
31*437bfbebSnyanmisaka RK_U32 mpp_cluster_thd_cnt = 1;
32*437bfbebSnyanmisaka
33*437bfbebSnyanmisaka typedef struct MppNodeProc_s MppNodeProc;
34*437bfbebSnyanmisaka typedef struct MppNodeTask_s MppNodeTask;
35*437bfbebSnyanmisaka typedef struct MppNodeImpl_s MppNodeImpl;
36*437bfbebSnyanmisaka
37*437bfbebSnyanmisaka typedef struct ClusterQueue_s ClusterQueue;
38*437bfbebSnyanmisaka typedef struct ClusterWorker_s ClusterWorker;
39*437bfbebSnyanmisaka typedef struct MppCluster_s MppCluster;
40*437bfbebSnyanmisaka
41*437bfbebSnyanmisaka #define NODE_VALID (0x00000001)
42*437bfbebSnyanmisaka #define NODE_IDLE (0x00000002)
43*437bfbebSnyanmisaka #define NODE_SIGNAL (0x00000004)
44*437bfbebSnyanmisaka #define NODE_WAIT (0x00000008)
45*437bfbebSnyanmisaka #define NODE_RUN (0x00000010)
46*437bfbebSnyanmisaka
47*437bfbebSnyanmisaka #define NODE_ACT_NONE (0x00000000)
48*437bfbebSnyanmisaka #define NODE_ACT_IDLE_TO_WAIT (0x00000001)
49*437bfbebSnyanmisaka #define NODE_ACT_RUN_TO_SIGNAL (0x00000002)
50*437bfbebSnyanmisaka
51*437bfbebSnyanmisaka typedef enum MppWorkerState_e {
52*437bfbebSnyanmisaka WORKER_IDLE,
53*437bfbebSnyanmisaka WORKER_RUNNING,
54*437bfbebSnyanmisaka
55*437bfbebSnyanmisaka WORKER_STATE_BUTT,
56*437bfbebSnyanmisaka } MppWorkerState;
57*437bfbebSnyanmisaka
58*437bfbebSnyanmisaka struct MppNodeProc_s {
59*437bfbebSnyanmisaka TaskProc proc;
60*437bfbebSnyanmisaka void *param;
61*437bfbebSnyanmisaka
62*437bfbebSnyanmisaka /* timing statistic */
63*437bfbebSnyanmisaka RK_U32 run_count;
64*437bfbebSnyanmisaka RK_S64 run_time;
65*437bfbebSnyanmisaka };
66*437bfbebSnyanmisaka
67*437bfbebSnyanmisaka struct MppNodeTask_s {
68*437bfbebSnyanmisaka struct list_head list_sched;
69*437bfbebSnyanmisaka MppNodeImpl *node;
70*437bfbebSnyanmisaka const char *node_name;
71*437bfbebSnyanmisaka
72*437bfbebSnyanmisaka /* lock ptr to cluster queue lock */
73*437bfbebSnyanmisaka ClusterQueue *queue;
74*437bfbebSnyanmisaka
75*437bfbebSnyanmisaka MppNodeProc *proc;
76*437bfbebSnyanmisaka };
77*437bfbebSnyanmisaka
78*437bfbebSnyanmisaka /* MppNode will be embeded in MppCtx */
79*437bfbebSnyanmisaka struct MppNodeImpl_s {
80*437bfbebSnyanmisaka char name[32];
81*437bfbebSnyanmisaka /* list linked to scheduler */
82*437bfbebSnyanmisaka RK_S32 node_id;
83*437bfbebSnyanmisaka RK_U32 state;
84*437bfbebSnyanmisaka
85*437bfbebSnyanmisaka MppNodeProc work;
86*437bfbebSnyanmisaka
87*437bfbebSnyanmisaka RK_U32 priority;
88*437bfbebSnyanmisaka RK_S32 attached;
89*437bfbebSnyanmisaka sem_t sem_detach;
90*437bfbebSnyanmisaka
91*437bfbebSnyanmisaka /* for cluster schedule */
92*437bfbebSnyanmisaka MppNodeTask task;
93*437bfbebSnyanmisaka };
94*437bfbebSnyanmisaka
95*437bfbebSnyanmisaka struct ClusterQueue_s {
96*437bfbebSnyanmisaka MppCluster *cluster;
97*437bfbebSnyanmisaka
98*437bfbebSnyanmisaka pthread_mutex_t lock;
99*437bfbebSnyanmisaka struct list_head list;
100*437bfbebSnyanmisaka RK_S32 count;
101*437bfbebSnyanmisaka };
102*437bfbebSnyanmisaka
103*437bfbebSnyanmisaka struct ClusterWorker_s {
104*437bfbebSnyanmisaka char name[32];
105*437bfbebSnyanmisaka MppCluster *cluster;
106*437bfbebSnyanmisaka RK_S32 worker_id;
107*437bfbebSnyanmisaka
108*437bfbebSnyanmisaka MppThread *thd;
109*437bfbebSnyanmisaka MppWorkerState state;
110*437bfbebSnyanmisaka
111*437bfbebSnyanmisaka RK_S32 batch_count;
112*437bfbebSnyanmisaka RK_S32 work_count;
113*437bfbebSnyanmisaka struct list_head list_task;
114*437bfbebSnyanmisaka };
115*437bfbebSnyanmisaka
116*437bfbebSnyanmisaka struct MppCluster_s {
117*437bfbebSnyanmisaka char name[16];
118*437bfbebSnyanmisaka pid_t pid;
119*437bfbebSnyanmisaka RK_S32 client_type;
120*437bfbebSnyanmisaka RK_S32 node_id;
121*437bfbebSnyanmisaka RK_S32 worker_id;
122*437bfbebSnyanmisaka
123*437bfbebSnyanmisaka ClusterQueue queue[MAX_PRIORITY];
124*437bfbebSnyanmisaka RK_S32 node_count;
125*437bfbebSnyanmisaka
126*437bfbebSnyanmisaka /* multi-worker info */
127*437bfbebSnyanmisaka RK_S32 worker_count;
128*437bfbebSnyanmisaka ClusterWorker *worker;
129*437bfbebSnyanmisaka MppThreadFunc worker_func;
130*437bfbebSnyanmisaka };
131*437bfbebSnyanmisaka
132*437bfbebSnyanmisaka typedef struct MppClusterServer_s {
133*437bfbebSnyanmisaka MppMutex mutex;
134*437bfbebSnyanmisaka MppCluster *clusters[VPU_CLIENT_BUTT];
135*437bfbebSnyanmisaka } MppClusterServer;
136*437bfbebSnyanmisaka
137*437bfbebSnyanmisaka static MppClusterServer *srv_cluster = NULL;
138*437bfbebSnyanmisaka
139*437bfbebSnyanmisaka static MppCluster *cluster_server_get(MppClientType client_type);
140*437bfbebSnyanmisaka static MPP_RET cluster_server_put(MppClientType client_type);
141*437bfbebSnyanmisaka
142*437bfbebSnyanmisaka #define mpp_node_task_schedule(task) \
143*437bfbebSnyanmisaka mpp_node_task_schedule_f(__FUNCTION__, task)
144*437bfbebSnyanmisaka
145*437bfbebSnyanmisaka #define mpp_node_task_schedule_from(caller, task) \
146*437bfbebSnyanmisaka mpp_node_task_schedule_f(caller, task)
147*437bfbebSnyanmisaka
148*437bfbebSnyanmisaka #define cluster_queue_lock(queue) cluster_queue_lock_f(__FUNCTION__, queue)
149*437bfbebSnyanmisaka #define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)
150*437bfbebSnyanmisaka
151*437bfbebSnyanmisaka #define get_srv_cluster_f() \
152*437bfbebSnyanmisaka ({ \
153*437bfbebSnyanmisaka MppClusterServer *__tmp; \
154*437bfbebSnyanmisaka if (srv_cluster) { \
155*437bfbebSnyanmisaka __tmp = srv_cluster; \
156*437bfbebSnyanmisaka } else { \
157*437bfbebSnyanmisaka mpp_cluster_srv_init(); \
158*437bfbebSnyanmisaka __tmp = srv_cluster; \
159*437bfbebSnyanmisaka if (!__tmp) \
160*437bfbebSnyanmisaka mpp_err("mpp cluster srv not init at %s\n", __FUNCTION__); \
161*437bfbebSnyanmisaka } \
162*437bfbebSnyanmisaka __tmp; \
163*437bfbebSnyanmisaka })
164*437bfbebSnyanmisaka
cluster_queue_lock_f(const char * caller,ClusterQueue * queue)165*437bfbebSnyanmisaka static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
166*437bfbebSnyanmisaka {
167*437bfbebSnyanmisaka MppCluster *cluster = queue->cluster;
168*437bfbebSnyanmisaka RK_S32 ret;
169*437bfbebSnyanmisaka
170*437bfbebSnyanmisaka cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
171*437bfbebSnyanmisaka
172*437bfbebSnyanmisaka ret = pthread_mutex_lock(&queue->lock);
173*437bfbebSnyanmisaka
174*437bfbebSnyanmisaka cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
175*437bfbebSnyanmisaka
176*437bfbebSnyanmisaka return (ret) ? MPP_NOK : MPP_OK;
177*437bfbebSnyanmisaka }
178*437bfbebSnyanmisaka
cluster_queue_unlock_f(const char * caller,ClusterQueue * queue)179*437bfbebSnyanmisaka static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
180*437bfbebSnyanmisaka {
181*437bfbebSnyanmisaka MppCluster *cluster = queue->cluster;
182*437bfbebSnyanmisaka RK_S32 ret;
183*437bfbebSnyanmisaka
184*437bfbebSnyanmisaka cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
185*437bfbebSnyanmisaka
186*437bfbebSnyanmisaka ret = pthread_mutex_unlock(&queue->lock);
187*437bfbebSnyanmisaka
188*437bfbebSnyanmisaka cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
189*437bfbebSnyanmisaka
190*437bfbebSnyanmisaka return (ret) ? MPP_NOK : MPP_OK;
191*437bfbebSnyanmisaka }
192*437bfbebSnyanmisaka
193*437bfbebSnyanmisaka void cluster_signal_f(const char *caller, MppCluster *p);
194*437bfbebSnyanmisaka
mpp_cluster_queue_init(ClusterQueue * queue,MppCluster * cluster)195*437bfbebSnyanmisaka MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
196*437bfbebSnyanmisaka {
197*437bfbebSnyanmisaka pthread_mutexattr_t attr;
198*437bfbebSnyanmisaka
199*437bfbebSnyanmisaka pthread_mutexattr_init(&attr);
200*437bfbebSnyanmisaka pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
201*437bfbebSnyanmisaka pthread_mutex_init(&queue->lock, &attr);
202*437bfbebSnyanmisaka pthread_mutexattr_destroy(&attr);
203*437bfbebSnyanmisaka
204*437bfbebSnyanmisaka queue->cluster = cluster;
205*437bfbebSnyanmisaka INIT_LIST_HEAD(&queue->list);
206*437bfbebSnyanmisaka queue->count = 0;
207*437bfbebSnyanmisaka
208*437bfbebSnyanmisaka return MPP_OK;
209*437bfbebSnyanmisaka }
210*437bfbebSnyanmisaka
mpp_cluster_queue_deinit(ClusterQueue * queue)211*437bfbebSnyanmisaka MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
212*437bfbebSnyanmisaka {
213*437bfbebSnyanmisaka mpp_assert(!queue->count);
214*437bfbebSnyanmisaka mpp_assert(list_empty(&queue->list));
215*437bfbebSnyanmisaka
216*437bfbebSnyanmisaka pthread_mutex_destroy(&queue->lock);
217*437bfbebSnyanmisaka
218*437bfbebSnyanmisaka return MPP_OK;
219*437bfbebSnyanmisaka }
220*437bfbebSnyanmisaka
mpp_node_task_attach(MppNodeTask * task,MppNodeImpl * node,ClusterQueue * queue,MppNodeProc * proc)221*437bfbebSnyanmisaka MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
222*437bfbebSnyanmisaka ClusterQueue *queue, MppNodeProc *proc)
223*437bfbebSnyanmisaka {
224*437bfbebSnyanmisaka INIT_LIST_HEAD(&task->list_sched);
225*437bfbebSnyanmisaka
226*437bfbebSnyanmisaka task->node = node;
227*437bfbebSnyanmisaka task->node_name = node->name;
228*437bfbebSnyanmisaka
229*437bfbebSnyanmisaka task->queue = queue;
230*437bfbebSnyanmisaka task->proc = proc;
231*437bfbebSnyanmisaka
232*437bfbebSnyanmisaka node->state = NODE_VALID | NODE_IDLE;
233*437bfbebSnyanmisaka node->attached = 1;
234*437bfbebSnyanmisaka
235*437bfbebSnyanmisaka return MPP_OK;
236*437bfbebSnyanmisaka }
237*437bfbebSnyanmisaka
mpp_node_task_schedule_f(const char * caller,MppNodeTask * task)238*437bfbebSnyanmisaka MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
239*437bfbebSnyanmisaka {
240*437bfbebSnyanmisaka ClusterQueue *queue = task->queue;
241*437bfbebSnyanmisaka MppCluster *cluster = queue->cluster;
242*437bfbebSnyanmisaka MppNodeImpl *node = task->node;
243*437bfbebSnyanmisaka MppNodeProc *proc = task->proc;
244*437bfbebSnyanmisaka const char *node_name = task->node_name;
245*437bfbebSnyanmisaka RK_U32 new_st;
246*437bfbebSnyanmisaka RK_U32 action = NODE_ACT_NONE;
247*437bfbebSnyanmisaka bool ret = false;
248*437bfbebSnyanmisaka
249*437bfbebSnyanmisaka cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
250*437bfbebSnyanmisaka node_name, caller, node->state, proc->run_count, queue->count);
251*437bfbebSnyanmisaka
252*437bfbebSnyanmisaka do {
253*437bfbebSnyanmisaka RK_U32 old_st = node->state;
254*437bfbebSnyanmisaka
255*437bfbebSnyanmisaka action = NODE_ACT_NONE;
256*437bfbebSnyanmisaka new_st = 0;
257*437bfbebSnyanmisaka
258*437bfbebSnyanmisaka if (old_st & NODE_WAIT) {
259*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task %x stay wait\n", node_name, old_st);
260*437bfbebSnyanmisaka break;
261*437bfbebSnyanmisaka }
262*437bfbebSnyanmisaka
263*437bfbebSnyanmisaka if (old_st & NODE_IDLE) {
264*437bfbebSnyanmisaka new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
265*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
266*437bfbebSnyanmisaka action = NODE_ACT_IDLE_TO_WAIT;
267*437bfbebSnyanmisaka } else if (old_st & NODE_RUN) {
268*437bfbebSnyanmisaka new_st = old_st | NODE_SIGNAL;
269*437bfbebSnyanmisaka action = NODE_ACT_RUN_TO_SIGNAL;
270*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
271*437bfbebSnyanmisaka } else {
272*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
273*437bfbebSnyanmisaka }
274*437bfbebSnyanmisaka
275*437bfbebSnyanmisaka ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
276*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
277*437bfbebSnyanmisaka node_name, old_st, new_st, ret, action);
278*437bfbebSnyanmisaka } while (!ret);
279*437bfbebSnyanmisaka
280*437bfbebSnyanmisaka switch (action) {
281*437bfbebSnyanmisaka case NODE_ACT_IDLE_TO_WAIT : {
282*437bfbebSnyanmisaka cluster_queue_lock(queue);
283*437bfbebSnyanmisaka mpp_assert(list_empty(&task->list_sched));
284*437bfbebSnyanmisaka list_add_tail(&task->list_sched, &queue->list);
285*437bfbebSnyanmisaka queue->count++;
286*437bfbebSnyanmisaka cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
287*437bfbebSnyanmisaka cluster_queue_unlock(queue);
288*437bfbebSnyanmisaka
289*437bfbebSnyanmisaka cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
290*437bfbebSnyanmisaka cluster_signal_f(caller, cluster);
291*437bfbebSnyanmisaka } break;
292*437bfbebSnyanmisaka case NODE_ACT_RUN_TO_SIGNAL : {
293*437bfbebSnyanmisaka cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
294*437bfbebSnyanmisaka cluster_signal_f(caller, cluster);
295*437bfbebSnyanmisaka } break;
296*437bfbebSnyanmisaka }
297*437bfbebSnyanmisaka
298*437bfbebSnyanmisaka cluster_dbg_flow("%s sched from %s after [%d:%d] queue %d\n",
299*437bfbebSnyanmisaka node_name, caller, node->state, proc->run_count, queue->count);
300*437bfbebSnyanmisaka
301*437bfbebSnyanmisaka return MPP_OK;
302*437bfbebSnyanmisaka }
303*437bfbebSnyanmisaka
mpp_node_task_detach(MppNodeTask * task)304*437bfbebSnyanmisaka MPP_RET mpp_node_task_detach(MppNodeTask *task)
305*437bfbebSnyanmisaka {
306*437bfbebSnyanmisaka MppNodeImpl *node = task->node;
307*437bfbebSnyanmisaka MPP_RET ret = MPP_OK;
308*437bfbebSnyanmisaka
309*437bfbebSnyanmisaka if (node->attached) {
310*437bfbebSnyanmisaka const char *node_name = task->node_name;
311*437bfbebSnyanmisaka MppNodeProc *proc = task->proc;
312*437bfbebSnyanmisaka
313*437bfbebSnyanmisaka MPP_FETCH_AND(&node->state, ~NODE_VALID);
314*437bfbebSnyanmisaka
315*437bfbebSnyanmisaka mpp_node_task_schedule(task);
316*437bfbebSnyanmisaka
317*437bfbebSnyanmisaka cluster_dbg_flow("%s state %x:%d wait detach start\n",
318*437bfbebSnyanmisaka node_name, node->state, proc->run_count);
319*437bfbebSnyanmisaka
320*437bfbebSnyanmisaka sem_wait(&node->sem_detach);
321*437bfbebSnyanmisaka mpp_assert(node->attached == 0);
322*437bfbebSnyanmisaka
323*437bfbebSnyanmisaka cluster_dbg_flow("%s state %x:%d wait detach done\n",
324*437bfbebSnyanmisaka node_name, node->state, proc->run_count);
325*437bfbebSnyanmisaka }
326*437bfbebSnyanmisaka
327*437bfbebSnyanmisaka return ret;
328*437bfbebSnyanmisaka }
329*437bfbebSnyanmisaka
mpp_node_init(MppNode * node)330*437bfbebSnyanmisaka MPP_RET mpp_node_init(MppNode *node)
331*437bfbebSnyanmisaka {
332*437bfbebSnyanmisaka MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
333*437bfbebSnyanmisaka if (p)
334*437bfbebSnyanmisaka sem_init(&p->sem_detach, 0, 0);
335*437bfbebSnyanmisaka
336*437bfbebSnyanmisaka *node = p;
337*437bfbebSnyanmisaka
338*437bfbebSnyanmisaka return p ? MPP_OK : MPP_NOK;
339*437bfbebSnyanmisaka }
340*437bfbebSnyanmisaka
mpp_node_deinit(MppNode node)341*437bfbebSnyanmisaka MPP_RET mpp_node_deinit(MppNode node)
342*437bfbebSnyanmisaka {
343*437bfbebSnyanmisaka MppNodeImpl *p = (MppNodeImpl *)node;
344*437bfbebSnyanmisaka
345*437bfbebSnyanmisaka if (p) {
346*437bfbebSnyanmisaka if (p->attached)
347*437bfbebSnyanmisaka mpp_node_task_detach(&p->task);
348*437bfbebSnyanmisaka
349*437bfbebSnyanmisaka mpp_assert(p->attached == 0);
350*437bfbebSnyanmisaka
351*437bfbebSnyanmisaka sem_destroy(&p->sem_detach);
352*437bfbebSnyanmisaka
353*437bfbebSnyanmisaka mpp_free(p);
354*437bfbebSnyanmisaka }
355*437bfbebSnyanmisaka
356*437bfbebSnyanmisaka return MPP_OK;
357*437bfbebSnyanmisaka }
358*437bfbebSnyanmisaka
mpp_node_set_func(MppNode node,TaskProc proc,void * param)359*437bfbebSnyanmisaka MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
360*437bfbebSnyanmisaka {
361*437bfbebSnyanmisaka MppNodeImpl *p = (MppNodeImpl *)node;
362*437bfbebSnyanmisaka if (!p)
363*437bfbebSnyanmisaka return MPP_NOK;
364*437bfbebSnyanmisaka
365*437bfbebSnyanmisaka p->work.proc = proc;
366*437bfbebSnyanmisaka p->work.param = param;
367*437bfbebSnyanmisaka
368*437bfbebSnyanmisaka return MPP_OK;
369*437bfbebSnyanmisaka }
370*437bfbebSnyanmisaka
cluster_worker_init(ClusterWorker * p,MppCluster * cluster)371*437bfbebSnyanmisaka MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
372*437bfbebSnyanmisaka {
373*437bfbebSnyanmisaka MppThread *thd = NULL;
374*437bfbebSnyanmisaka MPP_RET ret = MPP_NOK;
375*437bfbebSnyanmisaka
376*437bfbebSnyanmisaka INIT_LIST_HEAD(&p->list_task);
377*437bfbebSnyanmisaka p->worker_id = cluster->worker_id++;
378*437bfbebSnyanmisaka
379*437bfbebSnyanmisaka p->batch_count = 1;
380*437bfbebSnyanmisaka p->work_count = 0;
381*437bfbebSnyanmisaka p->cluster = cluster;
382*437bfbebSnyanmisaka p->state = WORKER_IDLE;
383*437bfbebSnyanmisaka snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
384*437bfbebSnyanmisaka thd = mpp_thread_create(cluster->worker_func, p, p->name);
385*437bfbebSnyanmisaka if (thd) {
386*437bfbebSnyanmisaka p->thd = thd;
387*437bfbebSnyanmisaka mpp_thread_start(thd);
388*437bfbebSnyanmisaka ret = MPP_OK;
389*437bfbebSnyanmisaka }
390*437bfbebSnyanmisaka
391*437bfbebSnyanmisaka return ret;
392*437bfbebSnyanmisaka }
393*437bfbebSnyanmisaka
cluster_worker_deinit(ClusterWorker * p)394*437bfbebSnyanmisaka MPP_RET cluster_worker_deinit(ClusterWorker *p)
395*437bfbebSnyanmisaka {
396*437bfbebSnyanmisaka if (p->thd) {
397*437bfbebSnyanmisaka mpp_thread_stop(p->thd);
398*437bfbebSnyanmisaka mpp_thread_destroy(p->thd);
399*437bfbebSnyanmisaka p->thd = NULL;
400*437bfbebSnyanmisaka }
401*437bfbebSnyanmisaka
402*437bfbebSnyanmisaka mpp_assert(list_empty(&p->list_task));
403*437bfbebSnyanmisaka mpp_assert(p->work_count == 0);
404*437bfbebSnyanmisaka
405*437bfbebSnyanmisaka p->batch_count = 0;
406*437bfbebSnyanmisaka p->cluster = NULL;
407*437bfbebSnyanmisaka
408*437bfbebSnyanmisaka return MPP_OK;
409*437bfbebSnyanmisaka }
410*437bfbebSnyanmisaka
cluster_worker_get_task(ClusterWorker * p)411*437bfbebSnyanmisaka RK_S32 cluster_worker_get_task(ClusterWorker *p)
412*437bfbebSnyanmisaka {
413*437bfbebSnyanmisaka MppCluster *cluster = p->cluster;
414*437bfbebSnyanmisaka RK_S32 batch_count = p->batch_count;
415*437bfbebSnyanmisaka RK_S32 count = 0;
416*437bfbebSnyanmisaka RK_U32 new_st;
417*437bfbebSnyanmisaka RK_U32 old_st;
418*437bfbebSnyanmisaka bool ret;
419*437bfbebSnyanmisaka RK_S32 i;
420*437bfbebSnyanmisaka
421*437bfbebSnyanmisaka cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);
422*437bfbebSnyanmisaka
423*437bfbebSnyanmisaka for (i = 0; i < MAX_PRIORITY; i++) {
424*437bfbebSnyanmisaka ClusterQueue *queue = &cluster->queue[i];
425*437bfbebSnyanmisaka MppNodeTask *task = NULL;
426*437bfbebSnyanmisaka MppNodeImpl *node = NULL;
427*437bfbebSnyanmisaka
428*437bfbebSnyanmisaka do {
429*437bfbebSnyanmisaka cluster_queue_lock(queue);
430*437bfbebSnyanmisaka
431*437bfbebSnyanmisaka if (list_empty(&queue->list)) {
432*437bfbebSnyanmisaka mpp_assert(queue->count == 0);
433*437bfbebSnyanmisaka cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
434*437bfbebSnyanmisaka cluster_queue_unlock(queue);
435*437bfbebSnyanmisaka break;
436*437bfbebSnyanmisaka }
437*437bfbebSnyanmisaka
438*437bfbebSnyanmisaka mpp_assert(queue->count);
439*437bfbebSnyanmisaka task = list_first_entry(&queue->list, MppNodeTask, list_sched);
440*437bfbebSnyanmisaka list_del_init(&task->list_sched);
441*437bfbebSnyanmisaka node = task->node;
442*437bfbebSnyanmisaka
443*437bfbebSnyanmisaka queue->count--;
444*437bfbebSnyanmisaka
445*437bfbebSnyanmisaka do {
446*437bfbebSnyanmisaka old_st = node->state;
447*437bfbebSnyanmisaka new_st = old_st ^ (NODE_WAIT | NODE_RUN);
448*437bfbebSnyanmisaka
449*437bfbebSnyanmisaka mpp_assert(old_st & NODE_WAIT);
450*437bfbebSnyanmisaka ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
451*437bfbebSnyanmisaka } while (!ret);
452*437bfbebSnyanmisaka
453*437bfbebSnyanmisaka list_add_tail(&task->list_sched, &p->list_task);
454*437bfbebSnyanmisaka p->work_count++;
455*437bfbebSnyanmisaka count++;
456*437bfbebSnyanmisaka
457*437bfbebSnyanmisaka cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);
458*437bfbebSnyanmisaka
459*437bfbebSnyanmisaka cluster_queue_unlock(queue);
460*437bfbebSnyanmisaka
461*437bfbebSnyanmisaka if (count >= batch_count)
462*437bfbebSnyanmisaka break;
463*437bfbebSnyanmisaka } while (1);
464*437bfbebSnyanmisaka
465*437bfbebSnyanmisaka if (count >= batch_count)
466*437bfbebSnyanmisaka break;
467*437bfbebSnyanmisaka }
468*437bfbebSnyanmisaka
469*437bfbebSnyanmisaka cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);
470*437bfbebSnyanmisaka
471*437bfbebSnyanmisaka return count;
472*437bfbebSnyanmisaka }
473*437bfbebSnyanmisaka
cluster_worker_run_task(ClusterWorker * p)474*437bfbebSnyanmisaka static void cluster_worker_run_task(ClusterWorker *p)
475*437bfbebSnyanmisaka {
476*437bfbebSnyanmisaka RK_U32 new_st;
477*437bfbebSnyanmisaka RK_U32 old_st;
478*437bfbebSnyanmisaka bool cas_ret;
479*437bfbebSnyanmisaka
480*437bfbebSnyanmisaka cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);
481*437bfbebSnyanmisaka
482*437bfbebSnyanmisaka while (!list_empty(&p->list_task)) {
483*437bfbebSnyanmisaka MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
484*437bfbebSnyanmisaka MppNodeProc *proc = task->proc;
485*437bfbebSnyanmisaka MppNodeImpl *node = task->node;
486*437bfbebSnyanmisaka RK_S64 time_start;
487*437bfbebSnyanmisaka RK_S64 time_end;
488*437bfbebSnyanmisaka RK_U32 state;
489*437bfbebSnyanmisaka MPP_RET proc_ret;
490*437bfbebSnyanmisaka
491*437bfbebSnyanmisaka /* check trigger for re-add task */
492*437bfbebSnyanmisaka cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state);
493*437bfbebSnyanmisaka mpp_assert(node->state & NODE_RUN);
494*437bfbebSnyanmisaka if (!(node->state & NODE_RUN))
495*437bfbebSnyanmisaka mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);
496*437bfbebSnyanmisaka
497*437bfbebSnyanmisaka time_start = mpp_time();
498*437bfbebSnyanmisaka proc_ret = proc->proc(proc->param);
499*437bfbebSnyanmisaka time_end = mpp_time();
500*437bfbebSnyanmisaka
501*437bfbebSnyanmisaka cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
502*437bfbebSnyanmisaka proc->run_time += time_end - time_start;
503*437bfbebSnyanmisaka proc->run_count++;
504*437bfbebSnyanmisaka
505*437bfbebSnyanmisaka state = node->state;
506*437bfbebSnyanmisaka if (!(state & NODE_VALID)) {
507*437bfbebSnyanmisaka cluster_dbg_flow("%s run found destroy\n", p->name);
508*437bfbebSnyanmisaka list_del_init(&task->list_sched);
509*437bfbebSnyanmisaka node->attached = 0;
510*437bfbebSnyanmisaka
511*437bfbebSnyanmisaka sem_post(&node->sem_detach);
512*437bfbebSnyanmisaka cluster_dbg_flow("%s run sem post done\n", p->name);
513*437bfbebSnyanmisaka } else if (state & NODE_SIGNAL) {
514*437bfbebSnyanmisaka ClusterQueue *queue = task->queue;
515*437bfbebSnyanmisaka
516*437bfbebSnyanmisaka list_del_init(&task->list_sched);
517*437bfbebSnyanmisaka
518*437bfbebSnyanmisaka do {
519*437bfbebSnyanmisaka old_st = state;
520*437bfbebSnyanmisaka // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
521*437bfbebSnyanmisaka new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
522*437bfbebSnyanmisaka cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
523*437bfbebSnyanmisaka } while (!cas_ret);
524*437bfbebSnyanmisaka
525*437bfbebSnyanmisaka cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);
526*437bfbebSnyanmisaka
527*437bfbebSnyanmisaka cluster_queue_lock(queue);
528*437bfbebSnyanmisaka list_add_tail(&task->list_sched, &queue->list);
529*437bfbebSnyanmisaka queue->count++;
530*437bfbebSnyanmisaka cluster_queue_unlock(queue);
531*437bfbebSnyanmisaka } else {
532*437bfbebSnyanmisaka list_del_init(&task->list_sched);
533*437bfbebSnyanmisaka do {
534*437bfbebSnyanmisaka old_st = node->state;
535*437bfbebSnyanmisaka new_st = old_st ^ (NODE_IDLE | NODE_RUN);
536*437bfbebSnyanmisaka
537*437bfbebSnyanmisaka cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
538*437bfbebSnyanmisaka } while (!cas_ret);
539*437bfbebSnyanmisaka mpp_assert(node->state & NODE_IDLE);
540*437bfbebSnyanmisaka mpp_assert(!(node->state & NODE_RUN));
541*437bfbebSnyanmisaka
542*437bfbebSnyanmisaka cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
543*437bfbebSnyanmisaka }
544*437bfbebSnyanmisaka
545*437bfbebSnyanmisaka p->work_count--;
546*437bfbebSnyanmisaka }
547*437bfbebSnyanmisaka
548*437bfbebSnyanmisaka mpp_assert(p->work_count == 0);
549*437bfbebSnyanmisaka
550*437bfbebSnyanmisaka cluster_dbg_flow("%s run all done\n", p->name);
551*437bfbebSnyanmisaka }
552*437bfbebSnyanmisaka
cluster_worker(void * data)553*437bfbebSnyanmisaka static void *cluster_worker(void *data)
554*437bfbebSnyanmisaka {
555*437bfbebSnyanmisaka ClusterWorker *p = (ClusterWorker *)data;
556*437bfbebSnyanmisaka MppThread *thd = p->thd;
557*437bfbebSnyanmisaka
558*437bfbebSnyanmisaka while (1) {
559*437bfbebSnyanmisaka {
560*437bfbebSnyanmisaka RK_S32 task_count = 0;
561*437bfbebSnyanmisaka
562*437bfbebSnyanmisaka cluster_dbg_lock("%s lock start\n", p->name);
563*437bfbebSnyanmisaka mpp_thread_lock(thd, THREAD_WORK);
564*437bfbebSnyanmisaka cluster_dbg_lock("%s lock done\n", p->name);
565*437bfbebSnyanmisaka
566*437bfbebSnyanmisaka if (MPP_THREAD_RUNNING != mpp_thread_get_status(thd, THREAD_WORK)) {
567*437bfbebSnyanmisaka mpp_thread_unlock(thd, THREAD_WORK);
568*437bfbebSnyanmisaka break;
569*437bfbebSnyanmisaka }
570*437bfbebSnyanmisaka
571*437bfbebSnyanmisaka task_count = cluster_worker_get_task(p);
572*437bfbebSnyanmisaka if (!task_count) {
573*437bfbebSnyanmisaka p->state = WORKER_IDLE;
574*437bfbebSnyanmisaka mpp_thread_wait(thd, THREAD_WORK);
575*437bfbebSnyanmisaka p->state = WORKER_RUNNING;
576*437bfbebSnyanmisaka }
577*437bfbebSnyanmisaka mpp_thread_unlock(thd, THREAD_WORK);
578*437bfbebSnyanmisaka }
579*437bfbebSnyanmisaka
580*437bfbebSnyanmisaka cluster_worker_run_task(p);
581*437bfbebSnyanmisaka }
582*437bfbebSnyanmisaka
583*437bfbebSnyanmisaka return NULL;
584*437bfbebSnyanmisaka }
585*437bfbebSnyanmisaka
cluster_signal_f(const char * caller,MppCluster * p)586*437bfbebSnyanmisaka void cluster_signal_f(const char *caller, MppCluster *p)
587*437bfbebSnyanmisaka {
588*437bfbebSnyanmisaka RK_S32 i;
589*437bfbebSnyanmisaka
590*437bfbebSnyanmisaka cluster_dbg_flow("%s signal from %s\n", p->name, caller);
591*437bfbebSnyanmisaka
592*437bfbebSnyanmisaka for (i = 0; i < p->worker_count; i++) {
593*437bfbebSnyanmisaka ClusterWorker *worker = &p->worker[i];
594*437bfbebSnyanmisaka MppThread *thd = worker->thd;
595*437bfbebSnyanmisaka
596*437bfbebSnyanmisaka mpp_thread_lock(thd, THREAD_WORK);
597*437bfbebSnyanmisaka
598*437bfbebSnyanmisaka if (worker->state == WORKER_IDLE) {
599*437bfbebSnyanmisaka mpp_thread_signal(thd, THREAD_WORK);
600*437bfbebSnyanmisaka cluster_dbg_flow("%s signal\n", p->name);
601*437bfbebSnyanmisaka mpp_thread_unlock(thd, THREAD_WORK);
602*437bfbebSnyanmisaka break;
603*437bfbebSnyanmisaka }
604*437bfbebSnyanmisaka
605*437bfbebSnyanmisaka mpp_thread_unlock(thd, THREAD_WORK);
606*437bfbebSnyanmisaka }
607*437bfbebSnyanmisaka }
608*437bfbebSnyanmisaka
mpp_cluster_srv_init(void)609*437bfbebSnyanmisaka static void mpp_cluster_srv_init(void)
610*437bfbebSnyanmisaka {
611*437bfbebSnyanmisaka MppClusterServer *srv = srv_cluster;
612*437bfbebSnyanmisaka
613*437bfbebSnyanmisaka mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
614*437bfbebSnyanmisaka mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
615*437bfbebSnyanmisaka
616*437bfbebSnyanmisaka if (srv)
617*437bfbebSnyanmisaka return;
618*437bfbebSnyanmisaka
619*437bfbebSnyanmisaka srv = mpp_calloc(MppClusterServer, 1);
620*437bfbebSnyanmisaka if (!srv) {
621*437bfbebSnyanmisaka mpp_err_f("failed to allocate cluster server\n");
622*437bfbebSnyanmisaka return;
623*437bfbebSnyanmisaka }
624*437bfbebSnyanmisaka
625*437bfbebSnyanmisaka memset(srv->clusters, 0, sizeof(srv->clusters));
626*437bfbebSnyanmisaka mpp_mutex_init(&srv->mutex);
627*437bfbebSnyanmisaka
628*437bfbebSnyanmisaka srv_cluster = srv;
629*437bfbebSnyanmisaka }
630*437bfbebSnyanmisaka
mpp_cluster_srv_deinit(void)631*437bfbebSnyanmisaka static void mpp_cluster_srv_deinit(void)
632*437bfbebSnyanmisaka {
633*437bfbebSnyanmisaka MppClusterServer *srv = srv_cluster;
634*437bfbebSnyanmisaka RK_S32 i;
635*437bfbebSnyanmisaka
636*437bfbebSnyanmisaka if (!srv)
637*437bfbebSnyanmisaka return;
638*437bfbebSnyanmisaka
639*437bfbebSnyanmisaka for (i = 0; i < VPU_CLIENT_BUTT; i++)
640*437bfbebSnyanmisaka cluster_server_put((MppClientType)i);
641*437bfbebSnyanmisaka
642*437bfbebSnyanmisaka mpp_mutex_destroy(&srv->mutex);
643*437bfbebSnyanmisaka MPP_FREE(srv_cluster);
644*437bfbebSnyanmisaka }
645*437bfbebSnyanmisaka
MPP_SINGLETON(MPP_SGLN_CLUSTER,mpp_cluster,mpp_cluster_srv_init,mpp_cluster_srv_deinit)646*437bfbebSnyanmisaka MPP_SINGLETON(MPP_SGLN_CLUSTER, mpp_cluster, mpp_cluster_srv_init, mpp_cluster_srv_deinit)
647*437bfbebSnyanmisaka
648*437bfbebSnyanmisaka static MppCluster *cluster_server_get(MppClientType client_type)
649*437bfbebSnyanmisaka {
650*437bfbebSnyanmisaka MppClusterServer *srv = get_srv_cluster_f();
651*437bfbebSnyanmisaka RK_S32 i;
652*437bfbebSnyanmisaka MppCluster *p = NULL;
653*437bfbebSnyanmisaka
654*437bfbebSnyanmisaka if (!srv || client_type >= VPU_CLIENT_BUTT)
655*437bfbebSnyanmisaka goto done;
656*437bfbebSnyanmisaka
657*437bfbebSnyanmisaka {
658*437bfbebSnyanmisaka mpp_mutex_lock(&srv->mutex);
659*437bfbebSnyanmisaka
660*437bfbebSnyanmisaka p = srv->clusters[client_type];
661*437bfbebSnyanmisaka if (p) {
662*437bfbebSnyanmisaka mpp_mutex_unlock(&srv->mutex);
663*437bfbebSnyanmisaka goto done;
664*437bfbebSnyanmisaka }
665*437bfbebSnyanmisaka
666*437bfbebSnyanmisaka p = mpp_malloc(MppCluster, 1);
667*437bfbebSnyanmisaka if (p) {
668*437bfbebSnyanmisaka for (i = 0; i < MAX_PRIORITY; i++)
669*437bfbebSnyanmisaka mpp_cluster_queue_init(&p->queue[i], p);
670*437bfbebSnyanmisaka
671*437bfbebSnyanmisaka p->pid = getpid();
672*437bfbebSnyanmisaka p->client_type = client_type;
673*437bfbebSnyanmisaka snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
674*437bfbebSnyanmisaka p->node_id = 0;
675*437bfbebSnyanmisaka p->worker_id = 0;
676*437bfbebSnyanmisaka p->worker_func = cluster_worker;
677*437bfbebSnyanmisaka p->worker_count = mpp_cluster_thd_cnt;
678*437bfbebSnyanmisaka
679*437bfbebSnyanmisaka mpp_assert(p->worker_count > 0);
680*437bfbebSnyanmisaka
681*437bfbebSnyanmisaka p->worker = mpp_malloc(ClusterWorker, p->worker_count);
682*437bfbebSnyanmisaka
683*437bfbebSnyanmisaka for (i = 0; i < p->worker_count; i++)
684*437bfbebSnyanmisaka cluster_worker_init(&p->worker[i], p);
685*437bfbebSnyanmisaka
686*437bfbebSnyanmisaka srv->clusters[client_type] = p;
687*437bfbebSnyanmisaka cluster_dbg_flow("%s created\n", p->name);
688*437bfbebSnyanmisaka }
689*437bfbebSnyanmisaka
690*437bfbebSnyanmisaka mpp_mutex_unlock(&srv->mutex);
691*437bfbebSnyanmisaka }
692*437bfbebSnyanmisaka
693*437bfbebSnyanmisaka done:
694*437bfbebSnyanmisaka if (p)
695*437bfbebSnyanmisaka cluster_dbg_flow("%s get\n", p->name);
696*437bfbebSnyanmisaka else
697*437bfbebSnyanmisaka cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
698*437bfbebSnyanmisaka
699*437bfbebSnyanmisaka return p;
700*437bfbebSnyanmisaka }
701*437bfbebSnyanmisaka
cluster_server_put(MppClientType client_type)702*437bfbebSnyanmisaka static MPP_RET cluster_server_put(MppClientType client_type)
703*437bfbebSnyanmisaka {
704*437bfbebSnyanmisaka MppClusterServer *srv = get_srv_cluster_f();
705*437bfbebSnyanmisaka MppCluster *p;
706*437bfbebSnyanmisaka RK_S32 i;
707*437bfbebSnyanmisaka
708*437bfbebSnyanmisaka if (!srv || client_type >= VPU_CLIENT_BUTT)
709*437bfbebSnyanmisaka return MPP_NOK;
710*437bfbebSnyanmisaka
711*437bfbebSnyanmisaka mpp_mutex_lock(&srv->mutex);
712*437bfbebSnyanmisaka
713*437bfbebSnyanmisaka p = srv->clusters[client_type];
714*437bfbebSnyanmisaka if (!p) {
715*437bfbebSnyanmisaka mpp_mutex_unlock(&srv->mutex);
716*437bfbebSnyanmisaka return MPP_NOK;
717*437bfbebSnyanmisaka }
718*437bfbebSnyanmisaka
719*437bfbebSnyanmisaka for (i = 0; i < p->worker_count; i++)
720*437bfbebSnyanmisaka cluster_worker_deinit(&p->worker[i]);
721*437bfbebSnyanmisaka
722*437bfbebSnyanmisaka for (i = 0; i < MAX_PRIORITY; i++)
723*437bfbebSnyanmisaka mpp_cluster_queue_deinit(&p->queue[i]);
724*437bfbebSnyanmisaka
725*437bfbebSnyanmisaka cluster_dbg_flow("put %s\n", p->name);
726*437bfbebSnyanmisaka
727*437bfbebSnyanmisaka MPP_FREE(p->worker);
728*437bfbebSnyanmisaka mpp_free(p);
729*437bfbebSnyanmisaka srv->clusters[client_type] = NULL;
730*437bfbebSnyanmisaka
731*437bfbebSnyanmisaka mpp_mutex_unlock(&srv->mutex);
732*437bfbebSnyanmisaka
733*437bfbebSnyanmisaka return MPP_OK;
734*437bfbebSnyanmisaka }
735*437bfbebSnyanmisaka
mpp_node_attach(MppNode node,MppClientType type)736*437bfbebSnyanmisaka MPP_RET mpp_node_attach(MppNode node, MppClientType type)
737*437bfbebSnyanmisaka {
738*437bfbebSnyanmisaka MppNodeImpl *impl = (MppNodeImpl *)node;
739*437bfbebSnyanmisaka MppCluster *p = cluster_server_get(type);
740*437bfbebSnyanmisaka RK_U32 priority = impl->priority;
741*437bfbebSnyanmisaka ClusterQueue *queue = &p->queue[priority];
742*437bfbebSnyanmisaka
743*437bfbebSnyanmisaka mpp_assert(priority < MAX_PRIORITY);
744*437bfbebSnyanmisaka mpp_assert(p);
745*437bfbebSnyanmisaka
746*437bfbebSnyanmisaka impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);
747*437bfbebSnyanmisaka
748*437bfbebSnyanmisaka snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
749*437bfbebSnyanmisaka
750*437bfbebSnyanmisaka mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
751*437bfbebSnyanmisaka
752*437bfbebSnyanmisaka MPP_FETCH_ADD(&p->node_count, 1);
753*437bfbebSnyanmisaka
754*437bfbebSnyanmisaka cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
755*437bfbebSnyanmisaka
756*437bfbebSnyanmisaka /* attach and run once first */
757*437bfbebSnyanmisaka mpp_node_task_schedule(&impl->task);
758*437bfbebSnyanmisaka cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
759*437bfbebSnyanmisaka
760*437bfbebSnyanmisaka return MPP_OK;
761*437bfbebSnyanmisaka }
762*437bfbebSnyanmisaka
mpp_node_detach(MppNode node)763*437bfbebSnyanmisaka MPP_RET mpp_node_detach(MppNode node)
764*437bfbebSnyanmisaka {
765*437bfbebSnyanmisaka MppNodeImpl *impl = (MppNodeImpl *)node;
766*437bfbebSnyanmisaka
767*437bfbebSnyanmisaka mpp_node_task_detach(&impl->task);
768*437bfbebSnyanmisaka cluster_dbg_flow("%s detached\n", impl->name);
769*437bfbebSnyanmisaka
770*437bfbebSnyanmisaka return MPP_OK;
771*437bfbebSnyanmisaka }
772*437bfbebSnyanmisaka
mpp_node_trigger_f(const char * caller,MppNode node,RK_S32 trigger)773*437bfbebSnyanmisaka MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
774*437bfbebSnyanmisaka {
775*437bfbebSnyanmisaka if (trigger) {
776*437bfbebSnyanmisaka MppNodeImpl *impl = (MppNodeImpl *)node;
777*437bfbebSnyanmisaka
778*437bfbebSnyanmisaka mpp_node_task_schedule_from(caller, &impl->task);
779*437bfbebSnyanmisaka }
780*437bfbebSnyanmisaka
781*437bfbebSnyanmisaka return MPP_OK;
782*437bfbebSnyanmisaka }
783