1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3 * Copyright (c) 2015 Rockchip Electronics Co., Ltd.
4 */
5
6 #define MODULE_TAG "mpp_thread"
7
8 #include <string.h>
9
10 #include "mpp_mem.h"
11 #include "mpp_lock.h"
12 #include "mpp_debug.h"
13 #include "mpp_common.h"
14 #include "mpp_thread.h"
15
/* Debug category used by the thread create/stop/destroy trace messages in this file. */
#define THREAD_DBG_FUNC (0x00000001)

/* Debug mask handed to _mpp_dbg() by thread_dbg(); zero by default. */
static rk_u32 thread_debug = 0;

/* Forward a message to _mpp_dbg() together with this module's debug mask and the given flag. */
#define thread_dbg(flag, fmt, ...) _mpp_dbg(thread_debug, flag, fmt, ## __VA_ARGS__)
21
mpp_thread_create(MppThreadFunc func,void * ctx,const char * name)22 MppThread *mpp_thread_create(MppThreadFunc func, void *ctx, const char *name)
23 {
24 MppThread *thread = mpp_malloc(MppThread, 1);
25 int i;
26
27 if (thread) {
28 thread->func = func;
29 thread->ctx = ctx;
30
31 thread->thd_status[THREAD_WORK] = MPP_THREAD_UNINITED;
32 thread->thd_status[THREAD_INPUT] = MPP_THREAD_RUNNING;
33 thread->thd_status[THREAD_OUTPUT] = MPP_THREAD_RUNNING;
34 thread->thd_status[THREAD_CONTROL] = MPP_THREAD_RUNNING;
35
36 if (name) {
37 strncpy(thread->name, name, THREAD_NAME_LEN - 1);
38 thread->name[THREAD_NAME_LEN - 1] = '\0';
39 } else {
40 snprintf(thread->name, THREAD_NAME_LEN, "MppThread");
41 }
42 for (i = 0; i < THREAD_SIGNAL_BUTT; i++) {
43 mpp_mutex_cond_init(&thread->mutex_cond[i]);
44 }
45 }
46
47 return thread;
48 }
49
/* Log the thread name followed by its work, input, output and control status values. */
void mpp_thread_dump_status(MppThread *thread)
{
    int work = thread->thd_status[THREAD_WORK];
    int input = thread->thd_status[THREAD_INPUT];
    int output = thread->thd_status[THREAD_OUTPUT];
    int control = thread->thd_status[THREAD_CONTROL];

    mpp_log("thread %s status: %d %d %d %d\n", thread->name,
            work, input, output, control);
}
56
/*
 * Spawn the joinable worker thread for @thread.
 *
 * Nothing happens unless the work status is MPP_THREAD_UNINITED. The status
 * is switched to MPP_THREAD_RUNNING before pthread_create() is called and is
 * rolled back to MPP_THREAD_UNINITED (with an error log) when creation fails.
 * A failure to set the thread name is logged but does not stop the thread.
 */
void mpp_thread_start(MppThread *thread)
{
    pthread_attr_t attr;
    int ret;

    if (mpp_thread_get_status(thread, THREAD_WORK) != MPP_THREAD_UNINITED)
        return;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* set the status first so the new thread already observes RUNNING */
    mpp_thread_set_status(thread, MPP_THREAD_RUNNING, THREAD_WORK);

    ret = pthread_create(&thread->thd, &attr, thread->func, thread->ctx);
    if (0 == ret) {
        if (pthread_setname_np(thread->thd, thread->name)) {
            mpp_err("thread %p setname %s failed\n", thread->func, thread->name);
        }
        thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p create success\n",
                   thread->name, thread->func, thread->ctx);
    } else {
        /* report the failure instead of silently staying stopped */
        mpp_err("thread %s %p context %p create failed ret %d\n",
                thread->name, thread->func, thread->ctx, ret);
        mpp_thread_set_status(thread, MPP_THREAD_UNINITED, THREAD_WORK);
    }

    pthread_attr_destroy(&attr);
}
80
/*
 * Request the worker to stop and wait for it to exit.
 *
 * Does nothing when the work status is MPP_THREAD_UNINITED. Otherwise the
 * status is set to MPP_THREAD_STOPPING under the work lock, the work
 * condition is signalled, the thread is joined and the status returns to
 * MPP_THREAD_UNINITED.
 */
