Lines Matching refs:p
193 void cluster_signal_f(const char *caller, MppCluster *p);
332 MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1); in mpp_node_init() local
333 if (p) in mpp_node_init()
334 sem_init(&p->sem_detach, 0, 0); in mpp_node_init()
336 *node = p; in mpp_node_init()
338 return p ? MPP_OK : MPP_NOK; in mpp_node_init()
343 MppNodeImpl *p = (MppNodeImpl *)node; in mpp_node_deinit() local
345 if (p) { in mpp_node_deinit()
346 if (p->attached) in mpp_node_deinit()
347 mpp_node_task_detach(&p->task); in mpp_node_deinit()
349 mpp_assert(p->attached == 0); in mpp_node_deinit()
351 sem_destroy(&p->sem_detach); in mpp_node_deinit()
353 mpp_free(p); in mpp_node_deinit()
361 MppNodeImpl *p = (MppNodeImpl *)node; in mpp_node_set_func() local
362 if (!p) in mpp_node_set_func()
365 p->work.proc = proc; in mpp_node_set_func()
366 p->work.param = param; in mpp_node_set_func()
371 MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster) in cluster_worker_init() argument
376 INIT_LIST_HEAD(&p->list_task); in cluster_worker_init()
377 p->worker_id = cluster->worker_id++; in cluster_worker_init()
379 p->batch_count = 1; in cluster_worker_init()
380 p->work_count = 0; in cluster_worker_init()
381 p->cluster = cluster; in cluster_worker_init()
382 p->state = WORKER_IDLE; in cluster_worker_init()
383 snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id); in cluster_worker_init()
384 thd = mpp_thread_create(cluster->worker_func, p, p->name); in cluster_worker_init()
386 p->thd = thd; in cluster_worker_init()
394 MPP_RET cluster_worker_deinit(ClusterWorker *p) in cluster_worker_deinit() argument
396 if (p->thd) { in cluster_worker_deinit()
397 mpp_thread_stop(p->thd); in cluster_worker_deinit()
398 mpp_thread_destroy(p->thd); in cluster_worker_deinit()
399 p->thd = NULL; in cluster_worker_deinit()
402 mpp_assert(list_empty(&p->list_task)); in cluster_worker_deinit()
403 mpp_assert(p->work_count == 0); in cluster_worker_deinit()
405 p->batch_count = 0; in cluster_worker_deinit()
406 p->cluster = NULL; in cluster_worker_deinit()
411 RK_S32 cluster_worker_get_task(ClusterWorker *p) in cluster_worker_get_task() argument
413 MppCluster *cluster = p->cluster; in cluster_worker_get_task()
414 RK_S32 batch_count = p->batch_count; in cluster_worker_get_task()
421 cluster_dbg_flow("%s get %d task start\n", p->name, batch_count); in cluster_worker_get_task()
433 cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i); in cluster_worker_get_task()
453 list_add_tail(&task->list_sched, &p->list_task); in cluster_worker_get_task()
454 p->work_count++; in cluster_worker_get_task()
457 cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count); in cluster_worker_get_task()
469 cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count); in cluster_worker_get_task()
474 static void cluster_worker_run_task(ClusterWorker *p) in cluster_worker_run_task() argument
480 cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count); in cluster_worker_run_task()
482 while (!list_empty(&p->list_task)) { in cluster_worker_run_task()
483 MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched); in cluster_worker_run_task()
492 cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state); in cluster_worker_run_task()
495 mpp_err_f("%s run state check %x is invalid on run", p->name, node->state); in cluster_worker_run_task()
501 cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret); in cluster_worker_run_task()
507 cluster_dbg_flow("%s run found destroy\n", p->name); in cluster_worker_run_task()
512 cluster_dbg_flow("%s run sem post done\n", p->name); in cluster_worker_run_task()
525 cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st); in cluster_worker_run_task()
542 cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st); in cluster_worker_run_task()
545 p->work_count--; in cluster_worker_run_task()
548 mpp_assert(p->work_count == 0); in cluster_worker_run_task()
550 cluster_dbg_flow("%s run all done\n", p->name); in cluster_worker_run_task()
555 ClusterWorker *p = (ClusterWorker *)data; in cluster_worker() local
556 MppThread *thd = p->thd; in cluster_worker()
562 cluster_dbg_lock("%s lock start\n", p->name); in cluster_worker()
564 cluster_dbg_lock("%s lock done\n", p->name); in cluster_worker()
571 task_count = cluster_worker_get_task(p); in cluster_worker()
573 p->state = WORKER_IDLE; in cluster_worker()
575 p->state = WORKER_RUNNING; in cluster_worker()
580 cluster_worker_run_task(p); in cluster_worker()
586 void cluster_signal_f(const char *caller, MppCluster *p) in cluster_signal_f() argument
590 cluster_dbg_flow("%s signal from %s\n", p->name, caller); in cluster_signal_f()
592 for (i = 0; i < p->worker_count; i++) { in cluster_signal_f()
593 ClusterWorker *worker = &p->worker[i]; in cluster_signal_f()
600 cluster_dbg_flow("%s signal\n", p->name); in cluster_signal_f()
652 MppCluster *p = NULL; in MPP_SINGLETON() local
660 p = srv->clusters[client_type]; in MPP_SINGLETON()
661 if (p) { in MPP_SINGLETON()
666 p = mpp_malloc(MppCluster, 1); in MPP_SINGLETON()
667 if (p) { in MPP_SINGLETON()
669 mpp_cluster_queue_init(&p->queue[i], p); in MPP_SINGLETON()
671 p->pid = getpid(); in MPP_SINGLETON()
672 p->client_type = client_type; in MPP_SINGLETON()
673 snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type); in MPP_SINGLETON()
674 p->node_id = 0; in MPP_SINGLETON()
675 p->worker_id = 0; in MPP_SINGLETON()
676 p->worker_func = cluster_worker; in MPP_SINGLETON()
677 p->worker_count = mpp_cluster_thd_cnt; in MPP_SINGLETON()
679 mpp_assert(p->worker_count > 0); in MPP_SINGLETON()
681 p->worker = mpp_malloc(ClusterWorker, p->worker_count); in MPP_SINGLETON()
683 for (i = 0; i < p->worker_count; i++) in MPP_SINGLETON()
684 cluster_worker_init(&p->worker[i], p); in MPP_SINGLETON()
686 srv->clusters[client_type] = p; in MPP_SINGLETON()
687 cluster_dbg_flow("%s created\n", p->name); in MPP_SINGLETON()
694 if (p) in MPP_SINGLETON()
695 cluster_dbg_flow("%s get\n", p->name); in MPP_SINGLETON()
699 return p; in MPP_SINGLETON()
705 MppCluster *p; in cluster_server_put() local
713 p = srv->clusters[client_type]; in cluster_server_put()
714 if (!p) { in cluster_server_put()
719 for (i = 0; i < p->worker_count; i++) in cluster_server_put()
720 cluster_worker_deinit(&p->worker[i]); in cluster_server_put()
723 mpp_cluster_queue_deinit(&p->queue[i]); in cluster_server_put()
725 cluster_dbg_flow("put %s\n", p->name); in cluster_server_put()
727 MPP_FREE(p->worker); in cluster_server_put()
728 mpp_free(p); in cluster_server_put()
739 MppCluster *p = cluster_server_get(type); in mpp_node_attach() local
741 ClusterQueue *queue = &p->queue[priority]; in mpp_node_attach()
744 mpp_assert(p); in mpp_node_attach()
746 impl->node_id = MPP_FETCH_ADD(&p->node_id, 1); in mpp_node_attach()
748 snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id); in mpp_node_attach()
752 MPP_FETCH_ADD(&p->node_count, 1); in mpp_node_attach()
754 cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count); in mpp_node_attach()