xref: /rockchip-linux_mpp/osal/mpp_thread.c (revision 437bfbeb9567cca9cd9080e3f6954aa9d6a94f18)
1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3  * Copyright (c) 2015 Rockchip Electronics Co., Ltd.
4  */
5 
6 #define MODULE_TAG "mpp_thread"
7 
8 #include <string.h>
9 
10 #include "mpp_mem.h"
11 #include "mpp_lock.h"
12 #include "mpp_debug.h"
13 #include "mpp_common.h"
14 #include "mpp_thread.h"
15 
16 #define THREAD_DBG_FUNC             (0x00000001)
17 
18 static rk_u32 thread_debug = 0;
19 
20 #define thread_dbg(flag, fmt, ...)  _mpp_dbg(thread_debug, flag, fmt, ## __VA_ARGS__)
21 
mpp_thread_create(MppThreadFunc func,void * ctx,const char * name)22 MppThread *mpp_thread_create(MppThreadFunc func, void *ctx, const char *name)
23 {
24     MppThread *thread = mpp_malloc(MppThread, 1);
25     int i;
26 
27     if (thread) {
28         thread->func = func;
29         thread->ctx = ctx;
30 
31         thread->thd_status[THREAD_WORK] = MPP_THREAD_UNINITED;
32         thread->thd_status[THREAD_INPUT] = MPP_THREAD_RUNNING;
33         thread->thd_status[THREAD_OUTPUT] = MPP_THREAD_RUNNING;
34         thread->thd_status[THREAD_CONTROL] = MPP_THREAD_RUNNING;
35 
36         if (name) {
37             strncpy(thread->name, name, THREAD_NAME_LEN - 1);
38             thread->name[THREAD_NAME_LEN - 1] = '\0';
39         } else {
40             snprintf(thread->name, THREAD_NAME_LEN, "MppThread");
41         }
42         for (i = 0; i < THREAD_SIGNAL_BUTT; i++) {
43             mpp_mutex_cond_init(&thread->mutex_cond[i]);
44         }
45     }
46 
47     return thread;
48 }
49 
/* Log the thread name followed by its work / input / output / control status values. */
void mpp_thread_dump_status(MppThread *thread)
{
    mpp_log("thread %s status: %d %d %d %d\n",
            thread->name,
            thread->thd_status[THREAD_WORK],
            thread->thd_status[THREAD_INPUT],
            thread->thd_status[THREAD_OUTPUT],
            thread->thd_status[THREAD_CONTROL]);
}
56 
/*
 * Spawn the joinable pthread for @thread if its work status is still
 * MPP_THREAD_UNINITED; calling it on an already started thread is a no-op.
 *
 * The work status is switched to MPP_THREAD_RUNNING before pthread_create
 * and rolled back to MPP_THREAD_UNINITED when creation fails. A creation
 * failure is reported through mpp_err instead of being dropped silently.
 */
void mpp_thread_start(MppThread *thread)
{
    pthread_attr_t attr;
    int ret;

    if (mpp_thread_get_status(thread, THREAD_WORK) != MPP_THREAD_UNINITED)
        return;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    mpp_thread_set_status(thread, MPP_THREAD_RUNNING, THREAD_WORK);

    ret = pthread_create(&thread->thd, &attr, thread->func, thread->ctx);
    if (ret) {
        mpp_err("thread %s create failed ret %d\n", thread->name, ret);
        mpp_thread_set_status(thread, MPP_THREAD_UNINITED, THREAD_WORK);
    } else {
        /* naming is best effort: a failure is only logged */
        ret = pthread_setname_np(thread->thd, thread->name);
        if (ret) {
            mpp_err("thread %p setname %s failed\n", thread->func, thread->name);
        }
        thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p create success\n",
                   thread->name, thread->func, thread->ctx);
    }

    pthread_attr_destroy(&attr);
}
80 
/*
 * Request the worker to stop and wait for it to exit.
 *
 * Does nothing when the work status is MPP_THREAD_UNINITED. Otherwise the
 * status is set to MPP_THREAD_STOPPING and the work condition is signalled
 * under the work lock, the pthread is joined, and the status goes back to
 * MPP_THREAD_UNINITED.
 */