void mpp_thread_stop(MppThread *thread)
{
    void *retval;

    if (MPP_THREAD_UNINITED == mpp_thread_get_status(thread, THREAD_WORK))
        return;

    mpp_thread_lock(thread, THREAD_WORK);
    mpp_thread_set_status(thread, MPP_THREAD_STOPPING, THREAD_WORK);

    thread_dbg(THREAD_DBG_FUNC, "MPP_THREAD_STOPPING status set thd %p\n", (void *)thread);
    mpp_thread_signal(thread, THREAD_WORK);
    mpp_thread_unlock(thread, THREAD_WORK);

    /* the lock is released before joining */
    pthread_join(thread->thd, &retval);
    thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p destroy success\n", thread->name, thread->func, thread->ctx);

    mpp_thread_set_status(thread, MPP_THREAD_UNINITED, THREAD_WORK);
}
99
/*
 * Stop the worker (if running), release the per-signal mutex/condition
 * pairs and free the object. A NULL @thread is ignored.
 */
void mpp_thread_destroy(MppThread *thread)
{
    int i;

    if (!thread)
        return;

    mpp_thread_stop(thread);

    /* undo the mpp_mutex_cond_init() calls made when the thread was set up */
    for (i = 0; i < THREAD_SIGNAL_BUTT; i++)
        mpp_mutex_cond_destroy(&thread->mutex_cond[i]);

    mpp_free(thread);
}
107
/* Initialise @mutex as a recursive pthread mutex. */
void mpp_mutex_init(MppMutex *mutex)
{
    pthread_mutexattr_t recursive;

    pthread_mutexattr_init(&recursive);
    pthread_mutexattr_settype(&recursive, PTHREAD_MUTEX_RECURSIVE);

    pthread_mutex_init(&mutex->lock, &recursive);

    /* the attribute object is only needed during initialisation */
    pthread_mutexattr_destroy(&recursive);
}
116
/* Destroy the pthread mutex wrapped by @mutex. */
void mpp_mutex_destroy(MppMutex *mutex)
{
    pthread_mutex_t *native = &mutex->lock;

    pthread_mutex_destroy(native);
}
121
/* Acquire the pthread mutex wrapped by @mutex; the result is not reported. */
void mpp_mutex_lock(MppMutex *mutex)
{
    pthread_mutex_t *native = &mutex->lock;

    pthread_mutex_lock(native);
}
126
/* Release the pthread mutex wrapped by @mutex; the result is not reported. */
void mpp_mutex_unlock(MppMutex *mutex)
{
    pthread_mutex_t *native = &mutex->lock;

    pthread_mutex_unlock(native);
}
131
/* Try to acquire @mutex without blocking; returns the pthread_mutex_trylock() result. */
int mpp_mutex_trylock(MppMutex *mutex)
{
    int rc = pthread_mutex_trylock(&mutex->lock);

    return rc;
}
136
137 // MppCond functions
/*
 * Initialise @condition and record which clock its timed waits use.
 *
 * With COND_USE_CLOCK_MONOTONIC defined the condition is bound to
 * CLOCK_MONOTONIC when pthread_condattr_setclock() accepts it; otherwise,
 * and in builds without the define, default attributes and CLOCK_REALTIME
 * are used.
 */
void mpp_cond_init(MppCond *condition)
{
#ifdef COND_USE_CLOCK_MONOTONIC
    pthread_condattr_t cattr;

    pthread_condattr_init(&cattr);

    if (0 == pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC)) {
        pthread_cond_init(&condition->cond, &cattr);
        condition->clock_id = CLOCK_MONOTONIC;
    } else {
        /* monotonic clock rejected: fall back to the default attributes */
        pthread_cond_init(&condition->cond, NULL);
        condition->clock_id = CLOCK_REALTIME;
    }

    pthread_condattr_destroy(&cattr);
#else
    pthread_cond_init(&condition->cond, NULL);
    condition->clock_id = CLOCK_REALTIME;
