1 /*
2 * Copyright 2021 Rockchip Electronics Co. LTD
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define MODULE_TAG "mpp_cluster"
18
#include <errno.h>
#include <string.h>

#include "mpp_mem.h"
#include "mpp_env.h"
#include "mpp_lock.h"
#include "mpp_time.h"
#include "mpp_debug.h"
#include "mpp_common.h"

#include "mpp_cluster.h"
#include "mpp_dev_defs.h"
30
/* Bit flags selecting which debug traces are emitted; tested against mpp_cluster_debug */
#define MPP_CLUSTER_DBG_FLOW            (0x00000001)
#define MPP_CLUSTER_DBG_LOCK            (0x00000002)

/* Forward a trace to _mpp_dbg / _mpp_dbg_f with mpp_cluster_debug as the enable mask */
#define cluster_dbg(flag, fmt, ...)     _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
#define cluster_dbg_f(flag, fmt, ...)   _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)

/* Shorthands bound to a single debug flag each */
#define cluster_dbg_flow(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
#define cluster_dbg_lock(fmt, ...)      cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)

/* Debug mask; the MppClusterServer constructor loads it from "mpp_cluster_debug" */
RK_U32 mpp_cluster_debug = 0;
/* Worker count used for newly created clusters; loaded from "mpp_cluster_thd_cnt" */
RK_U32 mpp_cluster_thd_cnt = 1;
42
/* Forward declarations: the structures below reference each other by pointer */
typedef struct MppNodeProc_s    MppNodeProc;
typedef struct MppNodeTask_s    MppNodeTask;
typedef struct MppNodeImpl_s    MppNodeImpl;

typedef struct ClusterQueue_s   ClusterQueue;
typedef struct ClusterWorker_s  ClusterWorker;
typedef struct MppCluster_s     MppCluster;

/*
 * Bits of MppNodeImpl::state, updated with atomic operations.
 * NODE_VALID  - set on attach, cleared when a detach is requested
 * NODE_IDLE   - attached, neither queued nor running
 * NODE_SIGNAL - scheduled again while NODE_RUN was set
 * NODE_WAIT   - queued on a cluster queue
 * NODE_RUN    - taken by a worker
 */
#define NODE_VALID              (0x00000001)
#define NODE_IDLE               (0x00000002)
#define NODE_SIGNAL             (0x00000004)
#define NODE_WAIT               (0x00000008)
#define NODE_RUN                (0x00000010)

/* Follow-up action selected by the state transition in mpp_node_task_schedule_f */
#define NODE_ACT_NONE           (0x00000000)
#define NODE_ACT_IDLE_TO_WAIT   (0x00000001)
#define NODE_ACT_RUN_TO_SIGNAL  (0x00000002)

/* Worker state; cluster_signal_f only signals a worker that is WORKER_IDLE */
typedef enum MppWorkerState_e {
    WORKER_IDLE,
    WORKER_RUNNING,

    WORKER_STATE_BUTT,
} MppWorkerState;
67
/* Work callback of a node together with its run statistics */
struct MppNodeProc_s {
    /* callback invoked by a worker as proc(param) */
    TaskProc                proc;
    void                    *param;

    /* timing statistic */
    /* number of completed proc calls */
    RK_U32                  run_count;
    /* accumulated mpp_time() difference measured around the proc calls */
    RK_S64                  run_time;
};
76
/* Schedulable unit of a node: links the node to a cluster queue or a worker run list */
struct MppNodeTask_s {
    /* list entry; sits on ClusterQueue::list or ClusterWorker::list_task */
    struct list_head        list_sched;
    /* owning node */
    MppNodeImpl             *node;
    /* points at node->name, used for log output */
    const char              *node_name;

    /* cluster queue (and its lock) this task is scheduled on */
    ClusterQueue            *queue;

    /* work to run, set to &node->work by mpp_node_attach */
    MppNodeProc             *proc;
};
87
/* MppNode will be embedded in MppCtx */
struct MppNodeImpl_s {
    /* "<cluster name>:<node_id>", written by mpp_node_attach */
    char                    name[32];
    /* id taken from the cluster's node_id counter on attach */
    RK_S32                  node_id;
    /* combination of NODE_* bits, modified atomically */
    RK_U32                  state;

    /* callback and statistics executed for this node */
    MppNodeProc             work;

    /* index of the cluster queue used on attach, must be below MAX_PRIORITY */
    RK_U32                  priority;
    /* set to 1 on attach, cleared by the worker that completes a detach */
    RK_S32                  attached;
    /* posted by the worker once the node is released; detach waits on it */
    sem_t                   sem_detach;

    /* for cluster schedule */
    MppNodeTask             task;
};
104
/* One priority level of a cluster: a locked list of tasks waiting for a worker */
struct ClusterQueue_s {
    /* owning cluster */
    MppCluster              *cluster;

    /* recursive mutex (see mpp_cluster_queue_init) guarding list and count */
    pthread_mutex_t         lock;
    /* waiting MppNodeTask entries, linked through list_sched */
    struct list_head        list;
    /* number of entries on list */
    RK_S32                  count;
};
112
/* One worker thread of a cluster together with its private run list */
struct ClusterWorker_s {
    /* "<pid>:W<worker_id>", also passed to MppThread as thread name */
    char                    name[32];
    MppCluster              *cluster;
    RK_S32                  worker_id;

    /* thread running cluster->worker_func with this worker as argument */
    MppThread               *thd;
    MppWorkerState          state;

    /* number of tasks to collect per cluster_worker_get_task call */
    RK_S32                  batch_count;
    /* number of tasks currently on list_task */
    RK_S32                  work_count;
    /* tasks taken from the cluster queues and not yet run */
    struct list_head        list_task;
};
125
/* A cluster: the per-priority task queues and the workers serving one client type */
struct MppCluster_s {
    /* "<pid>:<client_type>" */
    char                    name[16];
    pid_t                   pid;
    RK_S32                  client_type;
    /* next node id handed out by mpp_node_attach */
    RK_S32                  node_id;
    /* next worker id handed out by cluster_worker_init */
    RK_S32                  worker_id;

    /* one queue per priority; workers scan them from index 0 upwards */
    ClusterQueue            queue[MAX_PRIORITY];
    /* incremented once per mpp_node_attach */
    RK_S32                  node_count;

    /* multi-worker info */
    RK_S32                  worker_count;
    ClusterWorker           *worker;
    MppThreadFunc           worker_func;
};
141
/* Schedule a task, recording the calling function name for the debug trace */
#define mpp_node_task_schedule(task) \
    mpp_node_task_schedule_f(__FUNCTION__, task)

/* Schedule a task on behalf of an explicitly named caller */
#define mpp_node_task_schedule_from(caller, task) \
    mpp_node_task_schedule_f(caller, task)

/* Lock / unlock a cluster queue, recording the calling function name for the debug trace */
#define cluster_queue_lock(queue)   cluster_queue_lock_f(__FUNCTION__, queue)
#define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)
150
cluster_queue_lock_f(const char * caller,ClusterQueue * queue)151 static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
152 {
153 MppCluster *cluster = queue->cluster;
154 RK_S32 ret;
155
156 cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
157
158 ret = pthread_mutex_lock(&queue->lock);
159
160 cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
161
162 return (ret) ? MPP_NOK : MPP_OK;
163 }
164
cluster_queue_unlock_f(const char * caller,ClusterQueue * queue)165 static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
166 {
167 MppCluster *cluster = queue->cluster;
168 RK_S32 ret;
169
170 cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
171
172 ret = pthread_mutex_unlock(&queue->lock);
173
174 cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
175
176 return (ret) ? MPP_NOK : MPP_OK;
177 }
178
/* Defined further down; declared here because mpp_node_task_schedule_f calls it. */
void cluster_signal_f(const char *caller, MppCluster *p);
180
mpp_cluster_queue_init(ClusterQueue * queue,MppCluster * cluster)181 MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
182 {
183 pthread_mutexattr_t attr;
184
185 pthread_mutexattr_init(&attr);
186 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
187 pthread_mutex_init(&queue->lock, &attr);
188 pthread_mutexattr_destroy(&attr);
189
190 queue->cluster = cluster;
191 INIT_LIST_HEAD(&queue->list);
192 queue->count = 0;
193
194 return MPP_OK;
195 }
196
/*
 * Tear down a cluster queue initialized by mpp_cluster_queue_init.
 *
 * The queue is expected to be empty; both the counter and the list are checked
 * with mpp_assert before the lock is destroyed.
 *
 * Always returns MPP_OK.
 */
MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
{
    mpp_assert(!queue->count);
    mpp_assert(list_empty(&queue->list));

    pthread_mutex_destroy(&queue->lock);

    return MPP_OK;
}
206
mpp_node_task_attach(MppNodeTask * task,MppNodeImpl * node,ClusterQueue * queue,MppNodeProc * proc)207 MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
208 ClusterQueue *queue, MppNodeProc *proc)
209 {
210 INIT_LIST_HEAD(&task->list_sched);
211
212 task->node = node;
213 task->node_name = node->name;
214
215 task->queue = queue;
216 task->proc = proc;
217
218 node->state = NODE_VALID | NODE_IDLE;
219 node->attached = 1;
220
221 return MPP_OK;
222 }
223
mpp_node_task_schedule_f(const char * caller,MppNodeTask * task)224 MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
225 {
226 ClusterQueue *queue = task->queue;
227 MppCluster *cluster = queue->cluster;
228 MppNodeImpl *node = task->node;
229 MppNodeProc *proc = task->proc;
230 const char *node_name = task->node_name;
231 RK_U32 new_st;
232 RK_U32 action = NODE_ACT_NONE;
233 bool ret = false;
234
235 cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
236 node_name, caller, node->state, proc->run_count, queue->count);
237
238 do {
239 RK_U32 old_st = node->state;
240
241 action = NODE_ACT_NONE;
242 new_st = 0;
243
244 if (old_st & NODE_WAIT) {
245 cluster_dbg_flow("%s sched task %x stay wait\n", node_name, old_st);
246 break;
247 }
248
249 if (old_st & NODE_IDLE) {
250 new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
251 cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
252 action = NODE_ACT_IDLE_TO_WAIT;
253 } else if (old_st & NODE_RUN) {
254 new_st = old_st | NODE_SIGNAL;
255 action = NODE_ACT_RUN_TO_SIGNAL;
256 cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
257 } else {
258 cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
259 }
260
261 ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
262 cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
263 node_name, old_st, new_st, ret, action);
264 } while (!ret);
265
266 switch (action) {
267 case NODE_ACT_IDLE_TO_WAIT : {
268 cluster_queue_lock(queue);
269 mpp_assert(list_empty(&task->list_sched));
270 list_add_tail(&task->list_sched, &queue->list);
271 queue->count++;
272 cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
273 cluster_queue_unlock(queue);
274
275 cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
276 cluster_signal_f(caller, cluster);
277 } break;
278 case NODE_ACT_RUN_TO_SIGNAL : {
279 cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
280 cluster_signal_f(caller, cluster);
281 } break;
282 }
283
284 cluster_dbg_flow("%s sched from %s after [%d:%d] queue %d\n",
285 node_name, caller, node->state, proc->run_count, queue->count);
286
287 return MPP_OK;
288 }
289
mpp_node_task_detach(MppNodeTask * task)290 MPP_RET mpp_node_task_detach(MppNodeTask *task)
291 {
292 MppNodeImpl *node = task->node;
293 MPP_RET ret = MPP_OK;
294
295 if (node->attached) {
296 const char *node_name = task->node_name;
297 MppNodeProc *proc = task->proc;
298
299 MPP_FETCH_AND(&node->state, ~NODE_VALID);
300
301 mpp_node_task_schedule(task);
302
303 cluster_dbg_flow("%s state %x:%d wait detach start\n",
304 node_name, node->state, proc->run_count);
305
306 sem_wait(&node->sem_detach);
307 mpp_assert(node->attached == 0);
308
309 cluster_dbg_flow("%s state %x:%d wait detach done\n",
310 node_name, node->state, proc->run_count);
311 }
312
313 return ret;
314 }
315
mpp_node_init(MppNode * node)316 MPP_RET mpp_node_init(MppNode *node)
317 {
318 MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
319 if (p)
320 sem_init(&p->sem_detach, 0, 0);
321
322 *node = p;
323
324 return p ? MPP_OK : MPP_NOK;
325 }
326
mpp_node_deinit(MppNode node)327 MPP_RET mpp_node_deinit(MppNode node)
328 {
329 MppNodeImpl *p = (MppNodeImpl *)node;
330
331 if (p) {
332 if (p->attached)
333 mpp_node_task_detach(&p->task);
334
335 mpp_assert(p->attached == 0);
336
337 sem_destroy(&p->sem_detach);
338
339 mpp_free(p);
340 }
341
342 return MPP_OK;
343 }
344
mpp_node_set_func(MppNode node,TaskProc proc,void * param)345 MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
346 {
347 MppNodeImpl *p = (MppNodeImpl *)node;
348 if (!p)
349 return MPP_NOK;
350
351 p->work.proc = proc;
352 p->work.param = param;
353
354 return MPP_OK;
355 }
356
cluster_worker_init(ClusterWorker * p,MppCluster * cluster)357 MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
358 {
359 MppThread *thd = NULL;
360 MPP_RET ret = MPP_NOK;
361
362 INIT_LIST_HEAD(&p->list_task);
363 p->worker_id = cluster->worker_id++;
364
365 p->batch_count = 1;
366 p->work_count = 0;
367 p->cluster = cluster;
368 p->state = WORKER_IDLE;
369 snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
370 thd = new MppThread(cluster->worker_func, p, p->name);
371 if (thd) {
372 p->thd = thd;
373 thd->start();
374 ret = MPP_OK;
375 }
376
377 return ret;
378 }
379
cluster_worker_deinit(ClusterWorker * p)380 MPP_RET cluster_worker_deinit(ClusterWorker *p)
381 {
382 if (p->thd) {
383 p->thd->stop();
384 delete p->thd;
385 p->thd = NULL;
386 }
387
388 mpp_assert(list_empty(&p->list_task));
389 mpp_assert(p->work_count == 0);
390
391 p->batch_count = 0;
392 p->cluster = NULL;
393
394 return MPP_OK;
395 }
396
cluster_worker_get_task(ClusterWorker * p)397 RK_S32 cluster_worker_get_task(ClusterWorker *p)
398 {
399 MppCluster *cluster = p->cluster;
400 RK_S32 batch_count = p->batch_count;
401 RK_S32 count = 0;
402 RK_U32 new_st;
403 RK_U32 old_st;
404 bool ret;
405 RK_S32 i;
406
407 cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);
408
409 for (i = 0; i < MAX_PRIORITY; i++) {
410 ClusterQueue *queue = &cluster->queue[i];
411 MppNodeTask *task = NULL;
412 MppNodeImpl *node = NULL;
413
414 do {
415 cluster_queue_lock(queue);
416
417 if (list_empty(&queue->list)) {
418 mpp_assert(queue->count == 0);
419 cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
420 cluster_queue_unlock(queue);
421 break;
422 }
423
424 mpp_assert(queue->count);
425 task = list_first_entry(&queue->list, MppNodeTask, list_sched);
426 list_del_init(&task->list_sched);
427 node = task->node;
428
429 queue->count--;
430
431 do {
432 old_st = node->state;
433 new_st = old_st ^ (NODE_WAIT | NODE_RUN);
434
435 mpp_assert(old_st & NODE_WAIT);
436 ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
437 } while (!ret);
438
439 list_add_tail(&task->list_sched, &p->list_task);
440 p->work_count++;
441 count++;
442
443 cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);
444
445 cluster_queue_unlock(queue);
446
447 if (count >= batch_count)
448 break;
449 } while (1);
450
451 if (count >= batch_count)
452 break;
453 }
454
455 cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);
456
457 return count;
458 }
459
cluster_worker_run_task(ClusterWorker * p)460 static void cluster_worker_run_task(ClusterWorker *p)
461 {
462 RK_U32 new_st;
463 RK_U32 old_st;
464 bool cas_ret;
465
466 cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);
467
468 while (!list_empty(&p->list_task)) {
469 MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
470 MppNodeProc *proc = task->proc;
471 MppNodeImpl *node = task->node;
472 RK_S64 time_start;
473 RK_S64 time_end;
474 RK_U32 state;
475 MPP_RET proc_ret;
476
477 /* check trigger for re-add task */
478 cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state);
479 mpp_assert(node->state & NODE_RUN);
480 if (!(node->state & NODE_RUN))
481 mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);
482
483 time_start = mpp_time();
484 proc_ret = proc->proc(proc->param);
485 time_end = mpp_time();
486
487 cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
488 proc->run_time += time_end - time_start;
489 proc->run_count++;
490
491 state = node->state;
492 if (!(state & NODE_VALID)) {
493 cluster_dbg_flow("%s run found destroy\n", p->name);
494 list_del_init(&task->list_sched);
495 node->attached = 0;
496
497 sem_post(&node->sem_detach);
498 cluster_dbg_flow("%s run sem post done\n", p->name);
499 } else if (state & NODE_SIGNAL) {
500 ClusterQueue *queue = task->queue;
501
502 list_del_init(&task->list_sched);
503
504 do {
505 old_st = state;
506 // NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
507 new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
508 cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
509 } while (!cas_ret);
510
511 cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);
512
513 cluster_queue_lock(queue);
514 list_add_tail(&task->list_sched, &queue->list);
515 queue->count++;
516 cluster_queue_unlock(queue);
517 } else {
518 list_del_init(&task->list_sched);
519 do {
520 old_st = node->state;
521 new_st = old_st ^ (NODE_IDLE | NODE_RUN);
522
523 cas_ret = MPP_BOOL_CAS(&node->state, old_st, new_st);
524 } while (!cas_ret);
525 mpp_assert(node->state & NODE_IDLE);
526 mpp_assert(!(node->state & NODE_RUN));
527
528 cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
529 }
530
531 p->work_count--;
532 }
533
534 mpp_assert(p->work_count == 0);
535
536 cluster_dbg_flow("%s run all done\n", p->name);
537 }
538
cluster_worker(void * data)539 static void *cluster_worker(void *data)
540 {
541 ClusterWorker *p = (ClusterWorker *)data;
542 MppThread *thd = p->thd;
543
544 while (1) {
545 {
546 RK_S32 task_count = 0;
547
548 cluster_dbg_lock("%s lock start\n", p->name);
549 AutoMutex autolock(thd->mutex());
550 cluster_dbg_lock("%s lock done\n", p->name);
551
552 if (MPP_THREAD_RUNNING != thd->get_status())
553 break;
554
555 task_count = cluster_worker_get_task(p);
556 if (!task_count) {
557 p->state = WORKER_IDLE;
558 thd->wait();
559 p->state = WORKER_RUNNING;
560 }
561 }
562
563 cluster_worker_run_task(p);
564 }
565
566 return NULL;
567 }
568
cluster_signal_f(const char * caller,MppCluster * p)569 void cluster_signal_f(const char *caller, MppCluster *p)
570 {
571 RK_S32 i;
572
573 cluster_dbg_flow("%s signal from %s\n", p->name, caller);
574
575 for (i = 0; i < p->worker_count; i++) {
576 ClusterWorker *worker = &p->worker[i];
577 MppThread *thd = worker->thd;
578 AutoMutex auto_lock(thd->mutex());
579
580 if (worker->state == WORKER_IDLE) {
581 thd->signal();
582 cluster_dbg_flow("%s signal\n", p->name);
583 break;
584 }
585 }
586 }
587
class MppClusterServer;