void mpp_thread_stop(MppThread *thread)
{
    void *retval;

    if (mpp_thread_get_status(thread, THREAD_WORK) == MPP_THREAD_UNINITED)
        return;

    mpp_thread_lock(thread, THREAD_WORK);
    mpp_thread_set_status(thread, MPP_THREAD_STOPPING, THREAD_WORK);

    thread_dbg(THREAD_DBG_FUNC, "MPP_THREAD_STOPPING status set thd %p\n", (void *)thread);
    mpp_thread_signal(thread, THREAD_WORK);
    mpp_thread_unlock(thread, THREAD_WORK);

    /* the lock is released before joining */
    pthread_join(thread->thd, &retval);
    thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p destroy success\n",
               thread->name, thread->func, thread->ctx);

    mpp_thread_set_status(thread, MPP_THREAD_UNINITED, THREAD_WORK);
}
99 
/*
 * Stop @thread, release its synchronisation objects and free it.
 * A NULL @thread is ignored.
 *
 * The per-signal mutex/condition pairs set up by mpp_thread_create() /
 * mpp_thread_init() are destroyed here after the worker has been joined;
 * previously they were never released.
 */
void mpp_thread_destroy(MppThread *thread)
{
    int i;

    if (!thread)
        return;

    mpp_thread_stop(thread);

    for (i = 0; i < THREAD_SIGNAL_BUTT; i++)
        mpp_mutex_cond_destroy(&thread->mutex_cond[i]);

    mpp_free(thread);
}
107 
/* Initialise @mutex as a recursive pthread mutex. */
void mpp_mutex_init(MppMutex *mutex)
{
    pthread_mutexattr_t recursive_attr;

    pthread_mutexattr_init(&recursive_attr);
    pthread_mutexattr_settype(&recursive_attr, PTHREAD_MUTEX_RECURSIVE);

    pthread_mutex_init(&mutex->lock, &recursive_attr);

    /* the attribute object is only needed during initialisation */
    pthread_mutexattr_destroy(&recursive_attr);
}
116 
/* Destroy the pthread mutex wrapped by @mutex. */
void mpp_mutex_destroy(MppMutex *mutex)
{
    pthread_mutex_t *raw = &mutex->lock;

    pthread_mutex_destroy(raw);
}
121 
/* Acquire the pthread mutex wrapped by @mutex; the pthread result is not reported. */
void mpp_mutex_lock(MppMutex *mutex)
{
    pthread_mutex_t *raw = &mutex->lock;

    pthread_mutex_lock(raw);
}
126 
/* Release the pthread mutex wrapped by @mutex; the pthread result is not reported. */
void mpp_mutex_unlock(MppMutex *mutex)
{
    pthread_mutex_t *raw = &mutex->lock;

    pthread_mutex_unlock(raw);
}
131 
/* Try to acquire @mutex without blocking; returns the pthread_mutex_trylock result. */
int mpp_mutex_trylock(MppMutex *mutex)
{
    int ret = pthread_mutex_trylock(&mutex->lock);

    return ret;
}
136 
137 // MppCond functions
/*
 * Initialise @condition and record which clock its timed waits use.
 *
 * With COND_USE_CLOCK_MONOTONIC defined the condition is bound to
 * CLOCK_MONOTONIC when pthread_condattr_setclock accepts it; otherwise,
 * and in builds without that macro, default attributes are used and
 * clock_id is CLOCK_REALTIME.
 */
void mpp_cond_init(MppCond *condition)
{
#ifdef COND_USE_CLOCK_MONOTONIC
    pthread_condattr_t cond_attr;

    pthread_condattr_init(&cond_attr);

    if (0 == pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC)) {
        pthread_cond_init(&condition->cond, &cond_attr);
        condition->clock_id = CLOCK_MONOTONIC;
    } else {
        /* monotonic clock rejected: fall back to the default attributes */
        pthread_cond_init(&condition->cond, NULL);
        condition->clock_id = CLOCK_REALTIME;
    }

    pthread_condattr_destroy(&cond_attr);
#else
    pthread_cond_init(&condition->cond, NULL);
    condition->clock_id = CLOCK_REALTIME;
#endif // COND_USE_CLOCK_MONOTONIC
}
159 
/* Destroy the pthread condition variable wrapped by @condition. */
void mpp_cond_destroy(MppCond *condition)
{
    pthread_cond_t *raw = &condition->cond;

    pthread_cond_destroy(raw);
}
164 
mpp_cond_wait(MppCond * condition,MppMutex * mutex)165 rk_s32 mpp_cond_wait(MppCond *condition, MppMutex *mutex)
166 {
167     return pthread_cond_wait(&condition->cond, &mutex->lock);
168 }
169 
mpp_cond_timedwait(MppCond * condition,MppMutex * mutex,rk_s64 timeout)170 rk_s32 mpp_cond_timedwait(MppCond *condition, MppMutex *mutex, rk_s64 timeout)
171 {
172     struct timespec ts;
173 
174     clock_gettime(condition->clock_id, &ts);
175 
176     ts.tv_sec += timeout / 1000;
177     ts.tv_nsec += (timeout % 1000) * 1000000;
178     ts.tv_sec += ts.tv_nsec / 1000000000;
179     ts.tv_nsec %= 1000000000;
180 
181     return pthread_cond_timedwait(&condition->cond, &mutex->lock, &ts);
182 }
183 
mpp_cond_signal(MppCond * condition)184 rk_s32 mpp_cond_signal(MppCond *condition)
185 {
186     return pthread_cond_signal(&condition->cond);
187 }
188 
mpp_cond_broadcast(MppCond * condition)189 rk_s32 mpp_cond_broadcast(MppCond *condition)
190 {
191     return pthread_cond_broadcast(&condition->cond);
192 }
193 
194 // MppMutexCond functions
/* Initialise the mutex and then the condition held by @mutexCond. */
void mpp_mutex_cond_init(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;
    MppCond *cond = &mutexCond->cond;

    mpp_mutex_init(lock);
    mpp_cond_init(cond);
}
200 
/* Destroy the mutex and then the condition held by @mutexCond. */
void mpp_mutex_cond_destroy(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;
    MppCond *cond = &mutexCond->cond;

    mpp_mutex_destroy(lock);
    mpp_cond_destroy(cond);
}
206 
/* Acquire the mutex half of @mutexCond. */
void mpp_mutex_cond_lock(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;

    mpp_mutex_lock(lock);
}
211 
/* Release the mutex half of @mutexCond. */
void mpp_mutex_cond_unlock(MppMutexCond *mutexCond)
{
    MppMutex *lock = &mutexCond->lock;

    mpp_mutex_unlock(lock);
}
216 
/* Non-blocking acquire of the mutex half of @mutexCond; returns the mpp_mutex_trylock result. */
int mpp_mutex_cond_trylock(MppMutexCond *mutexCond)
{
    int ret = mpp_mutex_trylock(&mutexCond->lock);

    return ret;
}
221 
mpp_mutex_cond_wait(MppMutexCond * mutexCond)222 rk_s32 mpp_mutex_cond_wait(MppMutexCond *mutexCond)
223 {
224     return mpp_cond_wait(&mutexCond->cond, &mutexCond->lock);
225 }
226 
mpp_mutex_cond_timedwait(MppMutexCond * mutexCond,rk_s64 timeout)227 rk_s32 mpp_mutex_cond_timedwait(MppMutexCond *mutexCond, rk_s64 timeout)
228 {
229     return mpp_cond_timedwait(&mutexCond->cond, &mutexCond->lock, timeout);
230 }
231 
/* Signal the condition of @mutexCond; the result of mpp_cond_signal is discarded. */
void mpp_mutex_cond_signal(MppMutexCond *mutexCond)
{
    MppCond *cond = &mutexCond->cond;

    mpp_cond_signal(cond);
}
236 
/* Broadcast on the condition of @mutexCond; the result of mpp_cond_broadcast is discarded. */
void mpp_mutex_cond_broadcast(MppMutexCond *mutexCond)
{
    MppCond *cond = &mutexCond->cond;

    mpp_cond_broadcast(cond);
}
241 
242 // MppThread functions
/*
 * Initialise a caller-provided MppThread in place.
 *
 * func - entry function later passed to pthread_create
 * ctx  - opaque argument handed to func
 * name - thread name; copied and truncated to THREAD_NAME_LEN - 1 characters.
 *        When NULL the default "MppThread" is used, matching
 *        mpp_thread_create(); previously the name buffer was left
 *        uninitialised in that case.
 *
 * Every signal slot gets its mutex/condition pair initialised and its status
 * set to MPP_THREAD_UNINITED.
 */