#endif // COND_USE_CLOCK_MONOTONIC
}
159
/* Destroy the pthread condition variable wrapped by @condition. */
void mpp_cond_destroy(MppCond *condition)
{
    pthread_cond_t *native = &condition->cond;

    pthread_cond_destroy(native);
}
164
mpp_cond_wait(MppCond * condition,MppMutex * mutex)165 rk_s32 mpp_cond_wait(MppCond *condition, MppMutex *mutex)
166 {
167 return pthread_cond_wait(&condition->cond, &mutex->lock);
168 }
169
mpp_cond_timedwait(MppCond * condition,MppMutex * mutex,rk_s64 timeout)170 rk_s32 mpp_cond_timedwait(MppCond *condition, MppMutex *mutex, rk_s64 timeout)
171 {
172 struct timespec ts;
173
174 clock_gettime(condition->clock_id, &ts);
175
176 ts.tv_sec += timeout / 1000;
177 ts.tv_nsec += (timeout % 1000) * 1000000;
178 ts.tv_sec += ts.tv_nsec / 1000000000;
179 ts.tv_nsec %= 1000000000;
180
181 return pthread_cond_timedwait(&condition->cond, &mutex->lock, &ts);
182 }
183
mpp_cond_signal(MppCond * condition)184 rk_s32 mpp_cond_signal(MppCond *condition)
185 {
186 return pthread_cond_signal(&condition->cond);
187 }
188
mpp_cond_broadcast(MppCond * condition)189 rk_s32 mpp_cond_broadcast(MppCond *condition)
190 {
191 return pthread_cond_broadcast(&condition->cond);
192 }
193
194 // MppMutexCond functions
/* Initialise the mutex first and then the condition of @mutexCond. */
void mpp_mutex_cond_init(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;
    MppCond *cond = &mutexCond->cond;

    mpp_mutex_init(lock);
    mpp_cond_init(cond);
}
200
/* Destroy the mutex first and then the condition of @mutexCond. */
void mpp_mutex_cond_destroy(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;
    MppCond *cond = &mutexCond->cond;

    mpp_mutex_destroy(lock);
    mpp_cond_destroy(cond);
}
206
/* Acquire the mutex half of @mutexCond. */
void mpp_mutex_cond_lock(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;

    mpp_mutex_lock(lock);
}
211
/* Release the mutex half of @mutexCond. */
void mpp_mutex_cond_unlock(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;

    mpp_mutex_unlock(lock);
}
216
/* Try to acquire the mutex half of @mutexCond; returns the mpp_mutex_trylock() result. */
int mpp_mutex_cond_trylock(MppMutexCond *mutexCond)
{
    int rc = mpp_mutex_trylock(&mutexCond->lock);

    return rc;
}
221
mpp_mutex_cond_wait(MppMutexCond * mutexCond)222 rk_s32 mpp_mutex_cond_wait(MppMutexCond *mutexCond)
223 {
224 return mpp_cond_wait(&mutexCond->cond, &mutexCond->lock);
225 }
226
mpp_mutex_cond_timedwait(MppMutexCond * mutexCond,rk_s64 timeout)227 rk_s32 mpp_mutex_cond_timedwait(MppMutexCond *mutexCond, rk_s64 timeout)
228 {
229 return mpp_cond_timedwait(&mutexCond->cond, &mutexCond->lock, timeout);
230 }
231
/* Signal the condition of @mutexCond; the result is discarded. */
void mpp_mutex_cond_signal(MppMutexCond *mutexCond)
{
    MppCond *cond = &mutexCond->cond;

    (void)mpp_cond_signal(cond);
}
236
/* Broadcast the condition of @mutexCond; the result is discarded. */
void mpp_mutex_cond_broadcast(MppMutexCond *mutexCond)
{
    MppCond *cond = &mutexCond->cond;

    (void)mpp_cond_broadcast(cond);
}
241
242 // MppThread functions
/*
 * Initialise a caller-provided MppThread in place.
 *
 * func - thread entry point stored in the object
 * ctx  - opaque pointer stored next to func
 * name - optional thread name; "MppThread" is used when NULL (the same
 *        default as mpp_thread_create()) so the name buffer is never left
 *        uninitialised; longer names are truncated to THREAD_NAME_LEN - 1
 *        characters
 *
 * Every per-signal mutex/condition pair is initialised and every status
 * slot is set to MPP_THREAD_UNINITED.
 */
