xref: /OK3568_Linux_fs/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2*4882a593Smuzhiyun /*
3*4882a593Smuzhiyun  * Copyright (c) 2016-2018 Oracle. All rights reserved.
4*4882a593Smuzhiyun  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
5*4882a593Smuzhiyun  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
6*4882a593Smuzhiyun  *
7*4882a593Smuzhiyun  * This software is available to you under a choice of one of two
8*4882a593Smuzhiyun  * licenses.  You may choose to be licensed under the terms of the GNU
9*4882a593Smuzhiyun  * General Public License (GPL) Version 2, available from the file
10*4882a593Smuzhiyun  * COPYING in the main directory of this source tree, or the BSD-type
11*4882a593Smuzhiyun  * license below:
12*4882a593Smuzhiyun  *
13*4882a593Smuzhiyun  * Redistribution and use in source and binary forms, with or without
14*4882a593Smuzhiyun  * modification, are permitted provided that the following conditions
15*4882a593Smuzhiyun  * are met:
16*4882a593Smuzhiyun  *
17*4882a593Smuzhiyun  *      Redistributions of source code must retain the above copyright
18*4882a593Smuzhiyun  *      notice, this list of conditions and the following disclaimer.
19*4882a593Smuzhiyun  *
20*4882a593Smuzhiyun  *      Redistributions in binary form must reproduce the above
21*4882a593Smuzhiyun  *      copyright notice, this list of conditions and the following
22*4882a593Smuzhiyun  *      disclaimer in the documentation and/or other materials provided
23*4882a593Smuzhiyun  *      with the distribution.
24*4882a593Smuzhiyun  *
25*4882a593Smuzhiyun  *      Neither the name of the Network Appliance, Inc. nor the names of
26*4882a593Smuzhiyun  *      its contributors may be used to endorse or promote products
27*4882a593Smuzhiyun  *      derived from this software without specific prior written
28*4882a593Smuzhiyun  *      permission.
29*4882a593Smuzhiyun  *
30*4882a593Smuzhiyun  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
31*4882a593Smuzhiyun  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
32*4882a593Smuzhiyun  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
33*4882a593Smuzhiyun  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
34*4882a593Smuzhiyun  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35*4882a593Smuzhiyun  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
36*4882a593Smuzhiyun  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
37*4882a593Smuzhiyun  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
38*4882a593Smuzhiyun  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39*4882a593Smuzhiyun  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40*4882a593Smuzhiyun  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41*4882a593Smuzhiyun  *
42*4882a593Smuzhiyun  * Author: Tom Tucker <tom@opengridcomputing.com>
43*4882a593Smuzhiyun  */
44*4882a593Smuzhiyun 
45*4882a593Smuzhiyun /* Operation
46*4882a593Smuzhiyun  *
47*4882a593Smuzhiyun  * The main entry point is svc_rdma_sendto. This is called by the
48*4882a593Smuzhiyun  * RPC server when an RPC Reply is ready to be transmitted to a client.
49*4882a593Smuzhiyun  *
50*4882a593Smuzhiyun  * The passed-in svc_rqst contains a struct xdr_buf which holds an
51*4882a593Smuzhiyun  * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
52*4882a593Smuzhiyun  * transport header, post all Write WRs needed for this Reply, then post
53*4882a593Smuzhiyun  * a Send WR conveying the transport header and the RPC message itself to
54*4882a593Smuzhiyun  * the client.
55*4882a593Smuzhiyun  *
56*4882a593Smuzhiyun  * svc_rdma_sendto must fully transmit the Reply before returning, as
57*4882a593Smuzhiyun  * the svc_rqst will be recycled as soon as sendto returns. Remaining
58*4882a593Smuzhiyun  * resources referred to by the svc_rqst are also recycled at that time.
59*4882a593Smuzhiyun  * Therefore any resources that must remain longer must be detached
60*4882a593Smuzhiyun  * from the svc_rqst and released later.
61*4882a593Smuzhiyun  *
62*4882a593Smuzhiyun  * Page Management
63*4882a593Smuzhiyun  *
64*4882a593Smuzhiyun  * The I/O that performs Reply transmission is asynchronous, and may
65*4882a593Smuzhiyun  * complete well after sendto returns. Thus pages under I/O must be
66*4882a593Smuzhiyun  * removed from the svc_rqst before sendto returns.
67*4882a593Smuzhiyun  *
68*4882a593Smuzhiyun  * The logic here depends on Send Queue and completion ordering. Since
69*4882a593Smuzhiyun  * the Send WR is always posted last, it will always complete last. Thus
70*4882a593Smuzhiyun  * when it completes, it is guaranteed that all previous Write WRs have
71*4882a593Smuzhiyun  * also completed.
72*4882a593Smuzhiyun  *
73*4882a593Smuzhiyun  * Write WRs are constructed and posted. Each Write segment gets its own
74*4882a593Smuzhiyun  * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
75*4882a593Smuzhiyun  * DMA-unmap the pages under I/O for that Write segment. The Write
76*4882a593Smuzhiyun  * completion handler does not release any pages.
77*4882a593Smuzhiyun  *
78*4882a593Smuzhiyun  * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
79*4882a593Smuzhiyun  * The ownership of all of the Reply's pages are transferred into that
80*4882a593Smuzhiyun  * ctxt, the Send WR is posted, and sendto returns.
81*4882a593Smuzhiyun  *
82*4882a593Smuzhiyun  * The svc_rdma_send_ctxt is presented when the Send WR completes. The
83*4882a593Smuzhiyun  * Send completion handler finally releases the Reply's pages.
84*4882a593Smuzhiyun  *
85*4882a593Smuzhiyun  * This mechanism also assumes that completions on the transport's Send
86*4882a593Smuzhiyun  * Completion Queue do not run in parallel. Otherwise a Write completion
87*4882a593Smuzhiyun  * and Send completion running at the same time could release pages that
88*4882a593Smuzhiyun  * are still DMA-mapped.
89*4882a593Smuzhiyun  *
90*4882a593Smuzhiyun  * Error Handling
91*4882a593Smuzhiyun  *
92*4882a593Smuzhiyun  * - If the Send WR is posted successfully, it will either complete
93*4882a593Smuzhiyun  *   successfully, or get flushed. Either way, the Send completion
94*4882a593Smuzhiyun  *   handler releases the Reply's pages.
95*4882a593Smuzhiyun  * - If the Send WR cannot be not posted, the forward path releases
96*4882a593Smuzhiyun  *   the Reply's pages.
97*4882a593Smuzhiyun  *
98*4882a593Smuzhiyun  * This handles the case, without the use of page reference counting,
99*4882a593Smuzhiyun  * where two different Write segments send portions of the same page.
100*4882a593Smuzhiyun  */
101*4882a593Smuzhiyun 
102*4882a593Smuzhiyun #include <linux/spinlock.h>
103*4882a593Smuzhiyun #include <asm/unaligned.h>
104*4882a593Smuzhiyun 
105*4882a593Smuzhiyun #include <rdma/ib_verbs.h>
106*4882a593Smuzhiyun #include <rdma/rdma_cm.h>
107*4882a593Smuzhiyun 
108*4882a593Smuzhiyun #include <linux/sunrpc/debug.h>
109*4882a593Smuzhiyun #include <linux/sunrpc/svc_rdma.h>
110*4882a593Smuzhiyun 
111*4882a593Smuzhiyun #include "xprt_rdma.h"
112*4882a593Smuzhiyun #include <trace/events/rpcrdma.h>
113*4882a593Smuzhiyun 
114*4882a593Smuzhiyun #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
115*4882a593Smuzhiyun 
116*4882a593Smuzhiyun static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
117*4882a593Smuzhiyun 
118*4882a593Smuzhiyun static inline struct svc_rdma_send_ctxt *
svc_rdma_next_send_ctxt(struct list_head * list)119*4882a593Smuzhiyun svc_rdma_next_send_ctxt(struct list_head *list)
120*4882a593Smuzhiyun {
121*4882a593Smuzhiyun 	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
122*4882a593Smuzhiyun 					sc_list);
123*4882a593Smuzhiyun }
124*4882a593Smuzhiyun 
svc_rdma_send_cid_init(struct svcxprt_rdma * rdma,struct rpc_rdma_cid * cid)125*4882a593Smuzhiyun static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
126*4882a593Smuzhiyun 				   struct rpc_rdma_cid *cid)
127*4882a593Smuzhiyun {
128*4882a593Smuzhiyun 	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
129*4882a593Smuzhiyun 	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
130*4882a593Smuzhiyun }
131*4882a593Smuzhiyun 
132*4882a593Smuzhiyun static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma * rdma)133*4882a593Smuzhiyun svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
134*4882a593Smuzhiyun {
135*4882a593Smuzhiyun 	struct svc_rdma_send_ctxt *ctxt;
136*4882a593Smuzhiyun 	dma_addr_t addr;
137*4882a593Smuzhiyun 	void *buffer;
138*4882a593Smuzhiyun 	size_t size;
139*4882a593Smuzhiyun 	int i;
140*4882a593Smuzhiyun 
141*4882a593Smuzhiyun 	size = sizeof(*ctxt);
142*4882a593Smuzhiyun 	size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
143*4882a593Smuzhiyun 	ctxt = kmalloc(size, GFP_KERNEL);
144*4882a593Smuzhiyun 	if (!ctxt)
145*4882a593Smuzhiyun 		goto fail0;
146*4882a593Smuzhiyun 	buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
147*4882a593Smuzhiyun 	if (!buffer)
148*4882a593Smuzhiyun 		goto fail1;
149*4882a593Smuzhiyun 	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
150*4882a593Smuzhiyun 				 rdma->sc_max_req_size, DMA_TO_DEVICE);
151*4882a593Smuzhiyun 	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
152*4882a593Smuzhiyun 		goto fail2;
153*4882a593Smuzhiyun 
154*4882a593Smuzhiyun 	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
155*4882a593Smuzhiyun 
156*4882a593Smuzhiyun 	ctxt->sc_send_wr.next = NULL;
157*4882a593Smuzhiyun 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
158*4882a593Smuzhiyun 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
159*4882a593Smuzhiyun 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
160*4882a593Smuzhiyun 	ctxt->sc_cqe.done = svc_rdma_wc_send;
161*4882a593Smuzhiyun 	ctxt->sc_xprt_buf = buffer;
162*4882a593Smuzhiyun 	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
163*4882a593Smuzhiyun 		     rdma->sc_max_req_size);
164*4882a593Smuzhiyun 	ctxt->sc_sges[0].addr = addr;
165*4882a593Smuzhiyun 
166*4882a593Smuzhiyun 	for (i = 0; i < rdma->sc_max_send_sges; i++)
167*4882a593Smuzhiyun 		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
168*4882a593Smuzhiyun 	return ctxt;
169*4882a593Smuzhiyun 
170*4882a593Smuzhiyun fail2:
171*4882a593Smuzhiyun 	kfree(buffer);
172*4882a593Smuzhiyun fail1:
173*4882a593Smuzhiyun 	kfree(ctxt);
174*4882a593Smuzhiyun fail0:
175*4882a593Smuzhiyun 	return NULL;
176*4882a593Smuzhiyun }
177*4882a593Smuzhiyun 
178*4882a593Smuzhiyun /**
179*4882a593Smuzhiyun  * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
180*4882a593Smuzhiyun  * @rdma: svcxprt_rdma being torn down
181*4882a593Smuzhiyun  *
182*4882a593Smuzhiyun  */
svc_rdma_send_ctxts_destroy(struct svcxprt_rdma * rdma)183*4882a593Smuzhiyun void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
184*4882a593Smuzhiyun {
185*4882a593Smuzhiyun 	struct svc_rdma_send_ctxt *ctxt;
186*4882a593Smuzhiyun 
187*4882a593Smuzhiyun 	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
188*4882a593Smuzhiyun 		list_del(&ctxt->sc_list);
189*4882a593Smuzhiyun 		ib_dma_unmap_single(rdma->sc_pd->device,
190*4882a593Smuzhiyun 				    ctxt->sc_sges[0].addr,
191*4882a593Smuzhiyun 				    rdma->sc_max_req_size,
192*4882a593Smuzhiyun 				    DMA_TO_DEVICE);
193*4882a593Smuzhiyun 		kfree(ctxt->sc_xprt_buf);
194*4882a593Smuzhiyun 		kfree(ctxt);
195*4882a593Smuzhiyun 	}
196*4882a593Smuzhiyun }
197*4882a593Smuzhiyun 
198*4882a593Smuzhiyun /**
199*4882a593Smuzhiyun  * svc_rdma_send_ctxt_get - Get a free send_ctxt
200*4882a593Smuzhiyun  * @rdma: controlling svcxprt_rdma
201*4882a593Smuzhiyun  *
202*4882a593Smuzhiyun  * Returns a ready-to-use send_ctxt, or NULL if none are
203*4882a593Smuzhiyun  * available and a fresh one cannot be allocated.
204*4882a593Smuzhiyun  */
svc_rdma_send_ctxt_get(struct svcxprt_rdma * rdma)205*4882a593Smuzhiyun struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
206*4882a593Smuzhiyun {
207*4882a593Smuzhiyun 	struct svc_rdma_send_ctxt *ctxt;
208*4882a593Smuzhiyun 
209*4882a593Smuzhiyun 	spin_lock(&rdma->sc_send_lock);
210*4882a593Smuzhiyun 	ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
211*4882a593Smuzhiyun 	if (!ctxt)
212*4882a593Smuzhiyun 		goto out_empty;
213*4882a593Smuzhiyun 	list_del(&ctxt->sc_list);
214*4882a593Smuzhiyun 	spin_unlock(&rdma->sc_send_lock);
215*4882a593Smuzhiyun 
216*4882a593Smuzhiyun out:
217*4882a593Smuzhiyun 	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
218*4882a593Smuzhiyun 	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
219*4882a593Smuzhiyun 			ctxt->sc_xprt_buf, NULL);
220*4882a593Smuzhiyun 
221*4882a593Smuzhiyun 	ctxt->sc_send_wr.num_sge = 0;
222*4882a593Smuzhiyun 	ctxt->sc_cur_sge_no = 0;
223*4882a593Smuzhiyun 	ctxt->sc_page_count = 0;
224*4882a593Smuzhiyun 	return ctxt;
225*4882a593Smuzhiyun 
226*4882a593Smuzhiyun out_empty:
227*4882a593Smuzhiyun 	spin_unlock(&rdma->sc_send_lock);
228*4882a593Smuzhiyun 	ctxt = svc_rdma_send_ctxt_alloc(rdma);
229*4882a593Smuzhiyun 	if (!ctxt)
230*4882a593Smuzhiyun 		return NULL;
231*4882a593Smuzhiyun 	goto out;
232*4882a593Smuzhiyun }
233*4882a593Smuzhiyun 
234*4882a593Smuzhiyun /**
235*4882a593Smuzhiyun  * svc_rdma_send_ctxt_put - Return send_ctxt to free list
236*4882a593Smuzhiyun  * @rdma: controlling svcxprt_rdma
237*4882a593Smuzhiyun  * @ctxt: object to return to the free list
238*4882a593Smuzhiyun  *
239*4882a593Smuzhiyun  * Pages left in sc_pages are DMA unmapped and released.
240*4882a593Smuzhiyun  */
svc_rdma_send_ctxt_put(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt)241*4882a593Smuzhiyun void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
242*4882a593Smuzhiyun 			    struct svc_rdma_send_ctxt *ctxt)
243*4882a593Smuzhiyun {
244*4882a593Smuzhiyun 	struct ib_device *device = rdma->sc_cm_id->device;
245*4882a593Smuzhiyun 	unsigned int i;
246*4882a593Smuzhiyun 
247*4882a593Smuzhiyun 	/* The first SGE contains the transport header, which
248*4882a593Smuzhiyun 	 * remains mapped until @ctxt is destroyed.
249*4882a593Smuzhiyun 	 */
250*4882a593Smuzhiyun 	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
251*4882a593Smuzhiyun 		ib_dma_unmap_page(device,
252*4882a593Smuzhiyun 				  ctxt->sc_sges[i].addr,
253*4882a593Smuzhiyun 				  ctxt->sc_sges[i].length,
254*4882a593Smuzhiyun 				  DMA_TO_DEVICE);
255*4882a593Smuzhiyun 		trace_svcrdma_dma_unmap_page(rdma,
256*4882a593Smuzhiyun 					     ctxt->sc_sges[i].addr,
257*4882a593Smuzhiyun 					     ctxt->sc_sges[i].length);
258*4882a593Smuzhiyun 	}
259*4882a593Smuzhiyun 
260*4882a593Smuzhiyun 	for (i = 0; i < ctxt->sc_page_count; ++i)
261*4882a593Smuzhiyun 		put_page(ctxt->sc_pages[i]);
262*4882a593Smuzhiyun 
263*4882a593Smuzhiyun 	spin_lock(&rdma->sc_send_lock);
264*4882a593Smuzhiyun 	list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
265*4882a593Smuzhiyun 	spin_unlock(&rdma->sc_send_lock);
266*4882a593Smuzhiyun }
267*4882a593Smuzhiyun 
268*4882a593Smuzhiyun /**
269*4882a593Smuzhiyun  * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
270*4882a593Smuzhiyun  * @cq: Completion Queue context
271*4882a593Smuzhiyun  * @wc: Work Completion object
272*4882a593Smuzhiyun  *
273*4882a593Smuzhiyun  * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
274*4882a593Smuzhiyun  * the Send completion handler could be running.
275*4882a593Smuzhiyun  */
svc_rdma_wc_send(struct ib_cq * cq,struct ib_wc * wc)276*4882a593Smuzhiyun static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
277*4882a593Smuzhiyun {
278*4882a593Smuzhiyun 	struct svcxprt_rdma *rdma = cq->cq_context;
279*4882a593Smuzhiyun 	struct ib_cqe *cqe = wc->wr_cqe;
280*4882a593Smuzhiyun 	struct svc_rdma_send_ctxt *ctxt =
281*4882a593Smuzhiyun 		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
282*4882a593Smuzhiyun 
283*4882a593Smuzhiyun 	trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
284*4882a593Smuzhiyun 
285*4882a593Smuzhiyun 	atomic_inc(&rdma->sc_sq_avail);
286*4882a593Smuzhiyun 	wake_up(&rdma->sc_send_wait);
287*4882a593Smuzhiyun 
288*4882a593Smuzhiyun 	svc_rdma_send_ctxt_put(rdma, ctxt);
289*4882a593Smuzhiyun 
290*4882a593Smuzhiyun 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
291*4882a593Smuzhiyun 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
292*4882a593Smuzhiyun 		svc_xprt_enqueue(&rdma->sc_xprt);
293*4882a593Smuzhiyun 	}
294*4882a593Smuzhiyun }
295*4882a593Smuzhiyun 
296*4882a593Smuzhiyun /**
297*4882a593Smuzhiyun  * svc_rdma_send - Post a single Send WR
298*4882a593Smuzhiyun  * @rdma: transport on which to post the WR
299*4882a593Smuzhiyun  * @ctxt: send ctxt with a Send WR ready to post
300*4882a593Smuzhiyun  *
301*4882a593Smuzhiyun  * Returns zero the Send WR was posted successfully. Otherwise, a
302*4882a593Smuzhiyun  * negative errno is returned.
303*4882a593Smuzhiyun  */
svc_rdma_send(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt)304*4882a593Smuzhiyun int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
305*4882a593Smuzhiyun {
306*4882a593Smuzhiyun 	struct ib_send_wr *wr = &ctxt->sc_send_wr;
307*4882a593Smuzhiyun 	int ret;
308*4882a593Smuzhiyun 
309*4882a593Smuzhiyun 	might_sleep();
310*4882a593Smuzhiyun 
311*4882a593Smuzhiyun 	/* Sync the transport header buffer */
312*4882a593Smuzhiyun 	ib_dma_sync_single_for_device(rdma->sc_pd->device,
313*4882a593Smuzhiyun 				      wr->sg_list[0].addr,
314*4882a593Smuzhiyun 				      wr->sg_list[0].length,
315*4882a593Smuzhiyun 				      DMA_TO_DEVICE);
316*4882a593Smuzhiyun 
317*4882a593Smuzhiyun 	/* If the SQ is full, wait until an SQ entry is available */
318*4882a593Smuzhiyun 	while (1) {
319*4882a593Smuzhiyun 		if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
320*4882a593Smuzhiyun 			atomic_inc(&rdma_stat_sq_starve);
321*4882a593Smuzhiyun 			trace_svcrdma_sq_full(rdma);
322*4882a593Smuzhiyun 			atomic_inc(&rdma->sc_sq_avail);
323*4882a593Smuzhiyun 			wait_event(rdma->sc_send_wait,
324*4882a593Smuzhiyun 				   atomic_read(&rdma->sc_sq_avail) > 1);
325*4882a593Smuzhiyun 			if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
326*4882a593Smuzhiyun 				return -ENOTCONN;
327*4882a593Smuzhiyun 			trace_svcrdma_sq_retry(rdma);
328*4882a593Smuzhiyun 			continue;
329*4882a593Smuzhiyun 		}
330*4882a593Smuzhiyun 
331*4882a593Smuzhiyun 		trace_svcrdma_post_send(ctxt);
332*4882a593Smuzhiyun 		ret = ib_post_send(rdma->sc_qp, wr, NULL);
333*4882a593Smuzhiyun 		if (ret)
334*4882a593Smuzhiyun 			break;
335*4882a593Smuzhiyun 		return 0;
336*4882a593Smuzhiyun 	}
337*4882a593Smuzhiyun 
338*4882a593Smuzhiyun 	trace_svcrdma_sq_post_err(rdma, ret);
339*4882a593Smuzhiyun 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
340*4882a593Smuzhiyun 	wake_up(&rdma->sc_send_wait);
341*4882a593Smuzhiyun 	return ret;
342*4882a593Smuzhiyun }
343*4882a593Smuzhiyun 
344*4882a593Smuzhiyun /**
345*4882a593Smuzhiyun  * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
346*4882a593Smuzhiyun  * @sctxt: Send context for the RPC Reply
347*4882a593Smuzhiyun  *
348*4882a593Smuzhiyun  * Return values:
349*4882a593Smuzhiyun  *   On success, returns length in bytes of the Reply XDR buffer
350*4882a593Smuzhiyun  *   that was consumed by the Reply Read list
351*4882a593Smuzhiyun  *   %-EMSGSIZE on XDR buffer overflow
352*4882a593Smuzhiyun  */
svc_rdma_encode_read_list(struct svc_rdma_send_ctxt * sctxt)353*4882a593Smuzhiyun static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
354*4882a593Smuzhiyun {
355*4882a593Smuzhiyun 	/* RPC-over-RDMA version 1 replies never have a Read list. */
356*4882a593Smuzhiyun 	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
357*4882a593Smuzhiyun }
358*4882a593Smuzhiyun 
359*4882a593Smuzhiyun /**
360*4882a593Smuzhiyun  * svc_rdma_encode_write_segment - Encode one Write segment
361*4882a593Smuzhiyun  * @src: matching Write chunk in the RPC Call header
362*4882a593Smuzhiyun  * @sctxt: Send context for the RPC Reply
363*4882a593Smuzhiyun  * @remaining: remaining bytes of the payload left in the Write chunk
364*4882a593Smuzhiyun  *
365*4882a593Smuzhiyun  * Return values:
366*4882a593Smuzhiyun  *   On success, returns length in bytes of the Reply XDR buffer
367*4882a593Smuzhiyun  *   that was consumed by the Write segment
368*4882a593Smuzhiyun  *   %-EMSGSIZE on XDR buffer overflow
369*4882a593Smuzhiyun  */
svc_rdma_encode_write_segment(__be32 * src,struct svc_rdma_send_ctxt * sctxt,unsigned int * remaining)370*4882a593Smuzhiyun static ssize_t svc_rdma_encode_write_segment(__be32 *src,
371*4882a593Smuzhiyun 					     struct svc_rdma_send_ctxt *sctxt,
372*4882a593Smuzhiyun 					     unsigned int *remaining)
373*4882a593Smuzhiyun {
374*4882a593Smuzhiyun 	__be32 *p;
375*4882a593Smuzhiyun 	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
376*4882a593Smuzhiyun 	u32 handle, length;
377*4882a593Smuzhiyun 	u64 offset;
378*4882a593Smuzhiyun 
379*4882a593Smuzhiyun 	p = xdr_reserve_space(&sctxt->sc_stream, len);
380*4882a593Smuzhiyun 	if (!p)
381*4882a593Smuzhiyun 		return -EMSGSIZE;
382*4882a593Smuzhiyun 
383*4882a593Smuzhiyun 	xdr_decode_rdma_segment(src, &handle, &length, &offset);
384*4882a593Smuzhiyun 
385*4882a593Smuzhiyun 	if (*remaining < length) {
386*4882a593Smuzhiyun 		/* segment only partly filled */
387*4882a593Smuzhiyun 		length = *remaining;
388*4882a593Smuzhiyun 		*remaining = 0;
389*4882a593Smuzhiyun 	} else {
390*4882a593Smuzhiyun 		/* entire segment was consumed */
391*4882a593Smuzhiyun 		*remaining -= length;
392*4882a593Smuzhiyun 	}
393*4882a593Smuzhiyun 	xdr_encode_rdma_segment(p, handle, length, offset);
394*4882a593Smuzhiyun 
395*4882a593Smuzhiyun 	trace_svcrdma_encode_wseg(handle, length, offset);
396*4882a593Smuzhiyun 	return len;
397*4882a593Smuzhiyun }
398*4882a593Smuzhiyun 
399*4882a593Smuzhiyun /**
400*4882a593Smuzhiyun  * svc_rdma_encode_write_chunk - Encode one Write chunk
401*4882a593Smuzhiyun  * @src: matching Write chunk in the RPC Call header
402*4882a593Smuzhiyun  * @sctxt: Send context for the RPC Reply
403*4882a593Smuzhiyun  * @remaining: size in bytes of the payload in the Write chunk
404*4882a593Smuzhiyun  *
405*4882a593Smuzhiyun  * Copy a Write chunk from the Call transport header to the
406*4882a593Smuzhiyun  * Reply transport header. Update each segment's length field
407*4882a593Smuzhiyun  * to reflect the number of bytes written in that segment.
408*4882a593Smuzhiyun  *
409*4882a593Smuzhiyun  * Return values:
410*4882a593Smuzhiyun  *   On success, returns length in bytes of the Reply XDR buffer
411*4882a593Smuzhiyun  *   that was consumed by the Write chunk
412*4882a593Smuzhiyun  *   %-EMSGSIZE on XDR buffer overflow
413*4882a593Smuzhiyun  */
svc_rdma_encode_write_chunk(__be32 * src,struct svc_rdma_send_ctxt * sctxt,unsigned int remaining)414*4882a593Smuzhiyun static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
415*4882a593Smuzhiyun 					   struct svc_rdma_send_ctxt *sctxt,
416*4882a593Smuzhiyun 					   unsigned int remaining)
417*4882a593Smuzhiyun {
418*4882a593Smuzhiyun 	unsigned int i, nsegs;
419*4882a593Smuzhiyun 	ssize_t len, ret;
420*4882a593Smuzhiyun 
421*4882a593Smuzhiyun 	len = 0;
422*4882a593Smuzhiyun 	trace_svcrdma_encode_write_chunk(remaining);
423*4882a593Smuzhiyun 
424*4882a593Smuzhiyun 	src++;
425*4882a593Smuzhiyun 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
426*4882a593Smuzhiyun 	if (ret < 0)
427*4882a593Smuzhiyun 		return -EMSGSIZE;
428*4882a593Smuzhiyun 	len += ret;
429*4882a593Smuzhiyun 
430*4882a593Smuzhiyun 	nsegs = be32_to_cpup(src++);
431*4882a593Smuzhiyun 	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
432*4882a593Smuzhiyun 	if (ret < 0)
433*4882a593Smuzhiyun 		return -EMSGSIZE;
434*4882a593Smuzhiyun 	len += ret;
435*4882a593Smuzhiyun 
436*4882a593Smuzhiyun 	for (i = nsegs; i; i--) {
437*4882a593Smuzhiyun 		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
438*4882a593Smuzhiyun 		if (ret < 0)
439*4882a593Smuzhiyun 			return -EMSGSIZE;
440*4882a593Smuzhiyun 		src += rpcrdma_segment_maxsz;
441*4882a593Smuzhiyun 		len += ret;
442*4882a593Smuzhiyun 	}
443*4882a593Smuzhiyun 
444*4882a593Smuzhiyun 	return len;
445*4882a593Smuzhiyun }
446*4882a593Smuzhiyun 
447*4882a593Smuzhiyun /**
448*4882a593Smuzhiyun  * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
449*4882a593Smuzhiyun  * @rctxt: Reply context with information about the RPC Call
450*4882a593Smuzhiyun  * @sctxt: Send context for the RPC Reply
451*4882a593Smuzhiyun  * @length: size in bytes of the payload in the first Write chunk
452*4882a593Smuzhiyun  *
453*4882a593Smuzhiyun  * The client provides a Write chunk list in the Call message. Fill
454*4882a593Smuzhiyun  * in the segments in the first Write chunk in the Reply's transport
455*4882a593Smuzhiyun  * header with the number of bytes consumed in each segment.
456*4882a593Smuzhiyun  * Remaining chunks are returned unused.
457*4882a593Smuzhiyun  *
458*4882a593Smuzhiyun  * Assumptions:
459*4882a593Smuzhiyun  *  - Client has provided only one Write chunk
460*4882a593Smuzhiyun  *
461*4882a593Smuzhiyun  * Return values:
462*4882a593Smuzhiyun  *   On success, returns length in bytes of the Reply XDR buffer
463*4882a593Smuzhiyun  *   that was consumed by the Reply's Write list
464*4882a593Smuzhiyun  *   %-EMSGSIZE on XDR buffer overflow
465*4882a593Smuzhiyun  */
466*4882a593Smuzhiyun static ssize_t
svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt * rctxt,struct svc_rdma_send_ctxt * sctxt,unsigned int length)467*4882a593Smuzhiyun svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
468*4882a593Smuzhiyun 			   struct svc_rdma_send_ctxt *sctxt,
469*4882a593Smuzhiyun 			   unsigned int length)
470*4882a593Smuzhiyun {
471*4882a593Smuzhiyun 	ssize_t len, ret;
472*4882a593Smuzhiyun 
473*4882a593Smuzhiyun 	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
474*4882a593Smuzhiyun 	if (ret < 0)
475*4882a593Smuzhiyun 		return ret;
476*4882a593Smuzhiyun 	len = ret;
477*4882a593Smuzhiyun 
478*4882a593Smuzhiyun 	/* Terminate the Write list */
479*4882a593Smuzhiyun 	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
480*4882a593Smuzhiyun 	if (ret < 0)
481*4882a593Smuzhiyun 		return ret;
482*4882a593Smuzhiyun 
483*4882a593Smuzhiyun 	return len + ret;
484*4882a593Smuzhiyun }
485*4882a593Smuzhiyun 
486*4882a593Smuzhiyun /**
487*4882a593Smuzhiyun  * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
488*4882a593Smuzhiyun  * @rctxt: Reply context with information about the RPC Call
489*4882a593Smuzhiyun  * @sctxt: Send context for the RPC Reply
490*4882a593Smuzhiyun  * @length: size in bytes of the payload in the Reply chunk
491*4882a593Smuzhiyun  *
492*4882a593Smuzhiyun  * Assumptions:
493*4882a593Smuzhiyun  * - Reply can always fit in the client-provided Reply chunk
494*4882a593Smuzhiyun  *
495*4882a593Smuzhiyun  * Return values:
496*4882a593Smuzhiyun  *   On success, returns length in bytes of the Reply XDR buffer
497*4882a593Smuzhiyun  *   that was consumed by the Reply's Reply chunk
498*4882a593Smuzhiyun  *   %-EMSGSIZE on XDR buffer overflow
499*4882a593Smuzhiyun  */
500*4882a593Smuzhiyun static ssize_t
svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt * rctxt,struct svc_rdma_send_ctxt * sctxt,unsigned int length)501*4882a593Smuzhiyun svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
502*4882a593Smuzhiyun 			    struct svc_rdma_send_ctxt *sctxt,
503*4882a593Smuzhiyun 			    unsigned int length)
504*4882a593Smuzhiyun {
505*4882a593Smuzhiyun 	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
506*4882a593Smuzhiyun 					   length);
507*4882a593Smuzhiyun }
508*4882a593Smuzhiyun 
svc_rdma_dma_map_page(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt,struct page * page,unsigned long offset,unsigned int len)509*4882a593Smuzhiyun static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
510*4882a593Smuzhiyun 				 struct svc_rdma_send_ctxt *ctxt,
511*4882a593Smuzhiyun 				 struct page *page,
512*4882a593Smuzhiyun 				 unsigned long offset,
513*4882a593Smuzhiyun 				 unsigned int len)
514*4882a593Smuzhiyun {
515*4882a593Smuzhiyun 	struct ib_device *dev = rdma->sc_cm_id->device;
516*4882a593Smuzhiyun 	dma_addr_t dma_addr;
517*4882a593Smuzhiyun 
518*4882a593Smuzhiyun 	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
519*4882a593Smuzhiyun 	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
520*4882a593Smuzhiyun 	if (ib_dma_mapping_error(dev, dma_addr))
521*4882a593Smuzhiyun 		goto out_maperr;
522*4882a593Smuzhiyun 
523*4882a593Smuzhiyun 	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
524*4882a593Smuzhiyun 	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
525*4882a593Smuzhiyun 	ctxt->sc_send_wr.num_sge++;
526*4882a593Smuzhiyun 	return 0;
527*4882a593Smuzhiyun 
528*4882a593Smuzhiyun out_maperr:
529*4882a593Smuzhiyun 	return -EIO;
530*4882a593Smuzhiyun }
531*4882a593Smuzhiyun 
532*4882a593Smuzhiyun /* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
533*4882a593Smuzhiyun  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
534*4882a593Smuzhiyun  */
svc_rdma_dma_map_buf(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt,unsigned char * base,unsigned int len)535*4882a593Smuzhiyun static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
536*4882a593Smuzhiyun 				struct svc_rdma_send_ctxt *ctxt,
537*4882a593Smuzhiyun 				unsigned char *base,
538*4882a593Smuzhiyun 				unsigned int len)
539*4882a593Smuzhiyun {
540*4882a593Smuzhiyun 	return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base),
541*4882a593Smuzhiyun 				     offset_in_page(base), len);
542*4882a593Smuzhiyun }
543*4882a593Smuzhiyun 
544*4882a593Smuzhiyun /**
545*4882a593Smuzhiyun  * svc_rdma_pull_up_needed - Determine whether to use pull-up
546*4882a593Smuzhiyun  * @rdma: controlling transport
547*4882a593Smuzhiyun  * @sctxt: send_ctxt for the Send WR
548*4882a593Smuzhiyun  * @rctxt: Write and Reply chunks provided by client
549*4882a593Smuzhiyun  * @xdr: xdr_buf containing RPC message to transmit
550*4882a593Smuzhiyun  *
551*4882a593Smuzhiyun  * Returns:
552*4882a593Smuzhiyun  *	%true if pull-up must be used
553*4882a593Smuzhiyun  *	%false otherwise
554*4882a593Smuzhiyun  */
svc_rdma_pull_up_needed(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct xdr_buf * xdr)555*4882a593Smuzhiyun static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
556*4882a593Smuzhiyun 				    struct svc_rdma_send_ctxt *sctxt,
557*4882a593Smuzhiyun 				    const struct svc_rdma_recv_ctxt *rctxt,
558*4882a593Smuzhiyun 				    struct xdr_buf *xdr)
559*4882a593Smuzhiyun {
560*4882a593Smuzhiyun 	int elements;
561*4882a593Smuzhiyun 
562*4882a593Smuzhiyun 	/* For small messages, copying bytes is cheaper than DMA mapping.
563*4882a593Smuzhiyun 	 */
564*4882a593Smuzhiyun 	if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
565*4882a593Smuzhiyun 		return true;
566*4882a593Smuzhiyun 
567*4882a593Smuzhiyun 	/* Check whether the xdr_buf has more elements than can
568*4882a593Smuzhiyun 	 * fit in a single RDMA Send.
569*4882a593Smuzhiyun 	 */
570*4882a593Smuzhiyun 	/* xdr->head */
571*4882a593Smuzhiyun 	elements = 1;
572*4882a593Smuzhiyun 
573*4882a593Smuzhiyun 	/* xdr->pages */
574*4882a593Smuzhiyun 	if (!rctxt || !rctxt->rc_write_list) {
575*4882a593Smuzhiyun 		unsigned int remaining;
576*4882a593Smuzhiyun 		unsigned long pageoff;
577*4882a593Smuzhiyun 
578*4882a593Smuzhiyun 		pageoff = xdr->page_base & ~PAGE_MASK;
579*4882a593Smuzhiyun 		remaining = xdr->page_len;
580*4882a593Smuzhiyun 		while (remaining) {
581*4882a593Smuzhiyun 			++elements;
582*4882a593Smuzhiyun 			remaining -= min_t(u32, PAGE_SIZE - pageoff,
583*4882a593Smuzhiyun 					   remaining);
584*4882a593Smuzhiyun 			pageoff = 0;
585*4882a593Smuzhiyun 		}
586*4882a593Smuzhiyun 	}
587*4882a593Smuzhiyun 
588*4882a593Smuzhiyun 	/* xdr->tail */
589*4882a593Smuzhiyun 	if (xdr->tail[0].iov_len)
590*4882a593Smuzhiyun 		++elements;
591*4882a593Smuzhiyun 
592*4882a593Smuzhiyun 	/* assume 1 SGE is needed for the transport header */
593*4882a593Smuzhiyun 	return elements >= rdma->sc_max_send_sges;
594*4882a593Smuzhiyun }
595*4882a593Smuzhiyun 
596*4882a593Smuzhiyun /**
597*4882a593Smuzhiyun  * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
598*4882a593Smuzhiyun  * @rdma: controlling transport
599*4882a593Smuzhiyun  * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
600*4882a593Smuzhiyun  * @rctxt: Write and Reply chunks provided by client
601*4882a593Smuzhiyun  * @xdr: prepared xdr_buf containing RPC message
602*4882a593Smuzhiyun  *
603*4882a593Smuzhiyun  * The device is not capable of sending the reply directly.
604*4882a593Smuzhiyun  * Assemble the elements of @xdr into the transport header buffer.
605*4882a593Smuzhiyun  *
606*4882a593Smuzhiyun  * Returns zero on success, or a negative errno on failure.
607*4882a593Smuzhiyun  */
svc_rdma_pull_up_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,const struct xdr_buf * xdr)608*4882a593Smuzhiyun static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
609*4882a593Smuzhiyun 				      struct svc_rdma_send_ctxt *sctxt,
610*4882a593Smuzhiyun 				      const struct svc_rdma_recv_ctxt *rctxt,
611*4882a593Smuzhiyun 				      const struct xdr_buf *xdr)
612*4882a593Smuzhiyun {
613*4882a593Smuzhiyun 	unsigned char *dst, *tailbase;
614*4882a593Smuzhiyun 	unsigned int taillen;
615*4882a593Smuzhiyun 
616*4882a593Smuzhiyun 	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
617*4882a593Smuzhiyun 	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
618*4882a593Smuzhiyun 	dst += xdr->head[0].iov_len;
619*4882a593Smuzhiyun 
620*4882a593Smuzhiyun 	tailbase = xdr->tail[0].iov_base;
621*4882a593Smuzhiyun 	taillen = xdr->tail[0].iov_len;
622*4882a593Smuzhiyun 	if (rctxt && rctxt->rc_write_list) {
623*4882a593Smuzhiyun 		u32 xdrpad;
624*4882a593Smuzhiyun 
625*4882a593Smuzhiyun 		xdrpad = xdr_pad_size(xdr->page_len);
626*4882a593Smuzhiyun 		if (taillen && xdrpad) {
627*4882a593Smuzhiyun 			tailbase += xdrpad;
628*4882a593Smuzhiyun 			taillen -= xdrpad;
629*4882a593Smuzhiyun 		}
630*4882a593Smuzhiyun 	} else {
631*4882a593Smuzhiyun 		unsigned int len, remaining;
632*4882a593Smuzhiyun 		unsigned long pageoff;
633*4882a593Smuzhiyun 		struct page **ppages;
634*4882a593Smuzhiyun 
635*4882a593Smuzhiyun 		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
636*4882a593Smuzhiyun 		pageoff = xdr->page_base & ~PAGE_MASK;
637*4882a593Smuzhiyun 		remaining = xdr->page_len;
638*4882a593Smuzhiyun 		while (remaining) {
639*4882a593Smuzhiyun 			len = min_t(u32, PAGE_SIZE - pageoff, remaining);
640*4882a593Smuzhiyun 
641*4882a593Smuzhiyun 			memcpy(dst, page_address(*ppages) + pageoff, len);
642*4882a593Smuzhiyun 			remaining -= len;
643*4882a593Smuzhiyun 			dst += len;
644*4882a593Smuzhiyun 			pageoff = 0;
645*4882a593Smuzhiyun 			ppages++;
646*4882a593Smuzhiyun 		}
647*4882a593Smuzhiyun 	}
648*4882a593Smuzhiyun 
649*4882a593Smuzhiyun 	if (taillen)
650*4882a593Smuzhiyun 		memcpy(dst, tailbase, taillen);
651*4882a593Smuzhiyun 
652*4882a593Smuzhiyun 	sctxt->sc_sges[0].length += xdr->len;
653*4882a593Smuzhiyun 	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
654*4882a593Smuzhiyun 	return 0;
655*4882a593Smuzhiyun }
656*4882a593Smuzhiyun 
657*4882a593Smuzhiyun /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
658*4882a593Smuzhiyun  * @rdma: controlling transport
659*4882a593Smuzhiyun  * @sctxt: send_ctxt for the Send WR
660*4882a593Smuzhiyun  * @rctxt: Write and Reply chunks provided by client
661*4882a593Smuzhiyun  * @xdr: prepared xdr_buf containing RPC message
662*4882a593Smuzhiyun  *
663*4882a593Smuzhiyun  * Load the xdr_buf into the ctxt's sge array, and DMA map each
664*4882a593Smuzhiyun  * element as it is added. The Send WR's num_sge field is set.
665*4882a593Smuzhiyun  *
666*4882a593Smuzhiyun  * Returns zero on success, or a negative errno on failure.
667*4882a593Smuzhiyun  */
svc_rdma_map_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct xdr_buf * xdr)668*4882a593Smuzhiyun int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
669*4882a593Smuzhiyun 			   struct svc_rdma_send_ctxt *sctxt,
670*4882a593Smuzhiyun 			   const struct svc_rdma_recv_ctxt *rctxt,
671*4882a593Smuzhiyun 			   struct xdr_buf *xdr)
672*4882a593Smuzhiyun {
673*4882a593Smuzhiyun 	unsigned int len, remaining;
674*4882a593Smuzhiyun 	unsigned long page_off;
675*4882a593Smuzhiyun 	struct page **ppages;
676*4882a593Smuzhiyun 	unsigned char *base;
677*4882a593Smuzhiyun 	u32 xdr_pad;
678*4882a593Smuzhiyun 	int ret;
679*4882a593Smuzhiyun 
680*4882a593Smuzhiyun 	/* Set up the (persistently-mapped) transport header SGE. */
681*4882a593Smuzhiyun 	sctxt->sc_send_wr.num_sge = 1;
682*4882a593Smuzhiyun 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
683*4882a593Smuzhiyun 
684*4882a593Smuzhiyun 	/* If there is a Reply chunk, nothing follows the transport
685*4882a593Smuzhiyun 	 * header, and we're done here.
686*4882a593Smuzhiyun 	 */
687*4882a593Smuzhiyun 	if (rctxt && rctxt->rc_reply_chunk)
688*4882a593Smuzhiyun 		return 0;
689*4882a593Smuzhiyun 
690*4882a593Smuzhiyun 	/* For pull-up, svc_rdma_send() will sync the transport header.
691*4882a593Smuzhiyun 	 * No additional DMA mapping is necessary.
692*4882a593Smuzhiyun 	 */
693*4882a593Smuzhiyun 	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
694*4882a593Smuzhiyun 		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
695*4882a593Smuzhiyun 
696*4882a593Smuzhiyun 	++sctxt->sc_cur_sge_no;
697*4882a593Smuzhiyun 	ret = svc_rdma_dma_map_buf(rdma, sctxt,
698*4882a593Smuzhiyun 				   xdr->head[0].iov_base,
699*4882a593Smuzhiyun 				   xdr->head[0].iov_len);
700*4882a593Smuzhiyun 	if (ret < 0)
701*4882a593Smuzhiyun 		return ret;
702*4882a593Smuzhiyun 
703*4882a593Smuzhiyun 	/* If a Write chunk is present, the xdr_buf's page list
704*4882a593Smuzhiyun 	 * is not included inline. However the Upper Layer may
705*4882a593Smuzhiyun 	 * have added XDR padding in the tail buffer, and that
706*4882a593Smuzhiyun 	 * should not be included inline.
707*4882a593Smuzhiyun 	 */
708*4882a593Smuzhiyun 	if (rctxt && rctxt->rc_write_list) {
709*4882a593Smuzhiyun 		base = xdr->tail[0].iov_base;
710*4882a593Smuzhiyun 		len = xdr->tail[0].iov_len;
711*4882a593Smuzhiyun 		xdr_pad = xdr_pad_size(xdr->page_len);
712*4882a593Smuzhiyun 
713*4882a593Smuzhiyun 		if (len && xdr_pad) {
714*4882a593Smuzhiyun 			base += xdr_pad;
715*4882a593Smuzhiyun 			len -= xdr_pad;
716*4882a593Smuzhiyun 		}
717*4882a593Smuzhiyun 
718*4882a593Smuzhiyun 		goto tail;
719*4882a593Smuzhiyun 	}
720*4882a593Smuzhiyun 
721*4882a593Smuzhiyun 	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
722*4882a593Smuzhiyun 	page_off = xdr->page_base & ~PAGE_MASK;
723*4882a593Smuzhiyun 	remaining = xdr->page_len;
724*4882a593Smuzhiyun 	while (remaining) {
725*4882a593Smuzhiyun 		len = min_t(u32, PAGE_SIZE - page_off, remaining);
726*4882a593Smuzhiyun 
727*4882a593Smuzhiyun 		++sctxt->sc_cur_sge_no;
728*4882a593Smuzhiyun 		ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
729*4882a593Smuzhiyun 					    page_off, len);
730*4882a593Smuzhiyun 		if (ret < 0)
731*4882a593Smuzhiyun 			return ret;
732*4882a593Smuzhiyun 
733*4882a593Smuzhiyun 		remaining -= len;
734*4882a593Smuzhiyun 		page_off = 0;
735*4882a593Smuzhiyun 	}
736*4882a593Smuzhiyun 
737*4882a593Smuzhiyun 	base = xdr->tail[0].iov_base;
738*4882a593Smuzhiyun 	len = xdr->tail[0].iov_len;
739*4882a593Smuzhiyun tail:
740*4882a593Smuzhiyun 	if (len) {
741*4882a593Smuzhiyun 		++sctxt->sc_cur_sge_no;
742*4882a593Smuzhiyun 		ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
743*4882a593Smuzhiyun 		if (ret < 0)
744*4882a593Smuzhiyun 			return ret;
745*4882a593Smuzhiyun 	}
746*4882a593Smuzhiyun 
747*4882a593Smuzhiyun 	return 0;
748*4882a593Smuzhiyun }
749*4882a593Smuzhiyun 
750*4882a593Smuzhiyun /* The svc_rqst and all resources it owns are released as soon as
751*4882a593Smuzhiyun  * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
752*4882a593Smuzhiyun  * so they are released by the Send completion handler.
753*4882a593Smuzhiyun  */
svc_rdma_save_io_pages(struct svc_rqst * rqstp,struct svc_rdma_send_ctxt * ctxt)754*4882a593Smuzhiyun static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
755*4882a593Smuzhiyun 				   struct svc_rdma_send_ctxt *ctxt)
756*4882a593Smuzhiyun {
757*4882a593Smuzhiyun 	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
758*4882a593Smuzhiyun 
759*4882a593Smuzhiyun 	ctxt->sc_page_count += pages;
760*4882a593Smuzhiyun 	for (i = 0; i < pages; i++) {
761*4882a593Smuzhiyun 		ctxt->sc_pages[i] = rqstp->rq_respages[i];
762*4882a593Smuzhiyun 		rqstp->rq_respages[i] = NULL;
763*4882a593Smuzhiyun 	}
764*4882a593Smuzhiyun 
765*4882a593Smuzhiyun 	/* Prevent svc_xprt_release from releasing pages in rq_pages */
766*4882a593Smuzhiyun 	rqstp->rq_next_page = rqstp->rq_respages;
767*4882a593Smuzhiyun }
768*4882a593Smuzhiyun 
769*4882a593Smuzhiyun /* Prepare the portion of the RPC Reply that will be transmitted
770*4882a593Smuzhiyun  * via RDMA Send. The RPC-over-RDMA transport header is prepared
771*4882a593Smuzhiyun  * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
772*4882a593Smuzhiyun  *
773*4882a593Smuzhiyun  * Depending on whether a Write list or Reply chunk is present,
774*4882a593Smuzhiyun  * the server may send all, a portion of, or none of the xdr_buf.
775*4882a593Smuzhiyun  * In the latter case, only the transport header (sc_sges[0]) is
776*4882a593Smuzhiyun  * transmitted.
777*4882a593Smuzhiyun  *
778*4882a593Smuzhiyun  * RDMA Send is the last step of transmitting an RPC reply. Pages
779*4882a593Smuzhiyun  * involved in the earlier RDMA Writes are here transferred out
780*4882a593Smuzhiyun  * of the rqstp and into the sctxt's page array. These pages are
781*4882a593Smuzhiyun  * DMA unmapped by each Write completion, but the subsequent Send
782*4882a593Smuzhiyun  * completion finally releases these pages.
783*4882a593Smuzhiyun  *
784*4882a593Smuzhiyun  * Assumptions:
785*4882a593Smuzhiyun  * - The Reply's transport header will never be larger than a page.
786*4882a593Smuzhiyun  */
svc_rdma_send_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct svc_rqst * rqstp)787*4882a593Smuzhiyun static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
788*4882a593Smuzhiyun 				   struct svc_rdma_send_ctxt *sctxt,
789*4882a593Smuzhiyun 				   const struct svc_rdma_recv_ctxt *rctxt,
790*4882a593Smuzhiyun 				   struct svc_rqst *rqstp)
791*4882a593Smuzhiyun {
792*4882a593Smuzhiyun 	int ret;
793*4882a593Smuzhiyun 
794*4882a593Smuzhiyun 	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
795*4882a593Smuzhiyun 	if (ret < 0)
796*4882a593Smuzhiyun 		return ret;
797*4882a593Smuzhiyun 
798*4882a593Smuzhiyun 	svc_rdma_save_io_pages(rqstp, sctxt);
799*4882a593Smuzhiyun 
800*4882a593Smuzhiyun 	if (rctxt->rc_inv_rkey) {
801*4882a593Smuzhiyun 		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
802*4882a593Smuzhiyun 		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
803*4882a593Smuzhiyun 	} else {
804*4882a593Smuzhiyun 		sctxt->sc_send_wr.opcode = IB_WR_SEND;
805*4882a593Smuzhiyun 	}
806*4882a593Smuzhiyun 	return svc_rdma_send(rdma, sctxt);
807*4882a593Smuzhiyun }
808*4882a593Smuzhiyun 
809*4882a593Smuzhiyun /**
810*4882a593Smuzhiyun  * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
811*4882a593Smuzhiyun  * @rdma: controlling transport context
812*4882a593Smuzhiyun  * @sctxt: Send context for the response
813*4882a593Smuzhiyun  * @rctxt: Receive context for incoming bad message
814*4882a593Smuzhiyun  * @status: negative errno indicating error that occurred
815*4882a593Smuzhiyun  *
816*4882a593Smuzhiyun  * Given the client-provided Read, Write, and Reply chunks, the
817*4882a593Smuzhiyun  * server was not able to parse the Call or form a complete Reply.
818*4882a593Smuzhiyun  * Return an RDMA_ERROR message so the client can retire the RPC
819*4882a593Smuzhiyun  * transaction.
820*4882a593Smuzhiyun  *
821*4882a593Smuzhiyun  * The caller does not have to release @sctxt. It is released by
822*4882a593Smuzhiyun  * Send completion, or by this function on error.
823*4882a593Smuzhiyun  */
svc_rdma_send_error_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,struct svc_rdma_recv_ctxt * rctxt,int status)824*4882a593Smuzhiyun void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
825*4882a593Smuzhiyun 			     struct svc_rdma_send_ctxt *sctxt,
826*4882a593Smuzhiyun 			     struct svc_rdma_recv_ctxt *rctxt,
827*4882a593Smuzhiyun 			     int status)
828*4882a593Smuzhiyun {
829*4882a593Smuzhiyun 	__be32 *rdma_argp = rctxt->rc_recv_buf;
830*4882a593Smuzhiyun 	__be32 *p;
831*4882a593Smuzhiyun 
832*4882a593Smuzhiyun 	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
833*4882a593Smuzhiyun 	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
834*4882a593Smuzhiyun 			sctxt->sc_xprt_buf, NULL);
835*4882a593Smuzhiyun 
836*4882a593Smuzhiyun 	p = xdr_reserve_space(&sctxt->sc_stream,
837*4882a593Smuzhiyun 			      rpcrdma_fixed_maxsz * sizeof(*p));
838*4882a593Smuzhiyun 	if (!p)
839*4882a593Smuzhiyun 		goto put_ctxt;
840*4882a593Smuzhiyun 
841*4882a593Smuzhiyun 	*p++ = *rdma_argp;
842*4882a593Smuzhiyun 	*p++ = *(rdma_argp + 1);
843*4882a593Smuzhiyun 	*p++ = rdma->sc_fc_credits;
844*4882a593Smuzhiyun 	*p = rdma_error;
845*4882a593Smuzhiyun 
846*4882a593Smuzhiyun 	switch (status) {
847*4882a593Smuzhiyun 	case -EPROTONOSUPPORT:
848*4882a593Smuzhiyun 		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
849*4882a593Smuzhiyun 		if (!p)
850*4882a593Smuzhiyun 			goto put_ctxt;
851*4882a593Smuzhiyun 
852*4882a593Smuzhiyun 		*p++ = err_vers;
853*4882a593Smuzhiyun 		*p++ = rpcrdma_version;
854*4882a593Smuzhiyun 		*p = rpcrdma_version;
855*4882a593Smuzhiyun 		trace_svcrdma_err_vers(*rdma_argp);
856*4882a593Smuzhiyun 		break;
857*4882a593Smuzhiyun 	default:
858*4882a593Smuzhiyun 		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
859*4882a593Smuzhiyun 		if (!p)
860*4882a593Smuzhiyun 			goto put_ctxt;
861*4882a593Smuzhiyun 
862*4882a593Smuzhiyun 		*p = err_chunk;
863*4882a593Smuzhiyun 		trace_svcrdma_err_chunk(*rdma_argp);
864*4882a593Smuzhiyun 	}
865*4882a593Smuzhiyun 
866*4882a593Smuzhiyun 	/* Remote Invalidation is skipped for simplicity. */
867*4882a593Smuzhiyun 	sctxt->sc_send_wr.num_sge = 1;
868*4882a593Smuzhiyun 	sctxt->sc_send_wr.opcode = IB_WR_SEND;
869*4882a593Smuzhiyun 	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
870*4882a593Smuzhiyun 	if (svc_rdma_send(rdma, sctxt))
871*4882a593Smuzhiyun 		goto put_ctxt;
872*4882a593Smuzhiyun 	return;
873*4882a593Smuzhiyun 
874*4882a593Smuzhiyun put_ctxt:
875*4882a593Smuzhiyun 	svc_rdma_send_ctxt_put(rdma, sctxt);
876*4882a593Smuzhiyun }
877*4882a593Smuzhiyun 
878*4882a593Smuzhiyun /**
879*4882a593Smuzhiyun  * svc_rdma_sendto - Transmit an RPC reply
880*4882a593Smuzhiyun  * @rqstp: processed RPC request, reply XDR already in ::rq_res
881*4882a593Smuzhiyun  *
882*4882a593Smuzhiyun  * Any resources still associated with @rqstp are released upon return.
883*4882a593Smuzhiyun  * If no reply message was possible, the connection is closed.
884*4882a593Smuzhiyun  *
885*4882a593Smuzhiyun  * Returns:
886*4882a593Smuzhiyun  *	%0 if an RPC reply has been successfully posted,
887*4882a593Smuzhiyun  *	%-ENOMEM if a resource shortage occurred (connection is lost),
888*4882a593Smuzhiyun  *	%-ENOTCONN if posting failed (connection is lost).
889*4882a593Smuzhiyun  */
svc_rdma_sendto(struct svc_rqst * rqstp)890*4882a593Smuzhiyun int svc_rdma_sendto(struct svc_rqst *rqstp)
891*4882a593Smuzhiyun {
892*4882a593Smuzhiyun 	struct svc_xprt *xprt = rqstp->rq_xprt;
893*4882a593Smuzhiyun 	struct svcxprt_rdma *rdma =
894*4882a593Smuzhiyun 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
895*4882a593Smuzhiyun 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
896*4882a593Smuzhiyun 	__be32 *rdma_argp = rctxt->rc_recv_buf;
897*4882a593Smuzhiyun 	__be32 *wr_lst = rctxt->rc_write_list;
898*4882a593Smuzhiyun 	__be32 *rp_ch = rctxt->rc_reply_chunk;
899*4882a593Smuzhiyun 	struct xdr_buf *xdr = &rqstp->rq_res;
900*4882a593Smuzhiyun 	struct svc_rdma_send_ctxt *sctxt;
901*4882a593Smuzhiyun 	__be32 *p;
902*4882a593Smuzhiyun 	int ret;
903*4882a593Smuzhiyun 
904*4882a593Smuzhiyun 	ret = -ENOTCONN;
905*4882a593Smuzhiyun 	if (svc_xprt_is_dead(xprt))
906*4882a593Smuzhiyun 		goto err0;
907*4882a593Smuzhiyun 
908*4882a593Smuzhiyun 	ret = -ENOMEM;
909*4882a593Smuzhiyun 	sctxt = svc_rdma_send_ctxt_get(rdma);
910*4882a593Smuzhiyun 	if (!sctxt)
911*4882a593Smuzhiyun 		goto err0;
912*4882a593Smuzhiyun 
913*4882a593Smuzhiyun 	p = xdr_reserve_space(&sctxt->sc_stream,
914*4882a593Smuzhiyun 			      rpcrdma_fixed_maxsz * sizeof(*p));
915*4882a593Smuzhiyun 	if (!p)
916*4882a593Smuzhiyun 		goto err0;
917*4882a593Smuzhiyun 	*p++ = *rdma_argp;
918*4882a593Smuzhiyun 	*p++ = *(rdma_argp + 1);
919*4882a593Smuzhiyun 	*p++ = rdma->sc_fc_credits;
920*4882a593Smuzhiyun 	*p   = rp_ch ? rdma_nomsg : rdma_msg;
921*4882a593Smuzhiyun 
922*4882a593Smuzhiyun 	if (svc_rdma_encode_read_list(sctxt) < 0)
923*4882a593Smuzhiyun 		goto err0;
924*4882a593Smuzhiyun 	if (wr_lst) {
925*4882a593Smuzhiyun 		/* XXX: Presume the client sent only one Write chunk */
926*4882a593Smuzhiyun 		unsigned long offset;
927*4882a593Smuzhiyun 		unsigned int length;
928*4882a593Smuzhiyun 
929*4882a593Smuzhiyun 		if (rctxt->rc_read_payload_length) {
930*4882a593Smuzhiyun 			offset = rctxt->rc_read_payload_offset;
931*4882a593Smuzhiyun 			length = rctxt->rc_read_payload_length;
932*4882a593Smuzhiyun 		} else {
933*4882a593Smuzhiyun 			offset = xdr->head[0].iov_len;
934*4882a593Smuzhiyun 			length = xdr->page_len;
935*4882a593Smuzhiyun 		}
936*4882a593Smuzhiyun 		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
937*4882a593Smuzhiyun 						length);
938*4882a593Smuzhiyun 		if (ret < 0)
939*4882a593Smuzhiyun 			goto err2;
940*4882a593Smuzhiyun 		if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
941*4882a593Smuzhiyun 			goto err0;
942*4882a593Smuzhiyun 	} else {
943*4882a593Smuzhiyun 		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
944*4882a593Smuzhiyun 			goto err0;
945*4882a593Smuzhiyun 	}
946*4882a593Smuzhiyun 	if (rp_ch) {
947*4882a593Smuzhiyun 		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
948*4882a593Smuzhiyun 		if (ret < 0)
949*4882a593Smuzhiyun 			goto err2;
950*4882a593Smuzhiyun 		if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
951*4882a593Smuzhiyun 			goto err0;
952*4882a593Smuzhiyun 	} else {
953*4882a593Smuzhiyun 		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
954*4882a593Smuzhiyun 			goto err0;
955*4882a593Smuzhiyun 	}
956*4882a593Smuzhiyun 
957*4882a593Smuzhiyun 	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
958*4882a593Smuzhiyun 	if (ret < 0)
959*4882a593Smuzhiyun 		goto err1;
960*4882a593Smuzhiyun 	return 0;
961*4882a593Smuzhiyun 
962*4882a593Smuzhiyun  err2:
963*4882a593Smuzhiyun 	if (ret != -E2BIG && ret != -EINVAL)
964*4882a593Smuzhiyun 		goto err1;
965*4882a593Smuzhiyun 
966*4882a593Smuzhiyun 	/* Send completion releases payload pages that were part
967*4882a593Smuzhiyun 	 * of previously posted RDMA Writes.
968*4882a593Smuzhiyun 	 */
969*4882a593Smuzhiyun 	svc_rdma_save_io_pages(rqstp, sctxt);
970*4882a593Smuzhiyun 	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
971*4882a593Smuzhiyun 	return 0;
972*4882a593Smuzhiyun 
973*4882a593Smuzhiyun  err1:
974*4882a593Smuzhiyun 	svc_rdma_send_ctxt_put(rdma, sctxt);
975*4882a593Smuzhiyun  err0:
976*4882a593Smuzhiyun 	trace_svcrdma_send_err(rqstp, ret);
977*4882a593Smuzhiyun 	set_bit(XPT_CLOSE, &xprt->xpt_flags);
978*4882a593Smuzhiyun 	return -ENOTCONN;
979*4882a593Smuzhiyun }
980*4882a593Smuzhiyun 
981*4882a593Smuzhiyun /**
982*4882a593Smuzhiyun  * svc_rdma_read_payload - special processing for a READ payload
983*4882a593Smuzhiyun  * @rqstp: svc_rqst to operate on
984*4882a593Smuzhiyun  * @offset: payload's byte offset in @xdr
985*4882a593Smuzhiyun  * @length: size of payload, in bytes
986*4882a593Smuzhiyun  *
987*4882a593Smuzhiyun  * Returns zero on success.
988*4882a593Smuzhiyun  *
989*4882a593Smuzhiyun  * For the moment, just record the xdr_buf location of the READ
990*4882a593Smuzhiyun  * payload. svc_rdma_sendto will use that location later when
991*4882a593Smuzhiyun  * we actually send the payload.
992*4882a593Smuzhiyun  */
svc_rdma_read_payload(struct svc_rqst * rqstp,unsigned int offset,unsigned int length)993*4882a593Smuzhiyun int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
994*4882a593Smuzhiyun 			  unsigned int length)
995*4882a593Smuzhiyun {
996*4882a593Smuzhiyun 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
997*4882a593Smuzhiyun 
998*4882a593Smuzhiyun 	/* XXX: Just one READ payload slot for now, since our
999*4882a593Smuzhiyun 	 * transport implementation currently supports only one
1000*4882a593Smuzhiyun 	 * Write chunk.
1001*4882a593Smuzhiyun 	 */
1002*4882a593Smuzhiyun 	rctxt->rc_read_payload_offset = offset;
1003*4882a593Smuzhiyun 	rctxt->rc_read_payload_length = length;
1004*4882a593Smuzhiyun 
1005*4882a593Smuzhiyun 	return 0;
1006*4882a593Smuzhiyun }
1007