void mpp_thread_init(MppThread *thread, MppThreadFunc func, void *ctx, const char *name)
{
    int i;

    thread->func = func;
    thread->ctx = ctx;

    if (name) {
        strncpy(thread->name, name, THREAD_NAME_LEN - 1);
        /* strncpy does not terminate on truncation */
        thread->name[THREAD_NAME_LEN - 1] = '\0';
    } else {
        snprintf(thread->name, THREAD_NAME_LEN, "MppThread");
    }

    for (i = 0; i < THREAD_SIGNAL_BUTT; i++) {
        mpp_mutex_cond_init(&thread->mutex_cond[i]);
        thread->thd_status[i] = MPP_THREAD_UNINITED;
    }
}
258 
/* Store @status into the status slot @id of @thread; @id is asserted to be in range. */
void mpp_thread_set_status(MppThread *thread, MppThreadStatus status, MppThreadSignalId id)
{
    assert(id < THREAD_SIGNAL_BUTT);

    thread->thd_status[id] = status;
}
264 
mpp_thread_get_status(MppThread * thread,MppThreadSignalId id)265 MppThreadStatus mpp_thread_get_status(MppThread *thread, MppThreadSignalId id)
266 {
267     assert(id < THREAD_SIGNAL_BUTT);
268     return thread->thd_status[id];
269 }
270 
/* Acquire the mutex belonging to signal slot @id of @thread. */
void mpp_thread_lock(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_lock(slot);
}
276 
/* Release the mutex belonging to signal slot @id of @thread. */
void mpp_thread_unlock(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_unlock(slot);
}
282 
/*
 * Wait on the condition of signal slot @id.
 *
 * The slot status is switched to MPP_THREAD_WAITING for the duration of the
 * wait. The previous status is restored afterwards only if the slot is still
 * MPP_THREAD_WAITING, so a status written by someone else during the wait
 * is kept.
 */
void mpp_thread_wait(MppThread *thread, MppThreadSignalId id)
{
    MppThreadStatus saved;

    assert(id < THREAD_SIGNAL_BUTT);

    saved = thread->thd_status[id];
    thread->thd_status[id] = MPP_THREAD_WAITING;

    mpp_mutex_cond_wait(&thread->mutex_cond[id]);

    if (thread->thd_status[id] != MPP_THREAD_WAITING)
        return;

    thread->thd_status[id] = saved;
}
293 
/* Signal the condition belonging to signal slot @id of @thread. */
void mpp_thread_signal(MppThread *thread, MppThreadSignalId id)
{
    MppMutexCond *slot;

    assert(id < THREAD_SIGNAL_BUTT);

    slot = &thread->mutex_cond[id];
    mpp_mutex_cond_signal(slot);
}
299 
300 typedef struct MppSThdImpl_t {
301     char            *name;
302     MppSThdFunc     func;
303     MppSThdStatus   status;
304     rk_s32          idx;
305     pthread_t       thd;
306     pthread_mutex_t lock;
307     pthread_cond_t  cond;
308     MppSThdCtx      ctx;
309 } MppSThdImpl;
310 
311 typedef struct MppSThdGrpImpl_t {
312     char            name[THREAD_NAME_LEN];
313     rk_s32          count;
314     MppSThdStatus   status;
315     pthread_mutex_t lock;
316     MppSThdImpl     thds[];
317 } MppSThdGrpImpl;
318 
/*
 * Map a simple thread status to a printable name.
 * Any value not below MPP_STHD_BUTT maps to the table entry at index
 * MPP_STHD_BUTT.
 */
static const char *state2str(MppSThdStatus state)
{
    static const char *strof_sthd_status[] = {
        "uninited",
        "ready",
        "running",
        "waiting",
        "stopping",
        "invalid"
    };

    if (state < MPP_STHD_BUTT)
        return strof_sthd_status[state];

    return strof_sthd_status[MPP_STHD_BUTT];
}
332 
check_sthd(const char * name,MppSThdImpl * thd)333 static rk_s32 check_sthd(const char *name, MppSThdImpl *thd)
334 {
335     if (!thd) {
336         mpp_err("mpp_sthd NULL found at %s\n", name);
337         return MPP_NOK;
338     }
339 
340     if (thd->ctx.thd != thd) {
341         mpp_err("mpp_sthd check %p:%p mismatch at %s\n", thd->ctx.thd, thd, name);
342         return MPP_NOK;
343     }
344 
345     return MPP_OK;
346 }
347 
348 #define CHECK_STHD(thd) check_sthd(__FUNCTION__, (MppSThdImpl *)(thd))
349 
/*
 * Set up the synchronisation objects and identity of one simple thread:
 * a recursive mutex, a default condition variable, the ctx.thd back-pointer
 * and the index @idx.
 */
static void mpp_sthd_init(MppSThdImpl *thd, rk_s32 idx)
{
    pthread_mutexattr_t recursive_attr;

    pthread_mutexattr_init(&recursive_attr);
    pthread_mutexattr_settype(&recursive_attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&thd->lock, &recursive_attr);
    pthread_mutexattr_destroy(&recursive_attr);

    pthread_cond_init(&thd->cond, NULL);

    /* the back-pointer is what check_sthd() compares against */
    thd->ctx.thd = thd;
    thd->idx = idx;
}
363 
/*
 * Tear down one simple thread object: mark it uninited, clear the ctx.thd
 * back-pointer under the lock, then destroy the condition and the mutex.
 * The thread is asserted to be valid and not yet running.
 */
static void mpp_sthd_deinit(MppSThdImpl *thd)
{
    mpp_assert(thd->ctx.thd == thd);
    mpp_assert(thd->status < MPP_STHD_RUNNING);

    pthread_mutex_lock(&thd->lock);
    {
        thd->status = MPP_STHD_UNINITED;
        /* a cleared back-pointer makes later check_sthd() calls fail */
        thd->ctx.thd = NULL;
    }
    pthread_mutex_unlock(&thd->lock);

    pthread_cond_destroy(&thd->cond);
    pthread_mutex_destroy(&thd->lock);
}
377 
mpp_sthd_create(MppSThdImpl * thd)378 static MPP_RET mpp_sthd_create(MppSThdImpl *thd)
379 {
380     pthread_attr_t attr;
381     MPP_RET ret = MPP_NOK;
382 
383     mpp_assert(thd->ctx.thd == thd);
384     mpp_assert(thd->status < MPP_STHD_RUNNING);
385 
386     pthread_attr_init(&attr);
387     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
388 
389     // NOTE: set status to running first
390     thd->status = MPP_STHD_RUNNING;
391     if (0 == pthread_create(&thd->thd, &attr, (MppThreadFunc)thd->func, &thd->ctx)) {
392         ret = (MPP_RET)pthread_setname_np(thd->thd, thd->name);
393         if (ret)
394             mpp_err("%s %p setname failed\n", thd->thd, thd->func);
395 
396         thread_dbg(THREAD_DBG_FUNC, "thread %s %p context %p create success\n",
397                    thd->name, thd->func, thd->ctx.ctx);
398         ret = MPP_OK;
399     } else {
400         thd->status = MPP_STHD_READY;
401     }
402 
403     pthread_attr_destroy(&attr);
404 
405     return ret;
406 }
407 
mpp_sthd_get(const char * name)408 MppSThd mpp_sthd_get(const char *name)
409 {
410     rk_s32 size = MPP_ALIGN(sizeof(MppSThdImpl), 8) + THREAD_NAME_LEN;
411     MppSThdImpl *thd = mpp_calloc_size(MppSThdImpl, size);
412 
413     if (!thd) {
414         mpp_err_f("failed to create simple thread\n");
415         return NULL;
416     }
417 
418     thd->name = (char *)(thd + 1);
419     if (!name)
420         name = "mpp_sthd";
421 
422     snprintf(thd->name, THREAD_NAME_LEN - 1, "%s", name);
423 
424     mpp_sthd_init(thd, -1);
425 
426     return thd;
427 }
428 
/*
 * Release a standalone simple thread obtained from mpp_sthd_get().
 * The thread is asserted to be valid and in the uninited or ready state;
 * its synchronisation objects are torn down before the memory is freed.
 */
void mpp_sthd_put(MppSThd thd)
{
    MppSThdImpl *impl = (MppSThdImpl *)thd;

    /* sanity checks on the handle before tearing it down */
    mpp_assert(impl);
    mpp_assert(impl->ctx.thd == impl);
    mpp_assert(impl->status == MPP_STHD_UNINITED || impl->status == MPP_STHD_READY);

    mpp_sthd_deinit(impl);
    mpp_free(impl);
}
441 
mpp_sthd_get_status(MppSThd thd)442 MppSThdStatus mpp_sthd_get_status(MppSThd thd)
443 {
444     MppSThdImpl *impl = (MppSThdImpl *)thd;
445 
446     CHECK_STHD(impl);
447 
448     return impl->status;
449 }
450 
/*
 * Return the name pointer stored in a simple thread.
 * The handle check only logs; its result is not acted upon.
 */
const char* mpp_sthd_get_name(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    const char *thd_name;

    CHECK_STHD(sthd);

    thd_name = sthd->name;

    return thd_name;
}
459 
mpp_sthd_get_idx(MppSThd thd)460 rk_s32 mpp_sthd_get_idx(MppSThd thd)
461 {
462     MppSThdImpl *impl = (MppSThdImpl *)thd;
463 
464     CHECK_STHD(impl);
465 
466     return impl->idx;
467 }
468 
mpp_sthd_check(MppSThd thd)469 rk_s32 mpp_sthd_check(MppSThd thd)
470 {
471     return CHECK_STHD(thd);
472 }
473 
/*
 * Install the entry function and user context of a simple thread.
 *
 * Only allowed while the thread is uninited or ready. A non-NULL @func
 * leaves the thread ready, a NULL @func puts it back to uninited.
 * In any other state an error is logged and nothing changes.
 */
void mpp_sthd_setup(MppSThd thd, MppSThdFunc func, void *ctx)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    MppSThdStatus cur;

    CHECK_STHD(sthd);

    pthread_mutex_lock(&sthd->lock);
    cur = sthd->status;
    if (cur == MPP_STHD_UNINITED || cur == MPP_STHD_READY) {
        sthd->func = func;
        sthd->ctx.ctx = ctx;
        if (func)
            sthd->status = MPP_STHD_READY;
        else
            sthd->status = MPP_STHD_UNINITED;
    } else {
        mpp_err("%s can NOT setup on %s\n", sthd->name, state2str(cur));
    }
    pthread_mutex_unlock(&sthd->lock);

    CHECK_STHD(sthd);
}
498 
/*
 * Start a simple thread.
 * The pthread is created only when the thread is in the ready state;
 * in any other state an error is logged.
 */
void mpp_sthd_start(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    MppSThdStatus cur;

    CHECK_STHD(sthd);

    /* the state check and the creation happen under the thread lock */
    pthread_mutex_lock(&sthd->lock);
    cur = sthd->status;
    if (cur == MPP_STHD_READY) {
        mpp_sthd_create(sthd);
    } else {
        mpp_err("%s can NOT start on %s\n", sthd->name, state2str(cur));
    }
    pthread_mutex_unlock(&sthd->lock);

    CHECK_STHD(sthd);
}
521 
/*
 * Ask a running or waiting simple thread to stop.
 *
 * The thread status is set to MPP_STHD_STOPPING and the condition is
 * signalled so a worker sleeping in mpp_sthd_wait() wakes up. The call
 * does not wait for the worker to exit; use mpp_sthd_stop_sync() for that.
 * In any other state an error is logged.
 */
void mpp_sthd_stop(MppSThd thd)
{
    MppSThdImpl *impl = (MppSThdImpl *)thd;
    MppSThdStatus status;

    CHECK_STHD(impl);

    pthread_mutex_lock(&impl->lock);
    status = impl->status;
    switch (status) {
    case MPP_STHD_RUNNING :
    case MPP_STHD_WAITING : {
        /*
         * The new state must be stored in the object: writing it to the
         * local copy left the status unchanged, so mpp_sthd_stop_sync()
         * rejected the thread and never joined it.
         */
        impl->status = MPP_STHD_STOPPING;
        pthread_cond_signal(&impl->cond);
    } break;
    default : {
        mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status));
    } break;
    }
    pthread_mutex_unlock(&impl->lock);

    CHECK_STHD(impl);
}
545 
/*
 * Wait for a simple thread that is in the stopping state to exit, then
 * mark it ready again. In any other state an error is logged.
 *
 * The thread lock is NOT held across pthread_join: a worker woken from
 * pthread_cond_wait (see mpp_sthd_wait) has to re-acquire that same lock
 * before it can return and exit, so joining with the lock held could
 * deadlock.
 *
 * NOTE(review): with the lock dropped around the join, two concurrent
 * callers could both attempt the join; callers are expected to invoke this
 * from a single controlling thread - confirm against call sites.
 */
void mpp_sthd_stop_sync(MppSThd thd)
{
    MppSThdImpl *impl = (MppSThdImpl *)thd;
    MppSThdStatus status;

    CHECK_STHD(impl);

    pthread_mutex_lock(&impl->lock);
    status = impl->status;
    pthread_mutex_unlock(&impl->lock);

    if (status == MPP_STHD_STOPPING) {
        void *dummy;

        pthread_join(impl->thd, &dummy);

        pthread_mutex_lock(&impl->lock);
        impl->status = MPP_STHD_READY;
        pthread_mutex_unlock(&impl->lock);
    } else {
        mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status));
    }

    CHECK_STHD(impl);
}
570 
/* Acquire the lock of a simple thread; the handle check only logs. */
void mpp_sthd_lock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    pthread_mutex_t *mtx;

    CHECK_STHD(sthd);

    mtx = &sthd->lock;
    pthread_mutex_lock(mtx);
}
579 
/* Release the lock of a simple thread; the handle check only logs. */
void mpp_sthd_unlock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    pthread_mutex_t *mtx;

    CHECK_STHD(sthd);

    mtx = &sthd->lock;
    pthread_mutex_unlock(mtx);
}
588 
/*
 * Try to acquire the lock of a simple thread without blocking.
 * Returns the pthread_mutex_trylock result; the handle check only logs.
 */
int mpp_sthd_trylock(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    int ret;

    CHECK_STHD(sthd);

    ret = pthread_mutex_trylock(&sthd->lock);

    return ret;
}
597 
/*
 * Sleep on the condition of a simple thread, using the thread lock as the
 * associated mutex.
 *
 * A running thread is marked waiting for the duration of the sleep and is
 * marked running again afterwards only if the status is still waiting, so
 * a status written by someone else during the sleep is kept.
 */
void mpp_sthd_wait(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;

    CHECK_STHD(sthd);

    if (sthd->status == MPP_STHD_RUNNING) {
        sthd->status = MPP_STHD_WAITING;
    }

    pthread_cond_wait(&sthd->cond, &sthd->lock);

    if (sthd->status != MPP_STHD_WAITING)
        return;

    sthd->status = MPP_STHD_RUNNING;
}
612 
/* Signal the condition of a simple thread; the handle check only logs. */
void mpp_sthd_signal(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    pthread_cond_t *cv;

    CHECK_STHD(sthd);

    cv = &sthd->cond;
    pthread_cond_signal(cv);
}
621 
/* Broadcast on the condition of a simple thread; the handle check only logs. */
void mpp_sthd_broadcast(MppSThd thd)
{
    MppSThdImpl *sthd = (MppSThdImpl *)thd;
    pthread_cond_t *cv;

    CHECK_STHD(sthd);

    cv = &sthd->cond;
    pthread_cond_broadcast(cv);
}
630 
mpp_sthd_grp_get(const char * name,rk_s32 count)631 MppSThdGrp mpp_sthd_grp_get(const char *name, rk_s32 count)
632 {
633     MppSThdGrpImpl *grp = NULL;
634 
635     if (count > 0) {
636         rk_s32 elem_size = MPP_ALIGN(sizeof(MppSThdImpl), 8);
637         rk_s32 total_size = MPP_ALIGN(sizeof(MppSThdGrpImpl), 8) + count * elem_size;
638 
639         grp = mpp_calloc_size(MppSThdGrpImpl, total_size);
640         if (grp) {
641             pthread_mutexattr_t attr;
642             rk_s32 i;
643 
644             if (!name)
645                 name = "mpp_sthd_grp";
646 
647             snprintf(grp->name, THREAD_NAME_LEN - 1, "%s", name);
648 
649             grp->count = count;
650             for (i = 0; i < count; i++) {
651                 MppSThdImpl *thd = &grp->thds[i];
652 
653                 thd->name = grp->name;
654                 mpp_sthd_init(thd, i);
655             }
656 
657             pthread_mutexattr_init(&attr);
658             pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
659             pthread_mutex_init(&grp->lock, &attr);
660             pthread_mutexattr_destroy(&attr);
661         }
662     }
663 
664     if (!grp)
665         mpp_err_f("can NOT create %d threads group\n", count);
666 
667     return grp;
668 }
669 
/*
 * Release a thread group obtained from mpp_sthd_grp_get().
 *
 * The group is asserted to be in the uninited or ready state. Every member
 * is torn down, the group mutex created in mpp_sthd_grp_get() is destroyed
 * (it was previously never released), and the memory is freed.
 */
void mpp_sthd_grp_put(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    rk_s32 i;

    mpp_assert(impl);
    mpp_assert(impl->status == MPP_STHD_UNINITED || impl->status == MPP_STHD_READY);

    for (i = 0; i < impl->count; i++) {
        MppSThdImpl *thd = &impl->thds[i];

        mpp_sthd_deinit(thd);
    }

    pthread_mutex_destroy(&impl->lock);

    mpp_free(impl);
}
686 
/*
 * Install the same entry function and user context on every member of a
 * thread group.
 *
 * Only allowed while the group is uninited or ready. A non-NULL @func
 * leaves group and members ready, a NULL @func puts them back to uninited.
 * In any other state an error is logged and nothing changes.
 */