void mpp_thread_init(MppThread *thread, MppThreadFunc func, void *ctx, const char *name)
{
    int i;

    thread->func = func;
    thread->ctx = ctx;

    if (name) {
        strncpy(thread->name, name, THREAD_NAME_LEN - 1);
        thread->name[THREAD_NAME_LEN - 1] = '\0';
    } else {
        snprintf(thread->name, THREAD_NAME_LEN, "MppThread");
    }

    for (i = 0; i < THREAD_SIGNAL_BUTT; i++) {
        mpp_mutex_cond_init(&thread->mutex_cond[i]);
        thread->thd_status[i] = MPP_THREAD_UNINITED;
    }
}
258
/* Store @status in the status slot selected by @id (no locking is done here). */
void mpp_thread_set_status(MppThread *thread, MppThreadStatus status, MppThreadSignalId id)
{
    assert(id < THREAD_SIGNAL_BUTT);

    thread->thd_status[id] = status;
}
264
mpp_thread_get_status(MppThread * thread,MppThreadSignalId id)265 MppThreadStatus mpp_thread_get_status(MppThread *thread, MppThreadSignalId id)
266 {
267 assert(id < THREAD_SIGNAL_BUTT);
268 return thread->thd_status[id];
269 }
270
/* Acquire the mutex belonging to signal slot @id. */
void mpp_thread_lock(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_lock(slot);
}
276
/* Release the mutex belonging to signal slot @id. */
void mpp_thread_unlock(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_unlock(slot);
}
282
/*
 * Wait on the condition of signal slot @id.
 *
 * The slot status is switched to MPP_THREAD_WAITING for the duration of
 * the wait. The previous status is restored afterwards only if the slot
 * still reads MPP_THREAD_WAITING, so a status written by another party
 * during the wait is kept.
 */
void mpp_thread_wait(MppThread *thread, MppThreadSignalId id)
{
    MppThreadStatus saved;

    assert(id < THREAD_SIGNAL_BUTT);

    saved = thread->thd_status[id];
    thread->thd_status[id] = MPP_THREAD_WAITING;

    mpp_mutex_cond_wait(&thread->mutex_cond[id]);

    if (MPP_THREAD_WAITING == thread->thd_status[id])
        thread->thd_status[id] = saved;
}
293
/* Signal the condition belonging to signal slot @id. */
void mpp_thread_signal(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_signal(slot);
}
299
/* Private state behind an MppSThd handle (one simple thread). */
typedef struct MppSThdImpl_t {
    /* points at storage behind the struct (mpp_sthd_get) or at the owning group's name (mpp_sthd_grp_get) */
    char *name;
    /* entry function handed to pthread_create() by mpp_sthd_create() */
    MppSThdFunc func;
    /* current lifecycle state; changed by setup/start/stop/wait */
    MppSThdStatus status;
    /* -1 for a standalone thread, otherwise the index inside its group */
    rk_s32 idx;
    /* pthread handle, valid after a successful mpp_sthd_create() */
    pthread_t thd;
    /* recursive mutex guarding the state changes */
    pthread_mutex_t lock;
    /* condition used by mpp_sthd_wait()/signal()/broadcast() */
    pthread_cond_t cond;
    /* argument passed to func; ctx.thd points back at this struct and doubles as a validity check */
    MppSThdCtx ctx;
} MppSThdImpl;
310
/* Private state behind an MppSThdGrp handle (a group of simple threads sharing one name). */
typedef struct MppSThdGrpImpl_t {
    /* group name; every member thread's name pointer refers to this buffer */
    char name[THREAD_NAME_LEN];
    /* number of entries in thds[] */
    rk_s32 count;
    /* group-level lifecycle state */
    MppSThdStatus status;
    /* recursive mutex guarding group-level operations */
    pthread_mutex_t lock;
    /* member threads, allocated together with the group header */
    MppSThdImpl thds[];
} MppSThdGrpImpl;
318
/*
 * Map a simple-thread status to a printable name. Any value that is not
 * below MPP_STHD_BUTT maps to the entry at index MPP_STHD_BUTT.
 */
