/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from a FIFO.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */
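
/*
 * Illustrative usage sketch (not part of this header; 'struct my_obj' and
 * 'my_free' are hypothetical, and obj points to a caller-owned object).
 * NULL marks empty slots internally, so a NULL pointer must never be
 * produced:
 *
 *	struct ptr_ring ring;
 *	struct my_obj *obj;
 *
 *	if (ptr_ring_init(&ring, 512, GFP_KERNEL))
 *		return -ENOMEM;
 *	if (ptr_ring_produce(&ring, obj))
 *		my_free(obj);			(ring full, -ENOSPC)
 *	obj = ptr_ring_consume(&ring);		(NULL when empty)
 *	ptr_ring_cleanup(&ring, my_free);	(destroys leftover entries)
 */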

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
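
/*
 * The ring is usually embedded in a larger, type-safe wrapper rather than
 * used directly; skb_array, for instance, is a thin wrapper around it.
 * A hypothetical wrapper might look like this (sketch only):
 *
 *	struct my_queue {
 *		struct ptr_ring	ring;
 *		u32		id;
 *	};
 *
 *	static inline int my_queue_init(struct my_queue *q, int size, gfp_t gfp)
 *	{
 *		return ptr_ring_init(&q->ring, size, gfp);
 *	}
 */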

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}
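
/*
 * Per the note above, a producer that wants to wait for free space may poll
 * under producer_lock with cpu_relax() (illustrative sketch only; busy
 * waiting like this is rarely the right choice):
 *
 *	spin_lock(&r->producer_lock);
 *	while (__ptr_ring_full(r))
 *		cpu_relax();
 *	__ptr_ring_produce(r, ptr);
 *	spin_unlock(&r->producer_lock);
 */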

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
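
/*
 * The smp_wmb() above pairs with the READ_ONCE()/address dependency on the
 * consumer side (see __ptr_ring_consume), so data written to an object
 * before producing it is visible to whoever consumes it, with no extra
 * barriers needed in the caller. Sketch ('obj->len' is hypothetical):
 *
 *	obj->len = len;
 *	err = __ptr_ring_produce(r, obj);	(caller holds producer_lock)
 *
 * On the consumer side, obj->len may be read safely once
 * __ptr_ring_consume() has returned obj.
 */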

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
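
/*
 * Choosing a variant (sketch): because of the lock nesting described above,
 * a producer running in process context while the consumer runs in BH
 * context (e.g. a NAPI poll loop) should use the _bh variant:
 *
 *	if (ptr_ring_produce_bh(r, obj))
 *		my_free(obj);			('my_free' is hypothetical)
 */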

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
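
/*
 * The rule above is commonly satisfied by having the consumer drain the
 * ring completely before going idle (sketch; 'process' is hypothetical):
 *
 *	void *obj;
 *
 *	spin_lock(&r->consumer_lock);
 *	while ((obj = __ptr_ring_consume(r)))
 *		process(obj);
 *	spin_unlock(&r->consumer_lock);
 *
 * after which another CPU can use __ptr_ring_empty() locklessly to decide
 * whether the consumer needs to be kicked again.
 */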

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we write out all
		 * entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}
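
/*
 * A consumer that must inspect an entry before committing to remove it can
 * split the peek from the discard (sketch; 'fits' is hypothetical):
 *
 *	spin_lock(&r->consumer_lock);
 *	obj = __ptr_ring_peek(r);
 *	if (obj && fits(obj))
 *		__ptr_ring_discard_one(r);	(obj is now owned by the caller)
 *	spin_unlock(&r->consumer_lock);
 */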

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
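
/*
 * Consuming in batches amortizes the cost of the locked wrappers below
 * (sketch; 'process' is hypothetical):
 *
 *	void *batch[16];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	for (i = 0; i < n; i++)
 *		process(batch[i]);
 */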

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
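
/*
 * Example (sketch): compute a value from the head entry without consuming
 * it. The callback is invoked with the peeked pointer and must tolerate
 * NULL, which it gets when the ring is empty ('my_obj_len' and 'struct
 * my_obj' are hypothetical):
 *
 *	static int my_obj_len(struct my_obj *obj)
 *	{
 *		return obj ? obj->len : 0;
 *	}
 *
 *	len = PTR_RING_PEEK_CALL(r, my_obj_len);
 */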

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
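
/*
 * For example, with SMP_CACHE_BYTES == 64 and 8-byte pointers the batch
 * works out to 16: a 512-entry ring keeps that value, while a 16-entry
 * ring falls back to a batch of 1 because 16 > 16 / 2.
 */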

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
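
/*
 * Typical use (sketch): a caller pulled entries out with
 * ptr_ring_consume_batched(), could not hand all of them off, and returns
 * the unprocessed tail; entries that no longer fit are destroyed
 * ('hand_off' and 'my_free' are hypothetical):
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	done = hand_off(batch, n);
 *	ptr_ring_unconsume(r, batch + done, n - done, my_free);
 */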

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
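
/*
 * Example (sketch): grow a ring that keeps overflowing. The new queue is
 * allocated before any lock is taken, so GFP_KERNEL is usable here; the
 * destroy callback only matters if the new size could be smaller than the
 * number of queued entries ('old_size' and 'my_free' are hypothetical):
 *
 *	err = ptr_ring_resize(r, old_size * 2, GFP_KERNEL, my_free);
 */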

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */