// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

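/*
 * Undo the accounting for a worker whose queued creation task_work was
 * cancelled: drop the running count, the worker count and the wq reference,
 * then release the creation state bit and the worker reference held by the
 * cancelled item.
 */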
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&worker->wqe->lock);
	acct->nr_workers--;
	raw_spin_unlock(&worker->wqe->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

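/*
 * Worker teardown: cancel any creation task_work still queued for this
 * worker, wait for all references to be dropped, unlink the worker from
 * the free and all-worker lists, and free it via RCU before the task exits.
 */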
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

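/*
 * task_work callback queued by io_queue_worker_create(). Re-checks the
 * worker limit under wqe->lock and either creates a new worker or rolls
 * back the running count and wq reference taken when the item was queued.
 */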
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

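/*
 * Queue a task_work item on the io_wq task to create a new worker. Bit 0
 * of worker->create_state serializes requests so only one creation item
 * per worker is in flight at a time; accounting is rolled back on failure.
 */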
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		raw_spin_unlock(&wqe->lock);
		io_queue_worker_create(worker, acct, create_worker_cb);
		raw_spin_lock(&wqe->lock);
	}
}

/*
 * Worker will start processing some work. Take it off the free list, if
 * it's currently there, so it isn't picked for wakeups while busy.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move it to the free list so it can be
 * found and woken up when new work is queued. Called with wqe->lock held.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

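/*
 * Pick the next piece of work for this worker. Unhashed work can run
 * immediately; hashed work runs only if no other work with the same hash
 * is currently in flight. If only stalled hashed work remains, mark the
 * acct stalled and wait on the hash bit before returning NULL.
 */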
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		unstalled = io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wqe->wq->hash->wait))
				wake_up(&wqe->wq->hash->wait);
		}
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

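/*
 * Main work-processing loop for a worker: pull work off the acct list and
 * run it, including any dependent (linked) work, until no runnable work
 * remains. Called with wqe->lock held, returns with it dropped.
 */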
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

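/*
 * Thread function for an io-wq worker. Runs queued work when available,
 * otherwise idles on the free list with a timeout; an idle worker that
 * times out exits unless it is the last worker for its accounting class.
 */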
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

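/*
 * Attach a freshly created task to its io_worker, pin it to the wqe's CPU
 * mask, add it to the free and all-worker lists, and start it running.
 */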
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

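/*
 * Allocate and start a new worker for the given accounting class. If
 * thread creation fails with a transient error, retry from a workqueue;
 * otherwise undo the accounting taken by the caller.
 */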
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

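/*
 * Add work to the pending list for its accounting class. Hashed work is
 * chained after the current tail for its hash bucket, so a whole hash
 * chain can be pulled off at once in io_get_next_work().
 */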
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

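/*
 * Queue work on a wqe. Cancel it immediately if the wq is exiting or the
 * work is already marked for cancellation; otherwise insert it and wake a
 * free worker, creating a new one if none is available and allowed.
 */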
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

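/*
 * Cancel the first pending work item that matches. On a match, wqe->lock
 * is dropped to run the cancellation and true is returned so the caller
 * can retake the lock and scan again; returns false with the lock still
 * held if nothing matched.
 */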
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

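/*
 * Allocate an io_wq with one io_wqe per NUMA node, set up per-node CPU
 * masks and worker limits (bounded by @bounded, unbounded capped by
 * RLIMIT_NPROC), and register for CPU hotplug notifications.
 */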
io_wq_create(unsigned bounded,struct io_wq_data * data)1123*4882a593Smuzhiyun struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1124*4882a593Smuzhiyun {
1125*4882a593Smuzhiyun 	int ret, node, i;
1126*4882a593Smuzhiyun 	struct io_wq *wq;
1127*4882a593Smuzhiyun 
1128*4882a593Smuzhiyun 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1129*4882a593Smuzhiyun 		return ERR_PTR(-EINVAL);
1130*4882a593Smuzhiyun 	if (WARN_ON_ONCE(!bounded))
1131*4882a593Smuzhiyun 		return ERR_PTR(-EINVAL);
1132*4882a593Smuzhiyun 
1133*4882a593Smuzhiyun 	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
1134*4882a593Smuzhiyun 	if (!wq)
1135*4882a593Smuzhiyun 		return ERR_PTR(-ENOMEM);
1136*4882a593Smuzhiyun 	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1137*4882a593Smuzhiyun 	if (ret)
1138*4882a593Smuzhiyun 		goto err_wq;
1139*4882a593Smuzhiyun 
1140*4882a593Smuzhiyun 	refcount_inc(&data->hash->refs);
1141*4882a593Smuzhiyun 	wq->hash = data->hash;
1142*4882a593Smuzhiyun 	wq->free_work = data->free_work;
1143*4882a593Smuzhiyun 	wq->do_work = data->do_work;
1144*4882a593Smuzhiyun 
1145*4882a593Smuzhiyun 	ret = -ENOMEM;
1146*4882a593Smuzhiyun 	for_each_node(node) {
1147*4882a593Smuzhiyun 		struct io_wqe *wqe;
1148*4882a593Smuzhiyun 		int alloc_node = node;
1149*4882a593Smuzhiyun 
1150*4882a593Smuzhiyun 		if (!node_online(alloc_node))
1151*4882a593Smuzhiyun 			alloc_node = NUMA_NO_NODE;
1152*4882a593Smuzhiyun 		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1153*4882a593Smuzhiyun 		if (!wqe)
1154*4882a593Smuzhiyun 			goto err;
1155*4882a593Smuzhiyun 		wq->wqes[node] = wqe;
1156*4882a593Smuzhiyun 		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
1157*4882a593Smuzhiyun 			goto err;
1158*4882a593Smuzhiyun 		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
1159*4882a593Smuzhiyun 		wqe->node = alloc_node;
1160*4882a593Smuzhiyun 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1161*4882a593Smuzhiyun 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1162*4882a593Smuzhiyun 					task_rlimit(current, RLIMIT_NPROC);
1163*4882a593Smuzhiyun 		INIT_LIST_HEAD(&wqe->wait.entry);
1164*4882a593Smuzhiyun 		wqe->wait.func = io_wqe_hash_wake;
1165*4882a593Smuzhiyun 		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1166*4882a593Smuzhiyun 			struct io_wqe_acct *acct = &wqe->acct[i];
1167*4882a593Smuzhiyun 
1168*4882a593Smuzhiyun 			acct->index = i;
1169*4882a593Smuzhiyun 			atomic_set(&acct->nr_running, 0);
1170*4882a593Smuzhiyun 			INIT_WQ_LIST(&acct->work_list);
1171*4882a593Smuzhiyun 		}
1172*4882a593Smuzhiyun 		wqe->wq = wq;
1173*4882a593Smuzhiyun 		raw_spin_lock_init(&wqe->lock);
1174*4882a593Smuzhiyun 		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1175*4882a593Smuzhiyun 		INIT_LIST_HEAD(&wqe->all_list);
1176*4882a593Smuzhiyun 	}
1177*4882a593Smuzhiyun 
1178*4882a593Smuzhiyun 	wq->task = get_task_struct(data->task);
1179*4882a593Smuzhiyun 	atomic_set(&wq->worker_refs, 1);
1180*4882a593Smuzhiyun 	init_completion(&wq->worker_done);
1181*4882a593Smuzhiyun 	return wq;
1182*4882a593Smuzhiyun err:
1183*4882a593Smuzhiyun 	io_wq_put_hash(data->hash);
1184*4882a593Smuzhiyun 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1185*4882a593Smuzhiyun 	for_each_node(node) {
1186*4882a593Smuzhiyun 		if (!wq->wqes[node])
1187*4882a593Smuzhiyun 			continue;
1188*4882a593Smuzhiyun 		free_cpumask_var(wq->wqes[node]->cpu_mask);
1189*4882a593Smuzhiyun 		kfree(wq->wqes[node]);
1190*4882a593Smuzhiyun 	}
1191*4882a593Smuzhiyun err_wq:
1192*4882a593Smuzhiyun 	kfree(wq);
1193*4882a593Smuzhiyun 	return ERR_PTR(ret);
1194*4882a593Smuzhiyun }
1195*4882a593Smuzhiyun 
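/*
 * Editorial note: task_work_cancel_match() callback. Matches pending
 * worker-creation task_work items (create_worker_cb/create_worker_cont)
 * that belong to the given io_wq.
 */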
1196*4882a593Smuzhiyun static bool io_task_work_match(struct callback_head *cb, void *data)
1197*4882a593Smuzhiyun {
1198*4882a593Smuzhiyun 	struct io_worker *worker;
1199*4882a593Smuzhiyun 
1200*4882a593Smuzhiyun 	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
1201*4882a593Smuzhiyun 		return false;
1202*4882a593Smuzhiyun 	worker = container_of(cb, struct io_worker, create_work);
1203*4882a593Smuzhiyun 	return worker->wqe->wq == data;
1204*4882a593Smuzhiyun }
1205*4882a593Smuzhiyun 
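/*
 * Editorial note: mark the io_wq as exiting. This only sets the bit; the
 * actual teardown happens in io_wq_put_and_exit().
 */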
1206*4882a593Smuzhiyun void io_wq_exit_start(struct io_wq *wq)
1207*4882a593Smuzhiyun {
1208*4882a593Smuzhiyun 	set_bit(IO_WQ_BIT_EXIT, &wq->state);
1209*4882a593Smuzhiyun }
1210*4882a593Smuzhiyun 
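/*
 * Editorial note: cancel any not-yet-run task_work that would have created
 * new workers for this io_wq; io_worker_cancel_cb() unwinds the bookkeeping
 * for each cancelled creation.
 */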
1211*4882a593Smuzhiyun static void io_wq_cancel_tw_create(struct io_wq *wq)
1212*4882a593Smuzhiyun {
1213*4882a593Smuzhiyun 	struct callback_head *cb;
1214*4882a593Smuzhiyun 
1215*4882a593Smuzhiyun 	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1216*4882a593Smuzhiyun 		struct io_worker *worker;
1217*4882a593Smuzhiyun 
1218*4882a593Smuzhiyun 		worker = container_of(cb, struct io_worker, create_work);
1219*4882a593Smuzhiyun 		io_worker_cancel_cb(worker);
1220*4882a593Smuzhiyun 	}
1221*4882a593Smuzhiyun }
1222*4882a593Smuzhiyun 
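/*
 * Editorial note: tear down all workers. Pending worker creation is
 * cancelled, every existing worker is woken so it can observe
 * IO_WQ_BIT_EXIT, the initial worker reference is dropped, and we wait for
 * worker_done. Each wqe is then detached from the shared hash wait queue
 * and the task reference is released.
 */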
1223*4882a593Smuzhiyun static void io_wq_exit_workers(struct io_wq *wq)
1224*4882a593Smuzhiyun {
1225*4882a593Smuzhiyun 	int node;
1226*4882a593Smuzhiyun 
1227*4882a593Smuzhiyun 	if (!wq->task)
1228*4882a593Smuzhiyun 		return;
1229*4882a593Smuzhiyun 
1230*4882a593Smuzhiyun 	io_wq_cancel_tw_create(wq);
1231*4882a593Smuzhiyun 
1232*4882a593Smuzhiyun 	rcu_read_lock();
1233*4882a593Smuzhiyun 	for_each_node(node) {
1234*4882a593Smuzhiyun 		struct io_wqe *wqe = wq->wqes[node];
1235*4882a593Smuzhiyun 
1236*4882a593Smuzhiyun 		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
1237*4882a593Smuzhiyun 	}
1238*4882a593Smuzhiyun 	rcu_read_unlock();
1239*4882a593Smuzhiyun 	io_worker_ref_put(wq);
1240*4882a593Smuzhiyun 	wait_for_completion(&wq->worker_done);
1241*4882a593Smuzhiyun 
1242*4882a593Smuzhiyun 	for_each_node(node) {
1243*4882a593Smuzhiyun 		spin_lock_irq(&wq->hash->wait.lock);
1244*4882a593Smuzhiyun 		list_del_init(&wq->wqes[node]->wait.entry);
1245*4882a593Smuzhiyun 		spin_unlock_irq(&wq->hash->wait.lock);
1246*4882a593Smuzhiyun 	}
1247*4882a593Smuzhiyun 	put_task_struct(wq->task);
1248*4882a593Smuzhiyun 	wq->task = NULL;
1249*4882a593Smuzhiyun }
1250*4882a593Smuzhiyun 
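/*
 * Editorial note: final teardown once all workers are gone. Unregister from
 * CPU hotplug, cancel any work still pending on each node, then free the
 * per-node structures, the hash reference, and the io_wq itself.
 */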
1251*4882a593Smuzhiyun static void io_wq_destroy(struct io_wq *wq)
1252*4882a593Smuzhiyun {
1253*4882a593Smuzhiyun 	int node;
1254*4882a593Smuzhiyun 
1255*4882a593Smuzhiyun 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1256*4882a593Smuzhiyun 
1257*4882a593Smuzhiyun 	for_each_node(node) {
1258*4882a593Smuzhiyun 		struct io_wqe *wqe = wq->wqes[node];
1259*4882a593Smuzhiyun 		struct io_cb_cancel_data match = {
1260*4882a593Smuzhiyun 			.fn		= io_wq_work_match_all,
1261*4882a593Smuzhiyun 			.cancel_all	= true,
1262*4882a593Smuzhiyun 		};
1263*4882a593Smuzhiyun 		io_wqe_cancel_pending_work(wqe, &match);
1264*4882a593Smuzhiyun 		free_cpumask_var(wqe->cpu_mask);
1265*4882a593Smuzhiyun 		kfree(wqe);
1266*4882a593Smuzhiyun 	}
1267*4882a593Smuzhiyun 	io_wq_put_hash(wq->hash);
1268*4882a593Smuzhiyun 	kfree(wq);
1269*4882a593Smuzhiyun }
1270*4882a593Smuzhiyun 
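/*
 * Editorial note: shut down an io_wq. The caller must have called
 * io_wq_exit_start() first, as enforced by the WARN_ON_ONCE below.
 */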
1271*4882a593Smuzhiyun void io_wq_put_and_exit(struct io_wq *wq)
1272*4882a593Smuzhiyun {
1273*4882a593Smuzhiyun 	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1274*4882a593Smuzhiyun 
1275*4882a593Smuzhiyun 	io_wq_exit_workers(wq);
1276*4882a593Smuzhiyun 	io_wq_destroy(wq);
1277*4882a593Smuzhiyun }
1278*4882a593Smuzhiyun 
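/* Editorial note: argument bundle for the CPU hotplug callbacks below. */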
1279*4882a593Smuzhiyun struct online_data {
1280*4882a593Smuzhiyun 	unsigned int cpu;
1281*4882a593Smuzhiyun 	bool online;
1282*4882a593Smuzhiyun };
1283*4882a593Smuzhiyun 
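/*
 * Editorial note: per-worker hotplug callback that adds or removes the CPU
 * from the worker's wqe cpumask. It returns false so the worker iteration
 * continues over all workers rather than stopping at the first match.
 */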
1284*4882a593Smuzhiyun static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1285*4882a593Smuzhiyun {
1286*4882a593Smuzhiyun 	struct online_data *od = data;
1287*4882a593Smuzhiyun 
1288*4882a593Smuzhiyun 	if (od->online)
1289*4882a593Smuzhiyun 		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1290*4882a593Smuzhiyun 	else
1291*4882a593Smuzhiyun 		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1292*4882a593Smuzhiyun 	return false;
1293*4882a593Smuzhiyun }
1294*4882a593Smuzhiyun 
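/*
 * Editorial note: common CPU online/offline handler. Walks every worker on
 * every node and updates its wqe cpumask for the hotplugged CPU.
 */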
1295*4882a593Smuzhiyun static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1296*4882a593Smuzhiyun {
1297*4882a593Smuzhiyun 	struct online_data od = {
1298*4882a593Smuzhiyun 		.cpu = cpu,
1299*4882a593Smuzhiyun 		.online = online
1300*4882a593Smuzhiyun 	};
1301*4882a593Smuzhiyun 	int i;
1302*4882a593Smuzhiyun 
1303*4882a593Smuzhiyun 	rcu_read_lock();
1304*4882a593Smuzhiyun 	for_each_node(i)
1305*4882a593Smuzhiyun 		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
1306*4882a593Smuzhiyun 	rcu_read_unlock();
1307*4882a593Smuzhiyun 	return 0;
1308*4882a593Smuzhiyun }
1309*4882a593Smuzhiyun 
1310*4882a593Smuzhiyun static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1311*4882a593Smuzhiyun {
1312*4882a593Smuzhiyun 	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1313*4882a593Smuzhiyun 
1314*4882a593Smuzhiyun 	return __io_wq_cpu_online(wq, cpu, true);
1315*4882a593Smuzhiyun }
1316*4882a593Smuzhiyun 
1317*4882a593Smuzhiyun static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1318*4882a593Smuzhiyun {
1319*4882a593Smuzhiyun 	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1320*4882a593Smuzhiyun 
1321*4882a593Smuzhiyun 	return __io_wq_cpu_online(wq, cpu, false);
1322*4882a593Smuzhiyun }
1323*4882a593Smuzhiyun 
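/*
 * Editorial note: replace each node's allowed-CPU mask with 'mask', or reset
 * it to the default cpumask_of_node() mask when 'mask' is NULL. Only the
 * per-node wqe cpu_mask is updated here.
 */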
1324*4882a593Smuzhiyun int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1325*4882a593Smuzhiyun {
1326*4882a593Smuzhiyun 	int i;
1327*4882a593Smuzhiyun 
1328*4882a593Smuzhiyun 	rcu_read_lock();
1329*4882a593Smuzhiyun 	for_each_node(i) {
1330*4882a593Smuzhiyun 		struct io_wqe *wqe = wq->wqes[i];
1331*4882a593Smuzhiyun 
1332*4882a593Smuzhiyun 		if (mask)
1333*4882a593Smuzhiyun 			cpumask_copy(wqe->cpu_mask, mask);
1334*4882a593Smuzhiyun 		else
1335*4882a593Smuzhiyun 			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
1336*4882a593Smuzhiyun 	}
1337*4882a593Smuzhiyun 	rcu_read_unlock();
1338*4882a593Smuzhiyun 	return 0;
1339*4882a593Smuzhiyun }
1340*4882a593Smuzhiyun 
1341*4882a593Smuzhiyun /*
1342*4882a593Smuzhiyun  * Set the max worker count for each accounting class (bounded/unbounded) and
1343*4882a593Smuzhiyun  * return the old values through new_count. A zero entry leaves that class unchanged.
1344*4882a593Smuzhiyun  */
1345*4882a593Smuzhiyun int io_wq_max_workers(struct io_wq *wq, int *new_count)
1346*4882a593Smuzhiyun {
1347*4882a593Smuzhiyun 	int prev[IO_WQ_ACCT_NR];
1348*4882a593Smuzhiyun 	bool first_node = true;
1349*4882a593Smuzhiyun 	int i, node;
1350*4882a593Smuzhiyun 
1351*4882a593Smuzhiyun 	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
1352*4882a593Smuzhiyun 	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
1353*4882a593Smuzhiyun 	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
1354*4882a593Smuzhiyun 
1355*4882a593Smuzhiyun 	for (i = 0; i < 2; i++) {
1356*4882a593Smuzhiyun 		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
1357*4882a593Smuzhiyun 			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
1358*4882a593Smuzhiyun 	}
1359*4882a593Smuzhiyun 
1360*4882a593Smuzhiyun 	for (i = 0; i < IO_WQ_ACCT_NR; i++)
1361*4882a593Smuzhiyun 		prev[i] = 0;
1362*4882a593Smuzhiyun 
1363*4882a593Smuzhiyun 	rcu_read_lock();
1364*4882a593Smuzhiyun 	for_each_node(node) {
1365*4882a593Smuzhiyun 		struct io_wqe *wqe = wq->wqes[node];
1366*4882a593Smuzhiyun 		struct io_wqe_acct *acct;
1367*4882a593Smuzhiyun 
1368*4882a593Smuzhiyun 		raw_spin_lock(&wqe->lock);
1369*4882a593Smuzhiyun 		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1370*4882a593Smuzhiyun 			acct = &wqe->acct[i];
1371*4882a593Smuzhiyun 			if (first_node)
1372*4882a593Smuzhiyun 				prev[i] = max_t(int, acct->max_workers, prev[i]);
1373*4882a593Smuzhiyun 			if (new_count[i])
1374*4882a593Smuzhiyun 				acct->max_workers = new_count[i];
1375*4882a593Smuzhiyun 		}
1376*4882a593Smuzhiyun 		raw_spin_unlock(&wqe->lock);
1377*4882a593Smuzhiyun 		first_node = false;
1378*4882a593Smuzhiyun 	}
1379*4882a593Smuzhiyun 	rcu_read_unlock();
1380*4882a593Smuzhiyun 
1381*4882a593Smuzhiyun 	for (i = 0; i < IO_WQ_ACCT_NR; i++)
1382*4882a593Smuzhiyun 		new_count[i] = prev[i];
1383*4882a593Smuzhiyun 
1384*4882a593Smuzhiyun 	return 0;
1385*4882a593Smuzhiyun }
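/*
 * Editorial note: illustrative usage sketch, not part of this file. Assuming
 * a valid 'wq', a caller could cap unbounded workers at 8 while leaving the
 * bounded class untouched, and retrieve the previous limits:
 *
 *	int counts[IO_WQ_ACCT_NR] = { 0, 8 };
 *
 *	io_wq_max_workers(wq, counts);
 *
 * On return, counts[IO_WQ_ACCT_BOUND] and counts[IO_WQ_ACCT_UNBOUND] hold
 * the old per-class maximums; the requested value is clamped to
 * RLIMIT_NPROC before being applied.
 */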
1386*4882a593Smuzhiyun 
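/*
 * Editorial note: boot-time setup. Registers the CPU hotplug callbacks and
 * stores the dynamically allocated hotplug state in io_wq_online, which
 * io_wq_create() uses when adding each io_wq instance.
 */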
1387*4882a593Smuzhiyun static __init int io_wq_init(void)
1388*4882a593Smuzhiyun {
1389*4882a593Smuzhiyun 	int ret;
1390*4882a593Smuzhiyun 
1391*4882a593Smuzhiyun 	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1392*4882a593Smuzhiyun 					io_wq_cpu_online, io_wq_cpu_offline);
1393*4882a593Smuzhiyun 	if (ret < 0)
1394*4882a593Smuzhiyun 		return ret;
1395*4882a593Smuzhiyun 	io_wq_online = ret;
1396*4882a593Smuzhiyun 	return 0;
1397*4882a593Smuzhiyun }
1398*4882a593Smuzhiyun subsys_initcall(io_wq_init);