static const char *state2str(MppSThdStatus state)
{
    static const char *status_names[] = {
        "uninited",
        "ready",
        "running",
        "waiting",
        "stopping",
        "invalid"
    };

    if (state < MPP_STHD_BUTT)
        return status_names[state];

    return status_names[MPP_STHD_BUTT];
}
332
check_sthd(const char * name,MppSThdImpl * thd)333 static rk_s32 check_sthd(const char *name, MppSThdImpl *thd)
334 {
335 if (!thd) {
336 mpp_err("mpp_sthd NULL found at %s\n", name);
337 return MPP_NOK;
338 }
339
340 if (thd->ctx.thd != thd) {
341 mpp_err("mpp_sthd check %p:%p mismatch at %s\n", thd->ctx.thd, thd, name);
342 return MPP_NOK;
343 }
344
345 return MPP_OK;
346 }
347
/* Run check_sthd() on a handle, passing the calling function's name for the error log. */
#define CHECK_STHD(thd) check_sthd(__FUNCTION__, (MppSThdImpl *)(thd))
349
/*
 * Prepare a simple-thread object: record its index, point ctx.thd back at
 * the object, and create its recursive mutex and its condition variable.
 * The status field is not written here.
 */
static void mpp_sthd_init(MppSThdImpl *thd, rk_s32 idx)
{
    pthread_mutexattr_t recursive;

    thd->idx = idx;
    thd->ctx.thd = thd;

    pthread_mutexattr_init(&recursive);
    pthread_mutexattr_settype(&recursive, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&thd->lock, &recursive);
    pthread_mutexattr_destroy(&recursive);

    pthread_cond_init(&thd->cond, NULL);
}
363
/*
 * Tear down a simple-thread object that is not running: mark it uninited,
 * clear the ctx.thd back-pointer under the lock, then destroy the condition
 * variable and the mutex.
 */
static void mpp_sthd_deinit(MppSThdImpl *thd)
{
    pthread_mutex_t *lock = &thd->lock;

    mpp_assert(thd->ctx.thd == thd);
    mpp_assert(thd->status < MPP_STHD_RUNNING);

    pthread_mutex_lock(lock);
    thd->status = MPP_STHD_UNINITED;
    /* clearing the back-pointer makes later CHECK_STHD() calls fail */
    thd->ctx.thd = NULL;
    pthread_mutex_unlock(lock);

    pthread_cond_destroy(&thd->cond);
    pthread_mutex_destroy(lock);
}
377
mpp_sthd_create(MppSThdImpl * thd)378 static MPP_RET mpp_sthd_create(MppSThdImpl *thd)
379 {
380 pthread_attr_t attr;
381 MPP_RET ret = MPP_NOK;
382
383 mpp_assert(thd->ctx.thd == thd);
384 mpp_assert(thd->status < MPP_STHD_RUNNING);
385
386 pthread_attr_init(&attr);
387 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
388
389 // NOTE: set status to running first
390 thd->status = MPP_STHD_RUNNING;
391 if (0 == pthread_create(&thd->thd, &attr, (MppThreadFunc)thd->func, &thd->ctx)) {
392 ret = (MPP_RET)pthread_setname_np(thd->thd, thd->name);
393 if (ret)
394 mpp_err("%s %p setname failed\n", thd->thd, thd->func);
395
396 thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p create success\n",
397 thd->name, thd->func, thd->ctx.ctx);
398 ret = MPP_OK;
399 } else {
400 thd->status = MPP_STHD_READY;
401 }
402
403 pthread_attr_destroy(&attr);
404
405 return ret;
406 }
407
mpp_sthd_get(const char * name)408 MppSThd mpp_sthd_get(const char *name)
409 {
410 rk_s32 size = MPP_ALIGN(sizeof(MppSThdImpl), 8) + THREAD_NAME_LEN;
411 MppSThdImpl *thd = mpp_calloc_size(MppSThdImpl, size);
412
413 if (!thd) {
414 mpp_err_f("failed to create simple thread\n");
415 return NULL;
416 }
417
418 thd->name = (char *)(thd + 1);
419 if (!name)
420 name = "mpp_sthd";
421
422 snprintf(thd->name, THREAD_NAME_LEN - 1, "%s", name);
423
424 mpp_sthd_init(thd, -1);
425
426 return thd;
427 }
428
/*
 * Release a standalone simple thread obtained from mpp_sthd_get().
 * The thread is expected to be uninited or ready, i.e. not running.
 */
