xref: /rockchip-linux_mpp/osal/mpp_queue.c (revision 437bfbeb9567cca9cd9080e3f6954aa9d6a94f18)
1 /* SPDX-License-Identifier: Apache-2.0 OR MIT */
2 /*
3  * Copyright (c) 2017 Rockchip Electronics Co., Ltd.
4  */
5 
6 #include "mpp_mem.h"
7 #include "mpp_queue.h"
8 
mpp_queue_create(node_destructor func)9 MppQueue* mpp_queue_create(node_destructor func)
10 {
11     MppQueue *queue = mpp_malloc(MppQueue, 1);
12 
13     queue->list = mpp_list_create(func);
14     sem_init(&queue->queue_pending, 0, 0);
15     queue->flush_flag = 0;
16 
17     return queue;
18 }
19 
mpp_queue_destroy(MppQueue * queue)20 void mpp_queue_destroy(MppQueue *queue)
21 {
22     mpp_list_destroy(queue->list);
23     mpp_free(queue);
24 }
25 
mpp_queue_push(MppQueue * queue,void * data,rk_s32 size)26 rk_s32 mpp_queue_push(MppQueue *queue, void *data, rk_s32 size)
27 {
28     rk_s32 ret = rk_ok;
29 
30     ret = mpp_list_add_at_tail(queue->list, data, size);
31     queue->flush_flag = 0;
32     sem_post(&queue->queue_pending);
33 
34     return ret;
35 }
36 
mpp_queue_pull(MppQueue * queue,void * data,rk_s32 size)37 rk_s32 mpp_queue_pull(MppQueue *queue, void *data, rk_s32 size)
38 {
39     rk_s32 ret = rk_ok;
40 
41     if (!queue->flush_flag)
42         sem_wait(&queue->queue_pending);
43 
44     mpp_mutex_cond_lock(&queue->list->cond_lock);
45     if (!mpp_list_size(queue->list)) {
46         mpp_mutex_cond_unlock(&queue->list->cond_lock);
47         return ret;
48     }
49 
50     ret = mpp_list_del_at_head(queue->list, data, size);
51     mpp_mutex_cond_unlock(&queue->list->cond_lock);
52 
53     return ret;
54 }
55 
mpp_queue_flush(MppQueue * queue)56 rk_s32 mpp_queue_flush(MppQueue *queue)
57 {
58     if (queue->flush_flag)
59         return 0;
60 
61     queue->flush_flag = 1;
62     sem_post(&queue->queue_pending);
63     mpp_list_flush(queue->list);
64 
65     return 0;
66 }
67