xref: /OK3568_Linux_fs/external/mpp/mpp/base/mpp_cluster.cpp (revision 4882a59341e53eb6f0b4789bf948001014eff981)
/*
 * Copyright 2021 Rockchip Electronics Co. LTD
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define  MODULE_TAG "mpp_cluster"

#include <string.h>

#include "mpp_mem.h"
#include "mpp_env.h"
#include "mpp_lock.h"
#include "mpp_time.h"
#include "mpp_debug.h"
#include "mpp_common.h"

#include "mpp_cluster.h"
#include "mpp_dev_defs.h"

#define MPP_CLUSTER_DBG_FLOW            (0x00000001)
#define MPP_CLUSTER_DBG_LOCK            (0x00000002)

#define cluster_dbg(flag, fmt, ...)     _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
#define cluster_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)

#define cluster_dbg_flow(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
#define cluster_dbg_lock(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)

RK_U32 mpp_cluster_debug = 0;
RK_U32 mpp_cluster_thd_cnt = 1;
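/*
 * Both globals are read from the environment when the cluster server singleton
 * is created: "mpp_cluster_debug" selects the debug categories above and
 * "mpp_cluster_thd_cnt" sets how many worker threads each cluster spawns.
 */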

typedef struct MppNodeProc_s    MppNodeProc;
typedef struct MppNodeTask_s    MppNodeTask;
typedef struct MppNodeImpl_s    MppNodeImpl;

typedef struct ClusterQueue_s   ClusterQueue;
typedef struct ClusterWorker_s  ClusterWorker;
typedef struct MppCluster_s     MppCluster;

#define NODE_VALID              (0x00000001)
#define NODE_IDLE               (0x00000002)
#define NODE_SIGNAL             (0x00000004)
#define NODE_WAIT               (0x00000008)
#define NODE_RUN                (0x00000010)

#define NODE_ACT_NONE           (0x00000000)
#define NODE_ACT_IDLE_TO_WAIT   (0x00000001)
#define NODE_ACT_RUN_TO_SIGNAL  (0x00000002)
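/*
 * Node state bits and the scheduling actions derived from them:
 *   NODE_VALID  - node is attached to a cluster; cleared on detach
 *   NODE_IDLE   - node is neither queued nor running
 *   NODE_WAIT   - node is queued on a cluster priority queue
 *   NODE_RUN    - node is being processed by a worker
 *   NODE_SIGNAL - node was re-scheduled while running and must be requeued
 * Transitions are done with atomic CAS loops, so no lock is needed to move a
 * node between IDLE, WAIT and RUN.
 */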

typedef enum MppWorkerState_e {
    WORKER_IDLE,
    WORKER_RUNNING,

    WORKER_STATE_BUTT,
} MppWorkerState;

struct MppNodeProc_s {
    TaskProc                proc;
    void                    *param;

    /* timing statistic */
    RK_U32                  run_count;
    RK_S64                  run_time;
};

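/*
 * Schedulable unit of a node: the list entry that links a node into either a
 * cluster priority queue (waiting) or a worker's local run list (running).
 */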
struct MppNodeTask_s {
    struct list_head        list_sched;
    MppNodeImpl             *node;
    const char              *node_name;

    /* pointer to the cluster queue that owns the scheduling lock */
    ClusterQueue            *queue;

    MppNodeProc             *proc;
};

/* MppNode will be embedded in MppCtx */
struct MppNodeImpl_s {
    char                    name[32];
    /* list linked to scheduler */
    RK_S32                  node_id;
    RK_U32                  state;

    MppNodeProc             work;

    RK_U32                  priority;
    RK_S32                  attached;
    sem_t                   sem_detach;

    /* for cluster schedule */
    MppNodeTask             task;
};

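/* Per-priority FIFO of waiting node tasks, protected by a recursive mutex. */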
struct ClusterQueue_s {
    MppCluster              *cluster;

    pthread_mutex_t         lock;
    struct list_head        list;
    RK_S32                  count;
};

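/*
 * Worker thread context: pulls up to batch_count tasks from the cluster
 * queues onto list_task and runs them, sleeping on its MppThread condition
 * when no task is available.
 */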
struct ClusterWorker_s {
    char                    name[32];
    MppCluster              *cluster;
    RK_S32                  worker_id;

    MppThread               *thd;
    MppWorkerState          state;

    RK_S32                  batch_count;
    RK_S32                  work_count;
    struct list_head        list_task;
};

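/*
 * One cluster per hardware client type: a set of priority queues shared by
 * all nodes of that type plus the worker threads that service them.
 */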
struct MppCluster_s {
    char                    name[16];
    pid_t                   pid;
    RK_S32                  client_type;
    RK_S32                  node_id;
    RK_S32                  worker_id;

    ClusterQueue            queue[MAX_PRIORITY];
    RK_S32                  node_count;

    /* multi-worker info */
    RK_S32                  worker_count;
    ClusterWorker           *worker;
    MppThreadFunc           worker_func;
};

#define mpp_node_task_schedule(task) \
    mpp_node_task_schedule_f(__FUNCTION__, task)

#define mpp_node_task_schedule_from(caller, task) \
    mpp_node_task_schedule_f(caller, task)

#define cluster_queue_lock(queue)   cluster_queue_lock_f(__FUNCTION__, queue)
#define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)

static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
{
    MppCluster *cluster = queue->cluster;
    RK_S32 ret;

    cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);

    ret = pthread_mutex_lock(&queue->lock);

    cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);

    return (ret) ? MPP_NOK : MPP_OK;
}

static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
{
    MppCluster *cluster = queue->cluster;
    RK_S32 ret;

    cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);

    ret = pthread_mutex_unlock(&queue->lock);

    cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);

    return (ret) ? MPP_NOK : MPP_OK;
}

void cluster_signal_f(const char *caller, MppCluster *p);

MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
{
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&queue->lock, &attr);
    pthread_mutexattr_destroy(&attr);

    queue->cluster = cluster;
    INIT_LIST_HEAD(&queue->list);
    queue->count = 0;

    return MPP_OK;
}

MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
{
    mpp_assert(!queue->count);
    mpp_assert(list_empty(&queue->list));

    pthread_mutex_destroy(&queue->lock);

    return MPP_OK;
}

MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
                             ClusterQueue *queue, MppNodeProc *proc)
{
    INIT_LIST_HEAD(&task->list_sched);

    task->node = node;
    task->node_name = node->name;

    task->queue = queue;
    task->proc = proc;

    node->state = NODE_VALID | NODE_IDLE;
    node->attached = 1;

    return MPP_OK;
}

MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
{
    ClusterQueue *queue = task->queue;
    MppCluster *cluster = queue->cluster;
    MppNodeImpl *node = task->node;
    MppNodeProc *proc = task->proc;
    const char *node_name = task->node_name;
    RK_U32 new_st;
    RK_U32 action = NODE_ACT_NONE;
    bool ret = false;

    cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
                     node_name, caller, node->state, proc->run_count, queue->count);

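    /*
     * Lock-free state transition, retried until the CAS lands:
     *   IDLE -> WAIT        : queue the task on its priority queue and wake a worker
     *   RUN  -> RUN|SIGNAL  : node is already running; mark it so the worker
     *                         requeues it once the current run finishes
     *   WAIT                : already queued, nothing to do
     */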
    do {
        RK_U32 old_st = node->state;

        action = NODE_ACT_NONE;
        new_st = 0;

        if (old_st & NODE_WAIT) {
            cluster_dbg_flow("%s sched task %x stay  wait\n", node_name, old_st);
            break;
        }

        if (old_st & NODE_IDLE) {
            new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
            cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
            action = NODE_ACT_IDLE_TO_WAIT;
        } else if (old_st & NODE_RUN) {
            new_st = old_st | NODE_SIGNAL;
            action = NODE_ACT_RUN_TO_SIGNAL;
            cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
        } else {
            cluster_dbg_flow("%s sched task unknown state %x\n", node_name, old_st);
        }

        ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
        cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
                         node_name, old_st, new_st, ret, action);
    } while (!ret);

    switch (action) {
    case NODE_ACT_IDLE_TO_WAIT : {
        cluster_queue_lock(queue);
        mpp_assert(list_empty(&task->list_sched));
        list_add_tail(&task->list_sched, &queue->list);
        queue->count++;
        cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
        cluster_queue_unlock(queue);

        cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
        cluster_signal_f(caller, cluster);
    } break;
    case NODE_ACT_RUN_TO_SIGNAL : {
        cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
        cluster_signal_f(caller, cluster);
    } break;
    }

    cluster_dbg_flow("%s sched from %s after  [%d:%d] queue %d\n",
                     node_name, caller, node->state, proc->run_count, queue->count);

    return MPP_OK;
}

MPP_RET mpp_node_task_detach(MppNodeTask *task)
{
    MppNodeImpl *node = task->node;
    MPP_RET ret = MPP_OK;

    if (node->attached) {
        const char *node_name = task->node_name;
        MppNodeProc *proc = task->proc;

        MPP_FETCH_AND(&node->state, ~NODE_VALID);

        mpp_node_task_schedule(task);

        cluster_dbg_flow("%s state %x:%d wait detach start\n",
                         node_name, node->state, proc->run_count);

        sem_wait(&node->sem_detach);
        mpp_assert(node->attached == 0);

        cluster_dbg_flow("%s state %x:%d wait detach done\n",
                         node_name, node->state, proc->run_count);
    }

    return ret;
}

MPP_RET mpp_node_init(MppNode *node)
{
    MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
    if (p)
        sem_init(&p->sem_detach, 0, 0);

    *node = p;

    return p ? MPP_OK : MPP_NOK;
}

MPP_RET mpp_node_deinit(MppNode node)
{
    MppNodeImpl *p = (MppNodeImpl *)node;

    if (p) {
        if (p->attached)
            mpp_node_task_detach(&p->task);

        mpp_assert(p->attached == 0);

        sem_destroy(&p->sem_detach);

        mpp_free(p);
    }

    return MPP_OK;
}

MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
{
    MppNodeImpl *p = (MppNodeImpl *)node;
    if (!p)
        return MPP_NOK;

    p->work.proc = proc;
    p->work.param = param;

    return MPP_OK;
}

MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
{
    MppThread *thd = NULL;
    MPP_RET ret = MPP_NOK;

    INIT_LIST_HEAD(&p->list_task);
    p->worker_id = cluster->worker_id++;

    p->batch_count = 1;
    p->work_count = 0;
    p->cluster = cluster;
    p->state = WORKER_IDLE;
    snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
    thd = new MppThread(cluster->worker_func, p, p->name);
    if (thd) {
        p->thd = thd;
        thd->start();
        ret = MPP_OK;
    }

    return ret;
}

MPP_RET cluster_worker_deinit(ClusterWorker *p)
{
    if (p->thd) {
        p->thd->stop();
        delete p->thd;
        p->thd = NULL;
    }

    mpp_assert(list_empty(&p->list_task));
    mpp_assert(p->work_count == 0);

    p->batch_count = 0;
    p->cluster = NULL;

    return MPP_OK;
}

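/*
 * Move up to batch_count waiting tasks from the cluster's priority queues
 * (highest priority first) onto this worker's local run list, flipping each
 * node from WAIT to RUN.
 */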
RK_S32 cluster_worker_get_task(ClusterWorker *p)
{
    MppCluster *cluster = p->cluster;
    RK_S32 batch_count = p->batch_count;
    RK_S32 count = 0;
    RK_U32 new_st;
    RK_U32 old_st;
    bool ret;
    RK_S32 i;

    cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);

    for (i = 0; i < MAX_PRIORITY; i++) {
        ClusterQueue *queue = &cluster->queue[i];
        MppNodeTask *task = NULL;
        MppNodeImpl *node = NULL;

        do {
            cluster_queue_lock(queue);

            if (list_empty(&queue->list)) {
                mpp_assert(queue->count == 0);
                cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
                cluster_queue_unlock(queue);
                break;
            }

            mpp_assert(queue->count);
            task = list_first_entry(&queue->list, MppNodeTask, list_sched);
            list_del_init(&task->list_sched);
            node = task->node;

            queue->count--;

            do {
                old_st = node->state;
                new_st = old_st ^ (NODE_WAIT | NODE_RUN);

                mpp_assert(old_st & NODE_WAIT);
                ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!ret);

            list_add_tail(&task->list_sched, &p->list_task);
            p->work_count++;
            count++;

            cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);

            cluster_queue_unlock(queue);

            if (count >= batch_count)
                break;
        } while (1);

        if (count >= batch_count)
            break;
    }

    cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);

    return count;
}

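/*
 * Run every task on the worker's local list. After each run the node's state
 * decides what happens next: detach and post sem_detach if NODE_VALID was
 * cleared, requeue on its priority queue if NODE_SIGNAL was raised while it
 * ran, otherwise drop back to NODE_IDLE.
 */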
static void cluster_worker_run_task(ClusterWorker *p)
{
    RK_U32 new_st;
    RK_U32 old_st;
    bool cas_ret;

    cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);

    while (!list_empty(&p->list_task)) {
        MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
        MppNodeProc *proc = task->proc;
        MppNodeImpl *node = task->node;
        RK_S64 time_start;
        RK_S64 time_end;
        RK_U32 state;
        MPP_RET proc_ret;

        /* check trigger for re-add task */
        cluster_dbg_flow("%s run %s start state %x\n", p->name, task->node_name, node->state);
        mpp_assert(node->state & NODE_RUN);
        if (!(node->state & NODE_RUN))
            mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);

        time_start = mpp_time();
        proc_ret = proc->proc(proc->param);
        time_end = mpp_time();

        cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
        proc->run_time += time_end - time_start;
        proc->run_count++;

        state = node->state;
        if (!(state & NODE_VALID)) {
            cluster_dbg_flow("%s run found destroy\n", p->name);
            list_del_init(&task->list_sched);
            node->attached = 0;

            sem_post(&node->sem_detach);
            cluster_dbg_flow("%s run sem post done\n", p->name);
        } else if (state & NODE_SIGNAL) {
            ClusterQueue *queue = task->queue;

            list_del_init(&task->list_sched);

            do {
                /* re-read the state so a concurrent change cannot stall the retry */
                old_st = node->state;
                // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
                new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
                cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!cas_ret);

            cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);

            cluster_queue_lock(queue);
            list_add_tail(&task->list_sched, &queue->list);
            queue->count++;
            cluster_queue_unlock(queue);
        } else {
            list_del_init(&task->list_sched);
            do {
                old_st = node->state;
                new_st = old_st ^ (NODE_IDLE | NODE_RUN);

                cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
            } while (!cas_ret);
            mpp_assert(node->state & NODE_IDLE);
            mpp_assert(!(node->state & NODE_RUN));

            cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
        }

        p->work_count--;
    }

    mpp_assert(p->work_count == 0);

    cluster_dbg_flow("%s run all done\n", p->name);
}

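/*
 * Worker thread body: grab a batch of tasks under the thread lock, sleep on
 * the thread condition when the queues are empty (cluster_signal_f wakes it),
 * then execute the batch outside the lock.
 */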
static void *cluster_worker(void *data)
{
    ClusterWorker *p = (ClusterWorker *)data;
    MppThread *thd = p->thd;

    while (1) {
        {
            RK_S32 task_count = 0;

            cluster_dbg_lock("%s lock start\n", p->name);
            AutoMutex autolock(thd->mutex());
            cluster_dbg_lock("%s lock done\n", p->name);

            if (MPP_THREAD_RUNNING != thd->get_status())
                break;

            task_count = cluster_worker_get_task(p);
            if (!task_count) {
                p->state = WORKER_IDLE;
                thd->wait();
                p->state = WORKER_RUNNING;
            }
        }

        cluster_worker_run_task(p);
    }

    return NULL;
}

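/* Wake the first idle worker of the cluster, if any. */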
void cluster_signal_f(const char *caller, MppCluster *p)
{
    RK_S32 i;

    cluster_dbg_flow("%s signal from %s\n", p->name, caller);

    for (i = 0; i < p->worker_count; i++) {
        ClusterWorker *worker = &p->worker[i];
        MppThread *thd = worker->thd;
        AutoMutex auto_lock(thd->mutex());

        if (worker->state == WORKER_IDLE) {
            thd->signal();
            cluster_dbg_flow("%s signal\n", p->name);
            break;
        }
    }
}

class MppClusterServer;

MppClusterServer *cluster_server = NULL;

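/*
 * Process-wide singleton that lazily creates one MppCluster per client type
 * and hands it out to attaching nodes.
 */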
class MppClusterServer : Mutex
{
private:
    // avoid any unwanted function
    MppClusterServer();
    ~MppClusterServer();
    MppClusterServer(const MppClusterServer &);
    MppClusterServer &operator=(const MppClusterServer &);

    MppCluster  *mClusters[VPU_CLIENT_BUTT];

public:
    static MppClusterServer *single() {
        static MppClusterServer inst;
        cluster_server = &inst;
        return &inst;
    }

    MppCluster  *get(MppClientType client_type);
    MPP_RET     put(MppClientType client_type);
};

MppClusterServer::MppClusterServer()
{
    memset(mClusters, 0, sizeof(mClusters));

    mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
    mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
}

MppClusterServer::~MppClusterServer()
{
    RK_S32 i;

    for (i = 0; i < VPU_CLIENT_BUTT; i++)
        put((MppClientType)i);
}

MppCluster *MppClusterServer::get(MppClientType client_type)
{
    RK_S32 i;
    MppCluster *p = NULL;

    if (client_type >= VPU_CLIENT_BUTT)
        goto done;

    {
        AutoMutex auto_lock(this);

        p = mClusters[client_type];
        if (p)
            goto done;

        p = mpp_malloc(MppCluster, 1);
        if (p) {
            for (i = 0; i < MAX_PRIORITY; i++)
                mpp_cluster_queue_init(&p->queue[i], p);

            p->pid  = getpid();
            p->client_type = client_type;
            snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
            p->node_id = 0;
            p->worker_id = 0;
            /* mpp_malloc does not zero the struct, so clear the node counter */
            p->node_count = 0;
            p->worker_func = cluster_worker;
            p->worker_count = mpp_cluster_thd_cnt;

            mpp_assert(p->worker_count > 0);

            p->worker = mpp_malloc(ClusterWorker, p->worker_count);

            for (i = 0; i < p->worker_count; i++)
                cluster_worker_init(&p->worker[i], p);

            mClusters[client_type] = p;
            cluster_dbg_flow("%s created\n", p->name);
        }
    }

done:
    if (p)
        cluster_dbg_flow("%s get\n", p->name);
    else
        cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);

    return p;
}

MPP_RET MppClusterServer::put(MppClientType client_type)
{
    RK_S32 i;

    if (client_type >= VPU_CLIENT_BUTT)
        return MPP_NOK;

    AutoMutex auto_lock(this);
    MppCluster *p = mClusters[client_type];

    if (!p)
        return MPP_NOK;

    for (i = 0; i < p->worker_count; i++)
        cluster_worker_deinit(&p->worker[i]);

    cluster_dbg_flow("put %s\n", p->name);

    /* release the worker array and clear the slot so a later get() can recreate it */
    mpp_free(p->worker);
    mpp_free(p);
    mClusters[client_type] = NULL;

    return MPP_OK;
}

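/*
 * Attach a node to the cluster of the given client type: assign a node id,
 * bind the node's task to the queue matching its priority and schedule it
 * once so a worker runs it a first time.
 */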
MPP_RET mpp_node_attach(MppNode node, MppClientType type)
{
    MppNodeImpl *impl = (MppNodeImpl *)node;
    MppCluster *p = MppClusterServer::single()->get(type);
    RK_U32 priority = impl->priority;
    ClusterQueue *queue = &p->queue[priority];

    mpp_assert(priority < MAX_PRIORITY);
    mpp_assert(p);

    impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);

    snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);

    mpp_node_task_attach(&impl->task, impl, queue, &impl->work);

    MPP_FETCH_ADD(&p->node_count, 1);

    cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);

    /* attach and run once first */
    mpp_node_task_schedule(&impl->task);
    cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);

    return MPP_OK;
}

MPP_RET mpp_node_detach(MppNode node)
{
    MppNodeImpl *impl = (MppNodeImpl *)node;

    mpp_node_task_detach(&impl->task);
    cluster_dbg_flow("%s detached\n", impl->name);

    return MPP_OK;
}

MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
{
    if (trigger) {
        MppNodeImpl *impl = (MppNodeImpl *)node;

        mpp_node_task_schedule_from(caller, &impl->task);
    }

    return MPP_OK;
}
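
/*
 * Usage sketch (not part of this file): a rough outline of how a caller is
 * expected to drive the node API above. The "MyCtx" type, the "ctx_process"
 * callback and the choice of VPU_CLIENT_RKVDEC are hypothetical placeholders
 * for illustration only; the real callers live in the mpp decoder/encoder code.
 *
 *     typedef struct MyCtx_t { int dummy; } MyCtx;
 *
 *     static MPP_RET ctx_process(void *param)
 *     {
 *         MyCtx *ctx = (MyCtx *)param;
 *         (void)ctx;
 *         // consume pending work for this context, then return
 *         return MPP_OK;
 *     }
 *
 *     static MPP_RET setup_and_run(MppNode *node, MyCtx *ctx)
 *     {
 *         MPP_RET ret = mpp_node_init(node);
 *         if (ret)
 *             return ret;
 *
 *         mpp_node_set_func(*node, ctx_process, ctx);
 *         // bind to a cluster; attach schedules the node once immediately
 *         mpp_node_attach(*node, VPU_CLIENT_RKVDEC);
 *
 *         // later, whenever new input arrives:
 *         mpp_node_trigger_f(__FUNCTION__, *node, 1);
 *
 *         // teardown: detach blocks until the last run has finished
 *         mpp_node_detach(*node);
 *         mpp_node_deinit(*node);
 *         return MPP_OK;
 *     }
 */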
749