/* Address of the server instance, stored by MppClusterServer::single() */
MppClusterServer *cluster_server = NULL;

/*
 * Registry holding at most one cluster per client type.
 *
 * The Mutex base is used by get() and put() to serialize access to mClusters.
 * Construction, destruction and copying are private, so the instance is only
 * reachable through single().
 */
class MppClusterServer : Mutex
{
private:
    // avoid any unwanted function
    MppClusterServer();
    ~MppClusterServer();
    MppClusterServer(const MppClusterServer &);
    MppClusterServer &operator=(const MppClusterServer &);

    // clusters indexed by client type, NULL until created by get()
    MppCluster  *mClusters[VPU_CLIENT_BUTT];

public:
    // return the function-local static instance and record it in cluster_server
    static MppClusterServer *single() {
        static MppClusterServer inst;
        cluster_server = &inst;
        return &inst;
    }

    // look up the cluster of a client type, creating it on first use
    MppCluster  *get(MppClientType client_type);
    // stop the workers of the cluster of a client type and release it
    MPP_RET     put(MppClientType client_type);
};
613
MppClusterServer()614 MppClusterServer::MppClusterServer()
615 {
616 memset(mClusters, 0, sizeof(mClusters));
617
618 mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
619 mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
620 }
621
~MppClusterServer()622 MppClusterServer::~MppClusterServer()
623 {
624 RK_S32 i;
625
626 for (i = 0; i < VPU_CLIENT_BUTT; i++)
627 put((MppClientType)i);
628 }
629
get(MppClientType client_type)630 MppCluster *MppClusterServer::get(MppClientType client_type)
631 {
632 RK_S32 i;
633 MppCluster *p = NULL;
634
635 if (client_type >= VPU_CLIENT_BUTT)
636 goto done;
637
638 {
639 AutoMutex auto_lock(this);
640
641 p = mClusters[client_type];
642 if (p)
643 goto done;
644
645 p = mpp_malloc(MppCluster, 1);
646 if (p) {
647 for (i = 0; i < MAX_PRIORITY; i++)
648 mpp_cluster_queue_init(&p->queue[i], p);
649
650 p->pid = getpid();
651 p->client_type = client_type;
652 snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
653 p->node_id = 0;
654 p->worker_id = 0;
655 p->worker_func = cluster_worker;
656 p->worker_count = mpp_cluster_thd_cnt;
657
658 mpp_assert(p->worker_count > 0);
659
660 p->worker = mpp_malloc(ClusterWorker, p->worker_count);
661
662 for (i = 0; i < p->worker_count; i++)
663 cluster_worker_init(&p->worker[i], p);
664
665 mClusters[client_type] = p;
666 cluster_dbg_flow("%s created\n", p->name);
667 }
668 }
669
670 done:
671 if (p)
672 cluster_dbg_flow("%s get\n", p->name);
673 else
674 cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
675
676 return p;
677 }
678
put(MppClientType client_type)679 MPP_RET MppClusterServer::put(MppClientType client_type)
680 {
681 RK_S32 i;
682
683 if (client_type >= VPU_CLIENT_BUTT)
684 return MPP_NOK;
685
686 AutoMutex auto_lock(this);
687 MppCluster *p = mClusters[client_type];
688
689 if (!p)
690 return MPP_NOK;
691
692 for (i = 0; i < p->worker_count; i++)
693 cluster_worker_deinit(&p->worker[i]);
694
695 cluster_dbg_flow("put %s\n", p->name);
696
697 mpp_free(p);
698
699 return MPP_OK;
700 }
701
mpp_node_attach(MppNode node,MppClientType type)702 MPP_RET mpp_node_attach(MppNode node, MppClientType type)
703 {
704 MppNodeImpl *impl = (MppNodeImpl *)node;
705 MppCluster *p = MppClusterServer::single()->get(type);
706 RK_U32 priority = impl->priority;
707 ClusterQueue *queue = &p->queue[priority];
708
709 mpp_assert(priority < MAX_PRIORITY);
710 mpp_assert(p);
711
712 impl->node_id = MPP_FETCH_ADD(&p->node_id, 1);
713
714 snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
715
716 mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
717
718 MPP_FETCH_ADD(&p->node_count, 1);
719
720 cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
721
722 /* attach and run once first */
723 mpp_node_task_schedule(&impl->task);
724 cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
725
726 return MPP_OK;
727 }
728
mpp_node_detach(MppNode node)729 MPP_RET mpp_node_detach(MppNode node)
730 {
731 MppNodeImpl *impl = (MppNodeImpl *)node;
732
733 mpp_node_task_detach(&impl->task);
734 cluster_dbg_flow("%s detached\n", impl->name);
735
736 return MPP_OK;
737 }
738
mpp_node_trigger_f(const char * caller,MppNode node,RK_S32 trigger)739 MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
740 {
741 if (trigger) {
742 MppNodeImpl *impl = (MppNodeImpl *)node;
743
744 mpp_node_task_schedule_from(caller, &impl->task);
745 }
746
747 return MPP_OK;
748 }
749