void mpp_sthd_put(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    mpp_assert(sthd);
    mpp_assert(sthd->ctx.thd == sthd);
    mpp_assert(sthd->status == MPP_STHD_UNINITED || sthd->status == MPP_STHD_READY);

    mpp_sthd_deinit(sthd);
    mpp_free(sthd);
}
441
mpp_sthd_get_status(MppSThd thd)442 MppSThdStatus mpp_sthd_get_status(MppSThd thd)
443 {
444 MppSThdImpl *impl = (MppSThdImpl *)thd;
445
446 CHECK_STHD(impl);
447
448 return impl->status;
449 }
450
/* Return the name pointer of @thd; the handle check only logs on failure. */
const char* mpp_sthd_get_name(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);
    return sthd->name;
}
459
mpp_sthd_get_idx(MppSThd thd)460 rk_s32 mpp_sthd_get_idx(MppSThd thd)
461 {
462 MppSThdImpl *impl = (MppSThdImpl *)thd;
463
464 CHECK_STHD(impl);
465
466 return impl->idx;
467 }
468
mpp_sthd_check(MppSThd thd)469 rk_s32 mpp_sthd_check(MppSThd thd)
470 {
471 return CHECK_STHD(thd);
472 }
473
/*
 * Install the entry function and user context of a simple thread.
 *
 * Only allowed while the thread is uninited or ready; any other state is
 * logged and left untouched. A non-NULL @func leaves the thread ready, a
 * NULL @func returns it to uninited.
 */
void mpp_sthd_setup(MppSThd thd, MppSThdFunc func, void *ctx)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    MppSThdStatus cur;

    CHECK_STHD(sthd);

    pthread_mutex_lock(&sthd->lock);
    cur = sthd->status;
    if (cur == MPP_STHD_UNINITED || cur == MPP_STHD_READY) {
        sthd->func = func;
        sthd->ctx.ctx = ctx;
        if (func)
            sthd->status = MPP_STHD_READY;
        else
            sthd->status = MPP_STHD_UNINITED;
    } else {
        mpp_err("%s can NOT setup on %s\n", sthd->name, state2str(cur));
    }
    pthread_mutex_unlock(&sthd->lock);

    CHECK_STHD(sthd);
}
498
/*
 * Start a simple thread. Only a thread in the ready state is started;
 * any other state is logged and left untouched.
 */
void mpp_sthd_start(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    MppSThdStatus cur;

    CHECK_STHD(sthd);

    pthread_mutex_lock(&sthd->lock);
    cur = sthd->status;
    if (cur == MPP_STHD_READY)
        mpp_sthd_create(sthd);
    else
        mpp_err("%s can NOT start on %s\n", sthd->name, state2str(cur));
    pthread_mutex_unlock(&sthd->lock);

    CHECK_STHD(sthd);
}
521
/*
 * Request a running or waiting simple thread to stop.
 *
 * The stored status becomes MPP_STHD_STOPPING and the condition is
 * signalled so a thread blocked in mpp_sthd_wait() wakes up. This call
 * does not join the thread; mpp_sthd_stop_sync() does. Any other state is
 * logged and left untouched.
 */
void mpp_sthd_stop(MppSThd thd)
{
    MppSThdImpl *impl = (MppSThdImpl *)thd;
    MppSThdStatus status;

    CHECK_STHD(impl);

    pthread_mutex_lock(&impl->lock);
    status = impl->status;
    switch (status) {
    case MPP_STHD_RUNNING :
    case MPP_STHD_WAITING : {
        /* write the object's state, not the local copy, so the worker and stop_sync see it */
        impl->status = MPP_STHD_STOPPING;
        pthread_cond_signal(&impl->cond);
    } break;
    default : {
        mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status));
    } break;
    }
    pthread_mutex_unlock(&impl->lock);

    CHECK_STHD(impl);
}
545
/*
 * Join a simple thread that is in the stopping state and mark it ready
 * again. Any other state is logged and left untouched.
 *
 * NOTE(review): the join runs while this object's lock is held, so the
 * worker must be able to exit without taking that lock - confirm against
 * the worker implementations.
 */