void mpp_sthd_grp_setup(MppSThdGrp grp, MppSThdFunc func, void *ctx)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(impl);

    pthread_mutex_lock(&impl->lock);
    cur = impl->status;
    if (cur == MPP_STHD_UNINITED || cur == MPP_STHD_READY) {
        MppSThdStatus target = MPP_STHD_UNINITED;
        rk_s32 n;

        if (func)
            target = MPP_STHD_READY;

        for (n = 0; n < impl->count; n++) {
            MppSThdImpl *member = &impl->thds[n];

            member->func = func;
            member->ctx.ctx = ctx;
            member->status = target;
        }
        impl->status = target;
    } else {
        mpp_err("%s can NOT setup on %s\n", impl->name, state2str(cur));
    }
    pthread_mutex_unlock(&impl->lock);
}
717 
/*
 * Start every member of a thread group.
 *
 * Only allowed while the group is ready; each member is started through
 * mpp_sthd_start() and the group is then marked running. In any other
 * state an error is logged.
 */
void mpp_sthd_grp_start(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(impl);

    /* the state check and the member starts happen under the group lock */
    pthread_mutex_lock(&impl->lock);
    cur = impl->status;
    if (cur == MPP_STHD_READY) {
        rk_s32 n;

        for (n = 0; n < impl->count; n++) {
            mpp_sthd_start(&impl->thds[n]);
        }

        impl->status = MPP_STHD_RUNNING;
    } else {
        mpp_err("%s can NOT start on %s\n", impl->name, state2str(cur));
    }
    pthread_mutex_unlock(&impl->lock);
}
743 
/*
 * Ask every member of a running or waiting thread group to stop.
 *
 * The group is marked stopping; each member is marked stopping and has its
 * condition signalled under its own lock. The call does not wait for the
 * workers to exit. In any other state an error is logged.
 */
void mpp_sthd_grp_stop(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(impl);

    /* the state check and the member updates happen under the group lock */
    pthread_mutex_lock(&impl->lock);
    cur = impl->status;
    if (cur == MPP_STHD_RUNNING || cur == MPP_STHD_WAITING) {
        rk_s32 n;

        impl->status = MPP_STHD_STOPPING;

        for (n = 0; n < impl->count; n++) {
            MppSThdImpl *member = &impl->thds[n];

            pthread_mutex_lock(&member->lock);
            member->status = MPP_STHD_STOPPING;
            pthread_cond_signal(&member->cond);
            pthread_mutex_unlock(&member->lock);
        }
    } else {
        mpp_err("%s can NOT stop on %s\n", impl->name, state2str(cur));
    }
    pthread_mutex_unlock(&impl->lock);
}
776 
/*
 * Wait for every member of a stopping thread group to exit.
 *
 * Each member is joined and marked ready, then the group is marked ready.
 * Only the group lock is held while joining, not the member locks.
 * In any other state an error is logged.
 */
void mpp_sthd_grp_stop_sync(MppSThdGrp grp)
{
    MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
    MppSThdStatus cur;

    mpp_assert(impl);

    pthread_mutex_lock(&impl->lock);
    cur = impl->status;
    if (cur == MPP_STHD_STOPPING) {
        void *retval;
        rk_s32 n;

        for (n = 0; n < impl->count; n++) {
            MppSThdImpl *member = &impl->thds[n];

            pthread_join(member->thd, &retval);
            member->status = MPP_STHD_READY;
        }
        impl->status = MPP_STHD_READY;
    } else {
        mpp_err("%s can NOT stop sync on %s\n", impl->name, state2str(cur));
    }
    pthread_mutex_unlock(&impl->lock);
}
807 
mpp_sthd_grp_get_each(MppSThdGrp grp,rk_s32 idx)808 MppSThd mpp_sthd_grp_get_each(MppSThdGrp grp, rk_s32 idx)
809 {
810     MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp;
811     MppSThd ret = NULL;
812 
813     mpp_assert(impl);
814     mpp_assert(idx >= 0 && idx < impl->count);
815 
816     pthread_mutex_lock(&impl->lock);
817     ret = &impl->thds[idx];
818     pthread_mutex_unlock(&impl->lock);
819 
820     return ret;
821 }