/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another CPU consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
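
/* A minimal sketch of the retry pattern described in the note above,
 * assuming the caller already holds producer_lock and can afford to
 * busy-wait until the consumer frees up space. cpu_relax() provides the
 * compiler barrier the note asks for. The function name is illustrative
 * only, not part of this API:
 *
 *	static void example_produce_spin(struct ptr_ring *r, void *ptr)
 *	{
 *		while (__ptr_ring_produce(r, ptr) == -ENOSPC)
 *			cpu_relax();
 *	}
 *
 * Busy-waiting is only reasonable when the consumer is guaranteed to make
 * progress; most callers simply propagate -ENOSPC instead.
 */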

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
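
/* A hedged sketch of how the re-test rule above is usually honoured by a
 * consumer that may stop early (for example because of a budget). The names
 * example_poll, example_process and budget are illustrative only:
 *
 *	static int example_poll(struct ptr_ring *r, int budget)
 *	{
 *		void *ptr;
 *		int done = 0;
 *
 *		while (done < budget && (ptr = ptr_ring_consume(r)) != NULL) {
 *			example_process(ptr);
 *			done++;
 *		}
 *		// If we stopped before the ring was seen empty, whoever
 *		// re-enables producer notifications must re-check
 *		// __ptr_ring_empty() afterwards to avoid missing entries.
 *		return done;
 *	}
 */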

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched last;
		 * the producer won't make progress and touch other cache lines
		 * besides the first one until we have written out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Call a function on the head entry (typically casting it to its structure
 * type) without discarding it from the FIFO.
 * The function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f;\
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
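
/* A hedged usage sketch for the macros above, assuming the ring stores
 * struct sk_buff pointers. skb_example_len is an illustrative helper, not
 * part of this API: the callback casts the stored void pointer back to its
 * real type and returns a value, which the macro propagates to the caller.
 * Checking for NULL handles the empty-ring case.
 *
 *	static int skb_example_len(void *ptr)
 *	{
 *		struct sk_buff *skb = ptr;
 *
 *		return skb ? skb->len : 0;
 *	}
 *
 *	len = PTR_RING_PEEK_CALL(ring, skb_example_len);
 */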

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
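
/* For illustration (not normative): with SMP_CACHE_BYTES == 64 and 8-byte
 * pointers the formula above gives batch = 2 * 64 / 8 = 16, i.e. the
 * consumer hands entries back to the producer two cache lines at a time.
 * Any ring smaller than 32 entries then falls back to batch = 1, which
 * effectively disables batching as the comment above says.
 */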

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
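
/* A minimal end-to-end sketch, assuming a single producer and a single
 * consumer both running in process context (so the plain lock variants
 * suffice). struct foo and the variable names are illustrative only:
 *
 *	struct ptr_ring ring;
 *	struct foo *item, *out;
 *	int err;
 *
 *	if (ptr_ring_init(&ring, 128, GFP_KERNEL))
 *		return -ENOMEM;
 *
 *	err = ptr_ring_produce(&ring, item);	// -ENOSPC if the ring is full
 *	out = ptr_ring_consume(&ring);		// NULL if the ring is empty
 *
 *	ptr_ring_cleanup(&ring, NULL);		// or pass a destroy callback
 */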

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in the batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
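
/* A hedged sketch of a resize that respects the lock nesting rule above:
 * it runs in process context and frees any entries that no longer fit.
 * example_destroy and the kfree-able entries are assumptions for
 * illustration; real callers pass whatever matches how their entries were
 * allocated (or NULL, since __ptr_ring_swap_queue skips a NULL destroy):
 *
 *	static void example_destroy(void *ptr)
 *	{
 *		kfree(ptr);
 *	}
 *
 *	err = ptr_ring_resize(&ring, 256, GFP_KERNEL, example_destroy);
 */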

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H  */