void mpp_sthd_stop_sync(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    MppSThdStatus cur;

    CHECK_STHD(sthd);

    pthread_mutex_lock(&sthd->lock);
    cur = sthd->status;
    if (cur == MPP_STHD_STOPPING) {
        void *retval;

        pthread_join(sthd->thd, &retval);
        sthd->status = MPP_STHD_READY;
    } else {
        mpp_err("%s can NOT stop on %s\n", sthd->name, state2str(cur));
    }
    pthread_mutex_unlock(&sthd->lock);

    CHECK_STHD(sthd);
}
570
/* Acquire the lock of @thd; the handle check only logs on failure. */
void mpp_sthd_lock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);
    pthread_mutex_lock(&sthd->lock);
}
579
/* Release the lock of @thd; the handle check only logs on failure. */
void mpp_sthd_unlock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);
    pthread_mutex_unlock(&sthd->lock);
}
588
/* Try to acquire the lock of @thd without blocking; returns the pthread_mutex_trylock() result. */
int mpp_sthd_trylock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    int rc;

    CHECK_STHD(sthd);

    rc = pthread_mutex_trylock(&sthd->lock);
    return rc;
}
597
/*
 * Block on the condition of @thd using its lock.
 *
 * A running thread is marked waiting for the duration of the wait and
 * marked running again afterwards, unless the status was changed in the
 * meantime (it is only restored while it still reads waiting).
 */
void mpp_sthd_wait(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);

    if (MPP_STHD_RUNNING == sthd->status)
        sthd->status = MPP_STHD_WAITING;

    pthread_cond_wait(&sthd->cond, &sthd->lock);

    if (MPP_STHD_WAITING == sthd->status)
        sthd->status = MPP_STHD_RUNNING;
}
612
/* Signal the condition of @thd; the handle check only logs on failure. */
void mpp_sthd_signal(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);
    pthread_cond_signal(&sthd->cond);
}
621
/* Broadcast the condition of @thd; the handle check only logs on failure. */
void mpp_sthd_broadcast(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);
    pthread_cond_broadcast(&sthd->cond);
}
630
mpp_sthd_grp_get(const char * name,rk_s32 count)631 MppSThdGrp mpp_sthd_grp_get(const char *name, rk_s32 count)
632 {
633 MppSThdGrpImpl *grp = NULL;
634
635 if (count > 0) {
636 rk_s32 elem_size = MPP_ALIGN(sizeof(MppSThdImpl), 8);
637 rk_s32 total_size = MPP_ALIGN(sizeof(MppSThdGrpImpl), 8) + count * elem_size;
638
639 grp = mpp_calloc_size(MppSThdGrpImpl, total_size);
640 if (grp) {
641 pthread_mutexattr_t attr;
642 rk_s32 i;
643
644 if (!name)
645 name = "mpp_sthd_grp";
646
647 snprintf(grp->name, THREAD_NAME_LEN - 1, "%s", name);
648
649 grp->count = count;
650 for (i = 0; i < count; i++) {
651 MppSThdImpl *thd = &grp->thds[i];
652
653 thd->name = grp->name;
654 mpp_sthd_init(thd, i);
655 }
656
657 pthread_mutexattr_init(&attr);
658 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
659 pthread_mutex_init(&grp->lock, &attr);
660 pthread_mutexattr_destroy(&attr);
661 }
662 }
663
664 if (!grp)
665 mpp_err_f("can NOT create %d threads group\n", count);
666
667 return grp;
668 }
669
/*
 * Release a thread group obtained from mpp_sthd_grp_get().
 *
 * The group is expected to be uninited or ready, i.e. not running. Every
 * member is deinitialised, the group mutex is destroyed and the memory is
 * freed.
 */
void mpp_sthd_grp_put(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    rk_s32 i;

    mpp_assert(impl);
    mpp_assert(impl->status == MPP_STHD_UNINITED || impl->status == MPP_STHD_READY);

    for (i = 0; i < impl->count; i++) {
        MppSThdImpl *thd = &impl->thds[i];

        mpp_sthd_deinit(thd);
    }

    /* pairs with the pthread_mutex_init() done in mpp_sthd_grp_get() */
    pthread_mutex_destroy(&impl->lock);

    mpp_free(impl);
}
686
/*
 * Install the same entry function and user context on every group member.
 *
 * Only allowed while the group is uninited or ready; any other state is
 * logged and left untouched. A non-NULL @func leaves the group and its
 * members ready, a NULL @func returns them to uninited.
 */
void mpp_sthd_grp_setup(MppSThdGrp grp, MppSThdFunc func, void *ctx)
{
    MppSThdGrpImpl *group = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(group);

    pthread_mutex_lock(&group->lock);
    cur = group->status;
    if (cur == MPP_STHD_UNINITED || cur == MPP_STHD_READY) {
        MppSThdStatus target = MPP_STHD_UNINITED;
        rk_s32 n;

        if (func)
            target = MPP_STHD_READY;

        for (n = 0; n < group->count; n++) {
            MppSThdImpl *member = &group->thds[n];

            member->func = func;
            member->ctx.ctx = ctx;
            member->status = target;
        }
        group->status = target;
    } else {
        mpp_err("%s can NOT setup on %s\n", group->name, state2str(cur));
    }
    pthread_mutex_unlock(&group->lock);
}
717
/*
 * Start every member of a ready group and mark the group running.
 * Any other group state is logged and left untouched. The group status is
 * set to running without looking at the outcome of the member starts.
 */
void mpp_sthd_grp_start(MppSThdGrp grp)
{
    MppSThdGrpImpl *group = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(group);

    pthread_mutex_lock(&group->lock);
    cur = group->status;
    if (cur == MPP_STHD_READY) {
        rk_s32 n = 0;

        while (n < group->count) {
            mpp_sthd_start(&group->thds[n]);
            n++;
        }

        group->status = MPP_STHD_RUNNING;
    } else {
        mpp_err("%s can NOT start on %s\n", group->name, state2str(cur));
    }
    pthread_mutex_unlock(&group->lock);
}
743
/*
 * Request every member of a running or waiting group to stop.
 *
 * The group and each member are marked stopping; each member's condition
 * is signalled under that member's lock. The threads are not joined here.
 * Any other group state is logged and left untouched.
 */
void mpp_sthd_grp_stop(MppSThdGrp grp)
{
    MppSThdGrpImpl *group = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(group);

    pthread_mutex_lock(&group->lock);
    cur = group->status;
    if (cur == MPP_STHD_RUNNING || cur == MPP_STHD_WAITING) {
        rk_s32 n;

        group->status = MPP_STHD_STOPPING;

        for (n = 0; n < group->count; n++) {
            MppSThdImpl *member = &group->thds[n];

            pthread_mutex_lock(&member->lock);
            member->status = MPP_STHD_STOPPING;
            pthread_cond_signal(&member->cond);
            pthread_mutex_unlock(&member->lock);
        }
    } else {
        mpp_err("%s can NOT stop on %s\n", group->name, state2str(cur));
    }
    pthread_mutex_unlock(&group->lock);
}
776
/*
 * Join every member of a group that is in the stopping state, then mark
 * the members and the group ready again. Any other group state is logged
 * and left untouched. The joins run while the group lock is held.
 */
void mpp_sthd_grp_stop_sync(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    MppSThdStatus status;

    mpp_assert(impl);

    /* joining is only valid after mpp_sthd_grp_stop() marked the group stopping */
    pthread_mutex_lock(&impl->lock);
    status = impl->status;
    switch (status) {
    case MPP_STHD_STOPPING : {
        void *dummy;
        rk_s32 i;

        for (i = 0; i < impl->count; i++) {
            MppSThdImpl *thd = &impl->thds[i];

            pthread_join(thd->thd, &dummy);
            thd->status = MPP_STHD_READY;
        }
        impl->status = MPP_STHD_READY;
    } break;
    default : {
        mpp_err("%s can NOT stop sync on %s\n", impl->name, state2str(status));
    } break;
    }
    pthread_mutex_unlock(&impl->lock);
}
807
mpp_sthd_grp_get_each(MppSThdGrp grp,rk_s32 idx)808 MppSThd mpp_sthd_grp_get_each(MppSThdGrp grp, rk_s32 idx)
809 {
810 MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
811 MppSThd ret = NULL;
812
813 mpp_assert(impl);
814 mpp_assert(idx >= 0 && idx < impl->count);
815
816 pthread_mutex_lock(&impl->lock);
817 ret = &impl->thds[idx];
818 pthread_mutex_unlock(&impl->lock);
819
820 return ret;
821 }