1*4882a593Smuzhiyun // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2*4882a593Smuzhiyun /*
3*4882a593Smuzhiyun * Copyright (c) 2016-2018 Oracle. All rights reserved.
4*4882a593Smuzhiyun * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
5*4882a593Smuzhiyun * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
6*4882a593Smuzhiyun *
7*4882a593Smuzhiyun * This software is available to you under a choice of one of two
8*4882a593Smuzhiyun * licenses. You may choose to be licensed under the terms of the GNU
9*4882a593Smuzhiyun * General Public License (GPL) Version 2, available from the file
10*4882a593Smuzhiyun * COPYING in the main directory of this source tree, or the BSD-type
11*4882a593Smuzhiyun * license below:
12*4882a593Smuzhiyun *
13*4882a593Smuzhiyun * Redistribution and use in source and binary forms, with or without
14*4882a593Smuzhiyun * modification, are permitted provided that the following conditions
15*4882a593Smuzhiyun * are met:
16*4882a593Smuzhiyun *
17*4882a593Smuzhiyun * Redistributions of source code must retain the above copyright
18*4882a593Smuzhiyun * notice, this list of conditions and the following disclaimer.
19*4882a593Smuzhiyun *
20*4882a593Smuzhiyun * Redistributions in binary form must reproduce the above
21*4882a593Smuzhiyun * copyright notice, this list of conditions and the following
22*4882a593Smuzhiyun * disclaimer in the documentation and/or other materials provided
23*4882a593Smuzhiyun * with the distribution.
24*4882a593Smuzhiyun *
25*4882a593Smuzhiyun * Neither the name of the Network Appliance, Inc. nor the names of
26*4882a593Smuzhiyun * its contributors may be used to endorse or promote products
27*4882a593Smuzhiyun * derived from this software without specific prior written
28*4882a593Smuzhiyun * permission.
29*4882a593Smuzhiyun *
30*4882a593Smuzhiyun * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
31*4882a593Smuzhiyun * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
32*4882a593Smuzhiyun * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
33*4882a593Smuzhiyun * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
34*4882a593Smuzhiyun * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35*4882a593Smuzhiyun * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
36*4882a593Smuzhiyun * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
37*4882a593Smuzhiyun * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
38*4882a593Smuzhiyun * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39*4882a593Smuzhiyun * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40*4882a593Smuzhiyun * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41*4882a593Smuzhiyun *
42*4882a593Smuzhiyun * Author: Tom Tucker <tom@opengridcomputing.com>
43*4882a593Smuzhiyun */
44*4882a593Smuzhiyun
45*4882a593Smuzhiyun /* Operation
46*4882a593Smuzhiyun *
47*4882a593Smuzhiyun * The main entry point is svc_rdma_sendto. This is called by the
48*4882a593Smuzhiyun * RPC server when an RPC Reply is ready to be transmitted to a client.
49*4882a593Smuzhiyun *
50*4882a593Smuzhiyun * The passed-in svc_rqst contains a struct xdr_buf which holds an
51*4882a593Smuzhiyun * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
52*4882a593Smuzhiyun * transport header, post all Write WRs needed for this Reply, then post
53*4882a593Smuzhiyun * a Send WR conveying the transport header and the RPC message itself to
54*4882a593Smuzhiyun * the client.
55*4882a593Smuzhiyun *
56*4882a593Smuzhiyun * svc_rdma_sendto must fully transmit the Reply before returning, as
57*4882a593Smuzhiyun * the svc_rqst will be recycled as soon as sendto returns. Remaining
58*4882a593Smuzhiyun * resources referred to by the svc_rqst are also recycled at that time.
59*4882a593Smuzhiyun * Therefore any resources that must remain longer must be detached
60*4882a593Smuzhiyun * from the svc_rqst and released later.
61*4882a593Smuzhiyun *
62*4882a593Smuzhiyun * Page Management
63*4882a593Smuzhiyun *
64*4882a593Smuzhiyun * The I/O that performs Reply transmission is asynchronous, and may
65*4882a593Smuzhiyun * complete well after sendto returns. Thus pages under I/O must be
66*4882a593Smuzhiyun * removed from the svc_rqst before sendto returns.
67*4882a593Smuzhiyun *
68*4882a593Smuzhiyun * The logic here depends on Send Queue and completion ordering. Since
69*4882a593Smuzhiyun * the Send WR is always posted last, it will always complete last. Thus
70*4882a593Smuzhiyun * when it completes, it is guaranteed that all previous Write WRs have
71*4882a593Smuzhiyun * also completed.
72*4882a593Smuzhiyun *
73*4882a593Smuzhiyun * Write WRs are constructed and posted. Each Write segment gets its own
74*4882a593Smuzhiyun * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
75*4882a593Smuzhiyun * DMA-unmap the pages under I/O for that Write segment. The Write
76*4882a593Smuzhiyun * completion handler does not release any pages.
77*4882a593Smuzhiyun *
78*4882a593Smuzhiyun * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
 * The ownership of all of the Reply's pages is transferred into that
 * ctxt, the Send WR is posted, and sendto returns.
81*4882a593Smuzhiyun *
82*4882a593Smuzhiyun * The svc_rdma_send_ctxt is presented when the Send WR completes. The
83*4882a593Smuzhiyun * Send completion handler finally releases the Reply's pages.
84*4882a593Smuzhiyun *
85*4882a593Smuzhiyun * This mechanism also assumes that completions on the transport's Send
86*4882a593Smuzhiyun * Completion Queue do not run in parallel. Otherwise a Write completion
87*4882a593Smuzhiyun * and Send completion running at the same time could release pages that
88*4882a593Smuzhiyun * are still DMA-mapped.
89*4882a593Smuzhiyun *
90*4882a593Smuzhiyun * Error Handling
91*4882a593Smuzhiyun *
92*4882a593Smuzhiyun * - If the Send WR is posted successfully, it will either complete
93*4882a593Smuzhiyun * successfully, or get flushed. Either way, the Send completion
94*4882a593Smuzhiyun * handler releases the Reply's pages.
 * - If the Send WR cannot be posted, the forward path releases
 *   the Reply's pages.
97*4882a593Smuzhiyun *
98*4882a593Smuzhiyun * This handles the case, without the use of page reference counting,
99*4882a593Smuzhiyun * where two different Write segments send portions of the same page.
100*4882a593Smuzhiyun */
101*4882a593Smuzhiyun
102*4882a593Smuzhiyun #include <linux/spinlock.h>
103*4882a593Smuzhiyun #include <asm/unaligned.h>
104*4882a593Smuzhiyun
105*4882a593Smuzhiyun #include <rdma/ib_verbs.h>
106*4882a593Smuzhiyun #include <rdma/rdma_cm.h>
107*4882a593Smuzhiyun
108*4882a593Smuzhiyun #include <linux/sunrpc/debug.h>
109*4882a593Smuzhiyun #include <linux/sunrpc/svc_rdma.h>
110*4882a593Smuzhiyun
111*4882a593Smuzhiyun #include "xprt_rdma.h"
112*4882a593Smuzhiyun #include <trace/events/rpcrdma.h>
113*4882a593Smuzhiyun
114*4882a593Smuzhiyun #define RPCDBG_FACILITY RPCDBG_SVCXPRT
115*4882a593Smuzhiyun
116*4882a593Smuzhiyun static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
117*4882a593Smuzhiyun
118*4882a593Smuzhiyun static inline struct svc_rdma_send_ctxt *
svc_rdma_next_send_ctxt(struct list_head * list)119*4882a593Smuzhiyun svc_rdma_next_send_ctxt(struct list_head *list)
120*4882a593Smuzhiyun {
121*4882a593Smuzhiyun return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
122*4882a593Smuzhiyun sc_list);
123*4882a593Smuzhiyun }
124*4882a593Smuzhiyun
svc_rdma_send_cid_init(struct svcxprt_rdma * rdma,struct rpc_rdma_cid * cid)125*4882a593Smuzhiyun static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
126*4882a593Smuzhiyun struct rpc_rdma_cid *cid)
127*4882a593Smuzhiyun {
128*4882a593Smuzhiyun cid->ci_queue_id = rdma->sc_sq_cq->res.id;
129*4882a593Smuzhiyun cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
130*4882a593Smuzhiyun }
131*4882a593Smuzhiyun
132*4882a593Smuzhiyun static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma * rdma)133*4882a593Smuzhiyun svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
134*4882a593Smuzhiyun {
135*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt;
136*4882a593Smuzhiyun dma_addr_t addr;
137*4882a593Smuzhiyun void *buffer;
138*4882a593Smuzhiyun size_t size;
139*4882a593Smuzhiyun int i;
140*4882a593Smuzhiyun
141*4882a593Smuzhiyun size = sizeof(*ctxt);
142*4882a593Smuzhiyun size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
143*4882a593Smuzhiyun ctxt = kmalloc(size, GFP_KERNEL);
144*4882a593Smuzhiyun if (!ctxt)
145*4882a593Smuzhiyun goto fail0;
146*4882a593Smuzhiyun buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
147*4882a593Smuzhiyun if (!buffer)
148*4882a593Smuzhiyun goto fail1;
149*4882a593Smuzhiyun addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
150*4882a593Smuzhiyun rdma->sc_max_req_size, DMA_TO_DEVICE);
151*4882a593Smuzhiyun if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
152*4882a593Smuzhiyun goto fail2;
153*4882a593Smuzhiyun
154*4882a593Smuzhiyun svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
155*4882a593Smuzhiyun
156*4882a593Smuzhiyun ctxt->sc_send_wr.next = NULL;
157*4882a593Smuzhiyun ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
158*4882a593Smuzhiyun ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
159*4882a593Smuzhiyun ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
160*4882a593Smuzhiyun ctxt->sc_cqe.done = svc_rdma_wc_send;
161*4882a593Smuzhiyun ctxt->sc_xprt_buf = buffer;
162*4882a593Smuzhiyun xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
163*4882a593Smuzhiyun rdma->sc_max_req_size);
164*4882a593Smuzhiyun ctxt->sc_sges[0].addr = addr;
165*4882a593Smuzhiyun
166*4882a593Smuzhiyun for (i = 0; i < rdma->sc_max_send_sges; i++)
167*4882a593Smuzhiyun ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
168*4882a593Smuzhiyun return ctxt;
169*4882a593Smuzhiyun
170*4882a593Smuzhiyun fail2:
171*4882a593Smuzhiyun kfree(buffer);
172*4882a593Smuzhiyun fail1:
173*4882a593Smuzhiyun kfree(ctxt);
174*4882a593Smuzhiyun fail0:
175*4882a593Smuzhiyun return NULL;
176*4882a593Smuzhiyun }
177*4882a593Smuzhiyun
178*4882a593Smuzhiyun /**
179*4882a593Smuzhiyun * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
180*4882a593Smuzhiyun * @rdma: svcxprt_rdma being torn down
181*4882a593Smuzhiyun *
182*4882a593Smuzhiyun */
svc_rdma_send_ctxts_destroy(struct svcxprt_rdma * rdma)183*4882a593Smuzhiyun void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
184*4882a593Smuzhiyun {
185*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt;
186*4882a593Smuzhiyun
187*4882a593Smuzhiyun while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
188*4882a593Smuzhiyun list_del(&ctxt->sc_list);
189*4882a593Smuzhiyun ib_dma_unmap_single(rdma->sc_pd->device,
190*4882a593Smuzhiyun ctxt->sc_sges[0].addr,
191*4882a593Smuzhiyun rdma->sc_max_req_size,
192*4882a593Smuzhiyun DMA_TO_DEVICE);
193*4882a593Smuzhiyun kfree(ctxt->sc_xprt_buf);
194*4882a593Smuzhiyun kfree(ctxt);
195*4882a593Smuzhiyun }
196*4882a593Smuzhiyun }
197*4882a593Smuzhiyun
198*4882a593Smuzhiyun /**
199*4882a593Smuzhiyun * svc_rdma_send_ctxt_get - Get a free send_ctxt
200*4882a593Smuzhiyun * @rdma: controlling svcxprt_rdma
201*4882a593Smuzhiyun *
202*4882a593Smuzhiyun * Returns a ready-to-use send_ctxt, or NULL if none are
203*4882a593Smuzhiyun * available and a fresh one cannot be allocated.
204*4882a593Smuzhiyun */
svc_rdma_send_ctxt_get(struct svcxprt_rdma * rdma)205*4882a593Smuzhiyun struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
206*4882a593Smuzhiyun {
207*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt;
208*4882a593Smuzhiyun
209*4882a593Smuzhiyun spin_lock(&rdma->sc_send_lock);
210*4882a593Smuzhiyun ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
211*4882a593Smuzhiyun if (!ctxt)
212*4882a593Smuzhiyun goto out_empty;
213*4882a593Smuzhiyun list_del(&ctxt->sc_list);
214*4882a593Smuzhiyun spin_unlock(&rdma->sc_send_lock);
215*4882a593Smuzhiyun
216*4882a593Smuzhiyun out:
217*4882a593Smuzhiyun rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
218*4882a593Smuzhiyun xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
219*4882a593Smuzhiyun ctxt->sc_xprt_buf, NULL);
220*4882a593Smuzhiyun
221*4882a593Smuzhiyun ctxt->sc_send_wr.num_sge = 0;
222*4882a593Smuzhiyun ctxt->sc_cur_sge_no = 0;
223*4882a593Smuzhiyun ctxt->sc_page_count = 0;
224*4882a593Smuzhiyun return ctxt;
225*4882a593Smuzhiyun
226*4882a593Smuzhiyun out_empty:
227*4882a593Smuzhiyun spin_unlock(&rdma->sc_send_lock);
228*4882a593Smuzhiyun ctxt = svc_rdma_send_ctxt_alloc(rdma);
229*4882a593Smuzhiyun if (!ctxt)
230*4882a593Smuzhiyun return NULL;
231*4882a593Smuzhiyun goto out;
232*4882a593Smuzhiyun }
233*4882a593Smuzhiyun
234*4882a593Smuzhiyun /**
235*4882a593Smuzhiyun * svc_rdma_send_ctxt_put - Return send_ctxt to free list
236*4882a593Smuzhiyun * @rdma: controlling svcxprt_rdma
237*4882a593Smuzhiyun * @ctxt: object to return to the free list
238*4882a593Smuzhiyun *
239*4882a593Smuzhiyun * Pages left in sc_pages are DMA unmapped and released.
240*4882a593Smuzhiyun */
svc_rdma_send_ctxt_put(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt)241*4882a593Smuzhiyun void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
242*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt)
243*4882a593Smuzhiyun {
244*4882a593Smuzhiyun struct ib_device *device = rdma->sc_cm_id->device;
245*4882a593Smuzhiyun unsigned int i;
246*4882a593Smuzhiyun
247*4882a593Smuzhiyun /* The first SGE contains the transport header, which
248*4882a593Smuzhiyun * remains mapped until @ctxt is destroyed.
249*4882a593Smuzhiyun */
250*4882a593Smuzhiyun for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
251*4882a593Smuzhiyun ib_dma_unmap_page(device,
252*4882a593Smuzhiyun ctxt->sc_sges[i].addr,
253*4882a593Smuzhiyun ctxt->sc_sges[i].length,
254*4882a593Smuzhiyun DMA_TO_DEVICE);
255*4882a593Smuzhiyun trace_svcrdma_dma_unmap_page(rdma,
256*4882a593Smuzhiyun ctxt->sc_sges[i].addr,
257*4882a593Smuzhiyun ctxt->sc_sges[i].length);
258*4882a593Smuzhiyun }
259*4882a593Smuzhiyun
260*4882a593Smuzhiyun for (i = 0; i < ctxt->sc_page_count; ++i)
261*4882a593Smuzhiyun put_page(ctxt->sc_pages[i]);
262*4882a593Smuzhiyun
263*4882a593Smuzhiyun spin_lock(&rdma->sc_send_lock);
264*4882a593Smuzhiyun list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
265*4882a593Smuzhiyun spin_unlock(&rdma->sc_send_lock);
266*4882a593Smuzhiyun }
267*4882a593Smuzhiyun
268*4882a593Smuzhiyun /**
269*4882a593Smuzhiyun * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
270*4882a593Smuzhiyun * @cq: Completion Queue context
271*4882a593Smuzhiyun * @wc: Work Completion object
272*4882a593Smuzhiyun *
273*4882a593Smuzhiyun * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
274*4882a593Smuzhiyun * the Send completion handler could be running.
275*4882a593Smuzhiyun */
svc_rdma_wc_send(struct ib_cq * cq,struct ib_wc * wc)276*4882a593Smuzhiyun static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
277*4882a593Smuzhiyun {
278*4882a593Smuzhiyun struct svcxprt_rdma *rdma = cq->cq_context;
279*4882a593Smuzhiyun struct ib_cqe *cqe = wc->wr_cqe;
280*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt =
281*4882a593Smuzhiyun container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
282*4882a593Smuzhiyun
283*4882a593Smuzhiyun trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
284*4882a593Smuzhiyun
285*4882a593Smuzhiyun atomic_inc(&rdma->sc_sq_avail);
286*4882a593Smuzhiyun wake_up(&rdma->sc_send_wait);
287*4882a593Smuzhiyun
288*4882a593Smuzhiyun svc_rdma_send_ctxt_put(rdma, ctxt);
289*4882a593Smuzhiyun
290*4882a593Smuzhiyun if (unlikely(wc->status != IB_WC_SUCCESS)) {
291*4882a593Smuzhiyun set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
292*4882a593Smuzhiyun svc_xprt_enqueue(&rdma->sc_xprt);
293*4882a593Smuzhiyun }
294*4882a593Smuzhiyun }
295*4882a593Smuzhiyun
296*4882a593Smuzhiyun /**
297*4882a593Smuzhiyun * svc_rdma_send - Post a single Send WR
298*4882a593Smuzhiyun * @rdma: transport on which to post the WR
299*4882a593Smuzhiyun * @ctxt: send ctxt with a Send WR ready to post
300*4882a593Smuzhiyun *
301*4882a593Smuzhiyun * Returns zero the Send WR was posted successfully. Otherwise, a
302*4882a593Smuzhiyun * negative errno is returned.
303*4882a593Smuzhiyun */
svc_rdma_send(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt)304*4882a593Smuzhiyun int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
305*4882a593Smuzhiyun {
306*4882a593Smuzhiyun struct ib_send_wr *wr = &ctxt->sc_send_wr;
307*4882a593Smuzhiyun int ret;
308*4882a593Smuzhiyun
309*4882a593Smuzhiyun might_sleep();
310*4882a593Smuzhiyun
311*4882a593Smuzhiyun /* Sync the transport header buffer */
312*4882a593Smuzhiyun ib_dma_sync_single_for_device(rdma->sc_pd->device,
313*4882a593Smuzhiyun wr->sg_list[0].addr,
314*4882a593Smuzhiyun wr->sg_list[0].length,
315*4882a593Smuzhiyun DMA_TO_DEVICE);
316*4882a593Smuzhiyun
317*4882a593Smuzhiyun /* If the SQ is full, wait until an SQ entry is available */
318*4882a593Smuzhiyun while (1) {
319*4882a593Smuzhiyun if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
320*4882a593Smuzhiyun atomic_inc(&rdma_stat_sq_starve);
321*4882a593Smuzhiyun trace_svcrdma_sq_full(rdma);
322*4882a593Smuzhiyun atomic_inc(&rdma->sc_sq_avail);
323*4882a593Smuzhiyun wait_event(rdma->sc_send_wait,
324*4882a593Smuzhiyun atomic_read(&rdma->sc_sq_avail) > 1);
325*4882a593Smuzhiyun if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
326*4882a593Smuzhiyun return -ENOTCONN;
327*4882a593Smuzhiyun trace_svcrdma_sq_retry(rdma);
328*4882a593Smuzhiyun continue;
329*4882a593Smuzhiyun }
330*4882a593Smuzhiyun
331*4882a593Smuzhiyun trace_svcrdma_post_send(ctxt);
332*4882a593Smuzhiyun ret = ib_post_send(rdma->sc_qp, wr, NULL);
333*4882a593Smuzhiyun if (ret)
334*4882a593Smuzhiyun break;
335*4882a593Smuzhiyun return 0;
336*4882a593Smuzhiyun }
337*4882a593Smuzhiyun
338*4882a593Smuzhiyun trace_svcrdma_sq_post_err(rdma, ret);
339*4882a593Smuzhiyun set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
340*4882a593Smuzhiyun wake_up(&rdma->sc_send_wait);
341*4882a593Smuzhiyun return ret;
342*4882a593Smuzhiyun }
343*4882a593Smuzhiyun
344*4882a593Smuzhiyun /**
345*4882a593Smuzhiyun * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
346*4882a593Smuzhiyun * @sctxt: Send context for the RPC Reply
347*4882a593Smuzhiyun *
348*4882a593Smuzhiyun * Return values:
349*4882a593Smuzhiyun * On success, returns length in bytes of the Reply XDR buffer
350*4882a593Smuzhiyun * that was consumed by the Reply Read list
351*4882a593Smuzhiyun * %-EMSGSIZE on XDR buffer overflow
352*4882a593Smuzhiyun */
svc_rdma_encode_read_list(struct svc_rdma_send_ctxt * sctxt)353*4882a593Smuzhiyun static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
354*4882a593Smuzhiyun {
355*4882a593Smuzhiyun /* RPC-over-RDMA version 1 replies never have a Read list. */
356*4882a593Smuzhiyun return xdr_stream_encode_item_absent(&sctxt->sc_stream);
357*4882a593Smuzhiyun }
358*4882a593Smuzhiyun
359*4882a593Smuzhiyun /**
360*4882a593Smuzhiyun * svc_rdma_encode_write_segment - Encode one Write segment
361*4882a593Smuzhiyun * @src: matching Write chunk in the RPC Call header
362*4882a593Smuzhiyun * @sctxt: Send context for the RPC Reply
363*4882a593Smuzhiyun * @remaining: remaining bytes of the payload left in the Write chunk
364*4882a593Smuzhiyun *
365*4882a593Smuzhiyun * Return values:
366*4882a593Smuzhiyun * On success, returns length in bytes of the Reply XDR buffer
367*4882a593Smuzhiyun * that was consumed by the Write segment
368*4882a593Smuzhiyun * %-EMSGSIZE on XDR buffer overflow
369*4882a593Smuzhiyun */
svc_rdma_encode_write_segment(__be32 * src,struct svc_rdma_send_ctxt * sctxt,unsigned int * remaining)370*4882a593Smuzhiyun static ssize_t svc_rdma_encode_write_segment(__be32 *src,
371*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
372*4882a593Smuzhiyun unsigned int *remaining)
373*4882a593Smuzhiyun {
374*4882a593Smuzhiyun __be32 *p;
375*4882a593Smuzhiyun const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
376*4882a593Smuzhiyun u32 handle, length;
377*4882a593Smuzhiyun u64 offset;
378*4882a593Smuzhiyun
379*4882a593Smuzhiyun p = xdr_reserve_space(&sctxt->sc_stream, len);
380*4882a593Smuzhiyun if (!p)
381*4882a593Smuzhiyun return -EMSGSIZE;
382*4882a593Smuzhiyun
383*4882a593Smuzhiyun xdr_decode_rdma_segment(src, &handle, &length, &offset);
384*4882a593Smuzhiyun
385*4882a593Smuzhiyun if (*remaining < length) {
386*4882a593Smuzhiyun /* segment only partly filled */
387*4882a593Smuzhiyun length = *remaining;
388*4882a593Smuzhiyun *remaining = 0;
389*4882a593Smuzhiyun } else {
390*4882a593Smuzhiyun /* entire segment was consumed */
391*4882a593Smuzhiyun *remaining -= length;
392*4882a593Smuzhiyun }
393*4882a593Smuzhiyun xdr_encode_rdma_segment(p, handle, length, offset);
394*4882a593Smuzhiyun
395*4882a593Smuzhiyun trace_svcrdma_encode_wseg(handle, length, offset);
396*4882a593Smuzhiyun return len;
397*4882a593Smuzhiyun }
398*4882a593Smuzhiyun
399*4882a593Smuzhiyun /**
400*4882a593Smuzhiyun * svc_rdma_encode_write_chunk - Encode one Write chunk
401*4882a593Smuzhiyun * @src: matching Write chunk in the RPC Call header
402*4882a593Smuzhiyun * @sctxt: Send context for the RPC Reply
403*4882a593Smuzhiyun * @remaining: size in bytes of the payload in the Write chunk
404*4882a593Smuzhiyun *
405*4882a593Smuzhiyun * Copy a Write chunk from the Call transport header to the
406*4882a593Smuzhiyun * Reply transport header. Update each segment's length field
407*4882a593Smuzhiyun * to reflect the number of bytes written in that segment.
408*4882a593Smuzhiyun *
409*4882a593Smuzhiyun * Return values:
410*4882a593Smuzhiyun * On success, returns length in bytes of the Reply XDR buffer
411*4882a593Smuzhiyun * that was consumed by the Write chunk
412*4882a593Smuzhiyun * %-EMSGSIZE on XDR buffer overflow
413*4882a593Smuzhiyun */
svc_rdma_encode_write_chunk(__be32 * src,struct svc_rdma_send_ctxt * sctxt,unsigned int remaining)414*4882a593Smuzhiyun static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
415*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
416*4882a593Smuzhiyun unsigned int remaining)
417*4882a593Smuzhiyun {
418*4882a593Smuzhiyun unsigned int i, nsegs;
419*4882a593Smuzhiyun ssize_t len, ret;
420*4882a593Smuzhiyun
421*4882a593Smuzhiyun len = 0;
422*4882a593Smuzhiyun trace_svcrdma_encode_write_chunk(remaining);
423*4882a593Smuzhiyun
424*4882a593Smuzhiyun src++;
425*4882a593Smuzhiyun ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
426*4882a593Smuzhiyun if (ret < 0)
427*4882a593Smuzhiyun return -EMSGSIZE;
428*4882a593Smuzhiyun len += ret;
429*4882a593Smuzhiyun
430*4882a593Smuzhiyun nsegs = be32_to_cpup(src++);
431*4882a593Smuzhiyun ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
432*4882a593Smuzhiyun if (ret < 0)
433*4882a593Smuzhiyun return -EMSGSIZE;
434*4882a593Smuzhiyun len += ret;
435*4882a593Smuzhiyun
436*4882a593Smuzhiyun for (i = nsegs; i; i--) {
437*4882a593Smuzhiyun ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
438*4882a593Smuzhiyun if (ret < 0)
439*4882a593Smuzhiyun return -EMSGSIZE;
440*4882a593Smuzhiyun src += rpcrdma_segment_maxsz;
441*4882a593Smuzhiyun len += ret;
442*4882a593Smuzhiyun }
443*4882a593Smuzhiyun
444*4882a593Smuzhiyun return len;
445*4882a593Smuzhiyun }
446*4882a593Smuzhiyun
447*4882a593Smuzhiyun /**
448*4882a593Smuzhiyun * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
449*4882a593Smuzhiyun * @rctxt: Reply context with information about the RPC Call
450*4882a593Smuzhiyun * @sctxt: Send context for the RPC Reply
451*4882a593Smuzhiyun * @length: size in bytes of the payload in the first Write chunk
452*4882a593Smuzhiyun *
453*4882a593Smuzhiyun * The client provides a Write chunk list in the Call message. Fill
454*4882a593Smuzhiyun * in the segments in the first Write chunk in the Reply's transport
455*4882a593Smuzhiyun * header with the number of bytes consumed in each segment.
456*4882a593Smuzhiyun * Remaining chunks are returned unused.
457*4882a593Smuzhiyun *
458*4882a593Smuzhiyun * Assumptions:
459*4882a593Smuzhiyun * - Client has provided only one Write chunk
460*4882a593Smuzhiyun *
461*4882a593Smuzhiyun * Return values:
462*4882a593Smuzhiyun * On success, returns length in bytes of the Reply XDR buffer
463*4882a593Smuzhiyun * that was consumed by the Reply's Write list
464*4882a593Smuzhiyun * %-EMSGSIZE on XDR buffer overflow
465*4882a593Smuzhiyun */
466*4882a593Smuzhiyun static ssize_t
svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt * rctxt,struct svc_rdma_send_ctxt * sctxt,unsigned int length)467*4882a593Smuzhiyun svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
468*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
469*4882a593Smuzhiyun unsigned int length)
470*4882a593Smuzhiyun {
471*4882a593Smuzhiyun ssize_t len, ret;
472*4882a593Smuzhiyun
473*4882a593Smuzhiyun ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
474*4882a593Smuzhiyun if (ret < 0)
475*4882a593Smuzhiyun return ret;
476*4882a593Smuzhiyun len = ret;
477*4882a593Smuzhiyun
478*4882a593Smuzhiyun /* Terminate the Write list */
479*4882a593Smuzhiyun ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
480*4882a593Smuzhiyun if (ret < 0)
481*4882a593Smuzhiyun return ret;
482*4882a593Smuzhiyun
483*4882a593Smuzhiyun return len + ret;
484*4882a593Smuzhiyun }
485*4882a593Smuzhiyun
486*4882a593Smuzhiyun /**
487*4882a593Smuzhiyun * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
488*4882a593Smuzhiyun * @rctxt: Reply context with information about the RPC Call
489*4882a593Smuzhiyun * @sctxt: Send context for the RPC Reply
490*4882a593Smuzhiyun * @length: size in bytes of the payload in the Reply chunk
491*4882a593Smuzhiyun *
492*4882a593Smuzhiyun * Assumptions:
493*4882a593Smuzhiyun * - Reply can always fit in the client-provided Reply chunk
494*4882a593Smuzhiyun *
495*4882a593Smuzhiyun * Return values:
496*4882a593Smuzhiyun * On success, returns length in bytes of the Reply XDR buffer
497*4882a593Smuzhiyun * that was consumed by the Reply's Reply chunk
498*4882a593Smuzhiyun * %-EMSGSIZE on XDR buffer overflow
499*4882a593Smuzhiyun */
500*4882a593Smuzhiyun static ssize_t
svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt * rctxt,struct svc_rdma_send_ctxt * sctxt,unsigned int length)501*4882a593Smuzhiyun svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
502*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
503*4882a593Smuzhiyun unsigned int length)
504*4882a593Smuzhiyun {
505*4882a593Smuzhiyun return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
506*4882a593Smuzhiyun length);
507*4882a593Smuzhiyun }
508*4882a593Smuzhiyun
svc_rdma_dma_map_page(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * ctxt,struct page * page,unsigned long offset,unsigned int len)509*4882a593Smuzhiyun static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
510*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt,
511*4882a593Smuzhiyun struct page *page,
512*4882a593Smuzhiyun unsigned long offset,
513*4882a593Smuzhiyun unsigned int len)
514*4882a593Smuzhiyun {
515*4882a593Smuzhiyun struct ib_device *dev = rdma->sc_cm_id->device;
516*4882a593Smuzhiyun dma_addr_t dma_addr;
517*4882a593Smuzhiyun
518*4882a593Smuzhiyun dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
519*4882a593Smuzhiyun trace_svcrdma_dma_map_page(rdma, dma_addr, len);
520*4882a593Smuzhiyun if (ib_dma_mapping_error(dev, dma_addr))
521*4882a593Smuzhiyun goto out_maperr;
522*4882a593Smuzhiyun
523*4882a593Smuzhiyun ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
524*4882a593Smuzhiyun ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
525*4882a593Smuzhiyun ctxt->sc_send_wr.num_sge++;
526*4882a593Smuzhiyun return 0;
527*4882a593Smuzhiyun
528*4882a593Smuzhiyun out_maperr:
529*4882a593Smuzhiyun return -EIO;
530*4882a593Smuzhiyun }
531*4882a593Smuzhiyun
/* Map a kernel virtual buffer by way of its backing page.
 *
 * ib_dma_map_page() is used here, rather than a single-buffer
 * mapping, because these SGEs are later unmapped with
 * ib_dma_unmap_page() exclusively (see svc_rdma_send_ctxt_put()).
 */
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
				struct svc_rdma_send_ctxt *ctxt,
				unsigned char *base,
				unsigned int len)
{
	struct page *page = virt_to_page(base);

	return svc_rdma_dma_map_page(rdma, ctxt, page, offset_in_page(base),
				     len);
}
543*4882a593Smuzhiyun
544*4882a593Smuzhiyun /**
545*4882a593Smuzhiyun * svc_rdma_pull_up_needed - Determine whether to use pull-up
546*4882a593Smuzhiyun * @rdma: controlling transport
547*4882a593Smuzhiyun * @sctxt: send_ctxt for the Send WR
548*4882a593Smuzhiyun * @rctxt: Write and Reply chunks provided by client
549*4882a593Smuzhiyun * @xdr: xdr_buf containing RPC message to transmit
550*4882a593Smuzhiyun *
551*4882a593Smuzhiyun * Returns:
552*4882a593Smuzhiyun * %true if pull-up must be used
553*4882a593Smuzhiyun * %false otherwise
554*4882a593Smuzhiyun */
svc_rdma_pull_up_needed(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct xdr_buf * xdr)555*4882a593Smuzhiyun static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
556*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
557*4882a593Smuzhiyun const struct svc_rdma_recv_ctxt *rctxt,
558*4882a593Smuzhiyun struct xdr_buf *xdr)
559*4882a593Smuzhiyun {
560*4882a593Smuzhiyun int elements;
561*4882a593Smuzhiyun
562*4882a593Smuzhiyun /* For small messages, copying bytes is cheaper than DMA mapping.
563*4882a593Smuzhiyun */
564*4882a593Smuzhiyun if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
565*4882a593Smuzhiyun return true;
566*4882a593Smuzhiyun
567*4882a593Smuzhiyun /* Check whether the xdr_buf has more elements than can
568*4882a593Smuzhiyun * fit in a single RDMA Send.
569*4882a593Smuzhiyun */
570*4882a593Smuzhiyun /* xdr->head */
571*4882a593Smuzhiyun elements = 1;
572*4882a593Smuzhiyun
573*4882a593Smuzhiyun /* xdr->pages */
574*4882a593Smuzhiyun if (!rctxt || !rctxt->rc_write_list) {
575*4882a593Smuzhiyun unsigned int remaining;
576*4882a593Smuzhiyun unsigned long pageoff;
577*4882a593Smuzhiyun
578*4882a593Smuzhiyun pageoff = xdr->page_base & ~PAGE_MASK;
579*4882a593Smuzhiyun remaining = xdr->page_len;
580*4882a593Smuzhiyun while (remaining) {
581*4882a593Smuzhiyun ++elements;
582*4882a593Smuzhiyun remaining -= min_t(u32, PAGE_SIZE - pageoff,
583*4882a593Smuzhiyun remaining);
584*4882a593Smuzhiyun pageoff = 0;
585*4882a593Smuzhiyun }
586*4882a593Smuzhiyun }
587*4882a593Smuzhiyun
588*4882a593Smuzhiyun /* xdr->tail */
589*4882a593Smuzhiyun if (xdr->tail[0].iov_len)
590*4882a593Smuzhiyun ++elements;
591*4882a593Smuzhiyun
592*4882a593Smuzhiyun /* assume 1 SGE is needed for the transport header */
593*4882a593Smuzhiyun return elements >= rdma->sc_max_send_sges;
594*4882a593Smuzhiyun }
595*4882a593Smuzhiyun
596*4882a593Smuzhiyun /**
597*4882a593Smuzhiyun * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
598*4882a593Smuzhiyun * @rdma: controlling transport
599*4882a593Smuzhiyun * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
600*4882a593Smuzhiyun * @rctxt: Write and Reply chunks provided by client
601*4882a593Smuzhiyun * @xdr: prepared xdr_buf containing RPC message
602*4882a593Smuzhiyun *
603*4882a593Smuzhiyun * The device is not capable of sending the reply directly.
604*4882a593Smuzhiyun * Assemble the elements of @xdr into the transport header buffer.
605*4882a593Smuzhiyun *
606*4882a593Smuzhiyun * Returns zero on success, or a negative errno on failure.
607*4882a593Smuzhiyun */
svc_rdma_pull_up_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,const struct xdr_buf * xdr)608*4882a593Smuzhiyun static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
609*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
610*4882a593Smuzhiyun const struct svc_rdma_recv_ctxt *rctxt,
611*4882a593Smuzhiyun const struct xdr_buf *xdr)
612*4882a593Smuzhiyun {
613*4882a593Smuzhiyun unsigned char *dst, *tailbase;
614*4882a593Smuzhiyun unsigned int taillen;
615*4882a593Smuzhiyun
616*4882a593Smuzhiyun dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
617*4882a593Smuzhiyun memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
618*4882a593Smuzhiyun dst += xdr->head[0].iov_len;
619*4882a593Smuzhiyun
620*4882a593Smuzhiyun tailbase = xdr->tail[0].iov_base;
621*4882a593Smuzhiyun taillen = xdr->tail[0].iov_len;
622*4882a593Smuzhiyun if (rctxt && rctxt->rc_write_list) {
623*4882a593Smuzhiyun u32 xdrpad;
624*4882a593Smuzhiyun
625*4882a593Smuzhiyun xdrpad = xdr_pad_size(xdr->page_len);
626*4882a593Smuzhiyun if (taillen && xdrpad) {
627*4882a593Smuzhiyun tailbase += xdrpad;
628*4882a593Smuzhiyun taillen -= xdrpad;
629*4882a593Smuzhiyun }
630*4882a593Smuzhiyun } else {
631*4882a593Smuzhiyun unsigned int len, remaining;
632*4882a593Smuzhiyun unsigned long pageoff;
633*4882a593Smuzhiyun struct page **ppages;
634*4882a593Smuzhiyun
635*4882a593Smuzhiyun ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
636*4882a593Smuzhiyun pageoff = xdr->page_base & ~PAGE_MASK;
637*4882a593Smuzhiyun remaining = xdr->page_len;
638*4882a593Smuzhiyun while (remaining) {
639*4882a593Smuzhiyun len = min_t(u32, PAGE_SIZE - pageoff, remaining);
640*4882a593Smuzhiyun
641*4882a593Smuzhiyun memcpy(dst, page_address(*ppages) + pageoff, len);
642*4882a593Smuzhiyun remaining -= len;
643*4882a593Smuzhiyun dst += len;
644*4882a593Smuzhiyun pageoff = 0;
645*4882a593Smuzhiyun ppages++;
646*4882a593Smuzhiyun }
647*4882a593Smuzhiyun }
648*4882a593Smuzhiyun
649*4882a593Smuzhiyun if (taillen)
650*4882a593Smuzhiyun memcpy(dst, tailbase, taillen);
651*4882a593Smuzhiyun
652*4882a593Smuzhiyun sctxt->sc_sges[0].length += xdr->len;
653*4882a593Smuzhiyun trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
654*4882a593Smuzhiyun return 0;
655*4882a593Smuzhiyun }
656*4882a593Smuzhiyun
657*4882a593Smuzhiyun /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
658*4882a593Smuzhiyun * @rdma: controlling transport
659*4882a593Smuzhiyun * @sctxt: send_ctxt for the Send WR
660*4882a593Smuzhiyun * @rctxt: Write and Reply chunks provided by client
661*4882a593Smuzhiyun * @xdr: prepared xdr_buf containing RPC message
662*4882a593Smuzhiyun *
663*4882a593Smuzhiyun * Load the xdr_buf into the ctxt's sge array, and DMA map each
664*4882a593Smuzhiyun * element as it is added. The Send WR's num_sge field is set.
665*4882a593Smuzhiyun *
666*4882a593Smuzhiyun * Returns zero on success, or a negative errno on failure.
667*4882a593Smuzhiyun */
svc_rdma_map_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct xdr_buf * xdr)668*4882a593Smuzhiyun int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
669*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
670*4882a593Smuzhiyun const struct svc_rdma_recv_ctxt *rctxt,
671*4882a593Smuzhiyun struct xdr_buf *xdr)
672*4882a593Smuzhiyun {
673*4882a593Smuzhiyun unsigned int len, remaining;
674*4882a593Smuzhiyun unsigned long page_off;
675*4882a593Smuzhiyun struct page **ppages;
676*4882a593Smuzhiyun unsigned char *base;
677*4882a593Smuzhiyun u32 xdr_pad;
678*4882a593Smuzhiyun int ret;
679*4882a593Smuzhiyun
680*4882a593Smuzhiyun /* Set up the (persistently-mapped) transport header SGE. */
681*4882a593Smuzhiyun sctxt->sc_send_wr.num_sge = 1;
682*4882a593Smuzhiyun sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
683*4882a593Smuzhiyun
684*4882a593Smuzhiyun /* If there is a Reply chunk, nothing follows the transport
685*4882a593Smuzhiyun * header, and we're done here.
686*4882a593Smuzhiyun */
687*4882a593Smuzhiyun if (rctxt && rctxt->rc_reply_chunk)
688*4882a593Smuzhiyun return 0;
689*4882a593Smuzhiyun
690*4882a593Smuzhiyun /* For pull-up, svc_rdma_send() will sync the transport header.
691*4882a593Smuzhiyun * No additional DMA mapping is necessary.
692*4882a593Smuzhiyun */
693*4882a593Smuzhiyun if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
694*4882a593Smuzhiyun return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
695*4882a593Smuzhiyun
696*4882a593Smuzhiyun ++sctxt->sc_cur_sge_no;
697*4882a593Smuzhiyun ret = svc_rdma_dma_map_buf(rdma, sctxt,
698*4882a593Smuzhiyun xdr->head[0].iov_base,
699*4882a593Smuzhiyun xdr->head[0].iov_len);
700*4882a593Smuzhiyun if (ret < 0)
701*4882a593Smuzhiyun return ret;
702*4882a593Smuzhiyun
703*4882a593Smuzhiyun /* If a Write chunk is present, the xdr_buf's page list
704*4882a593Smuzhiyun * is not included inline. However the Upper Layer may
705*4882a593Smuzhiyun * have added XDR padding in the tail buffer, and that
706*4882a593Smuzhiyun * should not be included inline.
707*4882a593Smuzhiyun */
708*4882a593Smuzhiyun if (rctxt && rctxt->rc_write_list) {
709*4882a593Smuzhiyun base = xdr->tail[0].iov_base;
710*4882a593Smuzhiyun len = xdr->tail[0].iov_len;
711*4882a593Smuzhiyun xdr_pad = xdr_pad_size(xdr->page_len);
712*4882a593Smuzhiyun
713*4882a593Smuzhiyun if (len && xdr_pad) {
714*4882a593Smuzhiyun base += xdr_pad;
715*4882a593Smuzhiyun len -= xdr_pad;
716*4882a593Smuzhiyun }
717*4882a593Smuzhiyun
718*4882a593Smuzhiyun goto tail;
719*4882a593Smuzhiyun }
720*4882a593Smuzhiyun
721*4882a593Smuzhiyun ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
722*4882a593Smuzhiyun page_off = xdr->page_base & ~PAGE_MASK;
723*4882a593Smuzhiyun remaining = xdr->page_len;
724*4882a593Smuzhiyun while (remaining) {
725*4882a593Smuzhiyun len = min_t(u32, PAGE_SIZE - page_off, remaining);
726*4882a593Smuzhiyun
727*4882a593Smuzhiyun ++sctxt->sc_cur_sge_no;
728*4882a593Smuzhiyun ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
729*4882a593Smuzhiyun page_off, len);
730*4882a593Smuzhiyun if (ret < 0)
731*4882a593Smuzhiyun return ret;
732*4882a593Smuzhiyun
733*4882a593Smuzhiyun remaining -= len;
734*4882a593Smuzhiyun page_off = 0;
735*4882a593Smuzhiyun }
736*4882a593Smuzhiyun
737*4882a593Smuzhiyun base = xdr->tail[0].iov_base;
738*4882a593Smuzhiyun len = xdr->tail[0].iov_len;
739*4882a593Smuzhiyun tail:
740*4882a593Smuzhiyun if (len) {
741*4882a593Smuzhiyun ++sctxt->sc_cur_sge_no;
742*4882a593Smuzhiyun ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
743*4882a593Smuzhiyun if (ret < 0)
744*4882a593Smuzhiyun return ret;
745*4882a593Smuzhiyun }
746*4882a593Smuzhiyun
747*4882a593Smuzhiyun return 0;
748*4882a593Smuzhiyun }
749*4882a593Smuzhiyun
750*4882a593Smuzhiyun /* The svc_rqst and all resources it owns are released as soon as
751*4882a593Smuzhiyun * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
752*4882a593Smuzhiyun * so they are released by the Send completion handler.
753*4882a593Smuzhiyun */
svc_rdma_save_io_pages(struct svc_rqst * rqstp,struct svc_rdma_send_ctxt * ctxt)754*4882a593Smuzhiyun static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
755*4882a593Smuzhiyun struct svc_rdma_send_ctxt *ctxt)
756*4882a593Smuzhiyun {
757*4882a593Smuzhiyun int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
758*4882a593Smuzhiyun
759*4882a593Smuzhiyun ctxt->sc_page_count += pages;
760*4882a593Smuzhiyun for (i = 0; i < pages; i++) {
761*4882a593Smuzhiyun ctxt->sc_pages[i] = rqstp->rq_respages[i];
762*4882a593Smuzhiyun rqstp->rq_respages[i] = NULL;
763*4882a593Smuzhiyun }
764*4882a593Smuzhiyun
765*4882a593Smuzhiyun /* Prevent svc_xprt_release from releasing pages in rq_pages */
766*4882a593Smuzhiyun rqstp->rq_next_page = rqstp->rq_respages;
767*4882a593Smuzhiyun }
768*4882a593Smuzhiyun
769*4882a593Smuzhiyun /* Prepare the portion of the RPC Reply that will be transmitted
770*4882a593Smuzhiyun * via RDMA Send. The RPC-over-RDMA transport header is prepared
771*4882a593Smuzhiyun * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
772*4882a593Smuzhiyun *
773*4882a593Smuzhiyun * Depending on whether a Write list or Reply chunk is present,
774*4882a593Smuzhiyun * the server may send all, a portion of, or none of the xdr_buf.
775*4882a593Smuzhiyun * In the latter case, only the transport header (sc_sges[0]) is
776*4882a593Smuzhiyun * transmitted.
777*4882a593Smuzhiyun *
778*4882a593Smuzhiyun * RDMA Send is the last step of transmitting an RPC reply. Pages
779*4882a593Smuzhiyun * involved in the earlier RDMA Writes are here transferred out
780*4882a593Smuzhiyun * of the rqstp and into the sctxt's page array. These pages are
781*4882a593Smuzhiyun * DMA unmapped by each Write completion, but the subsequent Send
782*4882a593Smuzhiyun * completion finally releases these pages.
783*4882a593Smuzhiyun *
784*4882a593Smuzhiyun * Assumptions:
785*4882a593Smuzhiyun * - The Reply's transport header will never be larger than a page.
786*4882a593Smuzhiyun */
svc_rdma_send_reply_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,const struct svc_rdma_recv_ctxt * rctxt,struct svc_rqst * rqstp)787*4882a593Smuzhiyun static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
788*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
789*4882a593Smuzhiyun const struct svc_rdma_recv_ctxt *rctxt,
790*4882a593Smuzhiyun struct svc_rqst *rqstp)
791*4882a593Smuzhiyun {
792*4882a593Smuzhiyun int ret;
793*4882a593Smuzhiyun
794*4882a593Smuzhiyun ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
795*4882a593Smuzhiyun if (ret < 0)
796*4882a593Smuzhiyun return ret;
797*4882a593Smuzhiyun
798*4882a593Smuzhiyun svc_rdma_save_io_pages(rqstp, sctxt);
799*4882a593Smuzhiyun
800*4882a593Smuzhiyun if (rctxt->rc_inv_rkey) {
801*4882a593Smuzhiyun sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
802*4882a593Smuzhiyun sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
803*4882a593Smuzhiyun } else {
804*4882a593Smuzhiyun sctxt->sc_send_wr.opcode = IB_WR_SEND;
805*4882a593Smuzhiyun }
806*4882a593Smuzhiyun return svc_rdma_send(rdma, sctxt);
807*4882a593Smuzhiyun }
808*4882a593Smuzhiyun
809*4882a593Smuzhiyun /**
810*4882a593Smuzhiyun * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
811*4882a593Smuzhiyun * @rdma: controlling transport context
812*4882a593Smuzhiyun * @sctxt: Send context for the response
813*4882a593Smuzhiyun * @rctxt: Receive context for incoming bad message
814*4882a593Smuzhiyun * @status: negative errno indicating error that occurred
815*4882a593Smuzhiyun *
816*4882a593Smuzhiyun * Given the client-provided Read, Write, and Reply chunks, the
817*4882a593Smuzhiyun * server was not able to parse the Call or form a complete Reply.
818*4882a593Smuzhiyun * Return an RDMA_ERROR message so the client can retire the RPC
819*4882a593Smuzhiyun * transaction.
820*4882a593Smuzhiyun *
821*4882a593Smuzhiyun * The caller does not have to release @sctxt. It is released by
822*4882a593Smuzhiyun * Send completion, or by this function on error.
823*4882a593Smuzhiyun */
svc_rdma_send_error_msg(struct svcxprt_rdma * rdma,struct svc_rdma_send_ctxt * sctxt,struct svc_rdma_recv_ctxt * rctxt,int status)824*4882a593Smuzhiyun void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
825*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt,
826*4882a593Smuzhiyun struct svc_rdma_recv_ctxt *rctxt,
827*4882a593Smuzhiyun int status)
828*4882a593Smuzhiyun {
829*4882a593Smuzhiyun __be32 *rdma_argp = rctxt->rc_recv_buf;
830*4882a593Smuzhiyun __be32 *p;
831*4882a593Smuzhiyun
832*4882a593Smuzhiyun rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
833*4882a593Smuzhiyun xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
834*4882a593Smuzhiyun sctxt->sc_xprt_buf, NULL);
835*4882a593Smuzhiyun
836*4882a593Smuzhiyun p = xdr_reserve_space(&sctxt->sc_stream,
837*4882a593Smuzhiyun rpcrdma_fixed_maxsz * sizeof(*p));
838*4882a593Smuzhiyun if (!p)
839*4882a593Smuzhiyun goto put_ctxt;
840*4882a593Smuzhiyun
841*4882a593Smuzhiyun *p++ = *rdma_argp;
842*4882a593Smuzhiyun *p++ = *(rdma_argp + 1);
843*4882a593Smuzhiyun *p++ = rdma->sc_fc_credits;
844*4882a593Smuzhiyun *p = rdma_error;
845*4882a593Smuzhiyun
846*4882a593Smuzhiyun switch (status) {
847*4882a593Smuzhiyun case -EPROTONOSUPPORT:
848*4882a593Smuzhiyun p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
849*4882a593Smuzhiyun if (!p)
850*4882a593Smuzhiyun goto put_ctxt;
851*4882a593Smuzhiyun
852*4882a593Smuzhiyun *p++ = err_vers;
853*4882a593Smuzhiyun *p++ = rpcrdma_version;
854*4882a593Smuzhiyun *p = rpcrdma_version;
855*4882a593Smuzhiyun trace_svcrdma_err_vers(*rdma_argp);
856*4882a593Smuzhiyun break;
857*4882a593Smuzhiyun default:
858*4882a593Smuzhiyun p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
859*4882a593Smuzhiyun if (!p)
860*4882a593Smuzhiyun goto put_ctxt;
861*4882a593Smuzhiyun
862*4882a593Smuzhiyun *p = err_chunk;
863*4882a593Smuzhiyun trace_svcrdma_err_chunk(*rdma_argp);
864*4882a593Smuzhiyun }
865*4882a593Smuzhiyun
866*4882a593Smuzhiyun /* Remote Invalidation is skipped for simplicity. */
867*4882a593Smuzhiyun sctxt->sc_send_wr.num_sge = 1;
868*4882a593Smuzhiyun sctxt->sc_send_wr.opcode = IB_WR_SEND;
869*4882a593Smuzhiyun sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
870*4882a593Smuzhiyun if (svc_rdma_send(rdma, sctxt))
871*4882a593Smuzhiyun goto put_ctxt;
872*4882a593Smuzhiyun return;
873*4882a593Smuzhiyun
874*4882a593Smuzhiyun put_ctxt:
875*4882a593Smuzhiyun svc_rdma_send_ctxt_put(rdma, sctxt);
876*4882a593Smuzhiyun }
877*4882a593Smuzhiyun
878*4882a593Smuzhiyun /**
879*4882a593Smuzhiyun * svc_rdma_sendto - Transmit an RPC reply
880*4882a593Smuzhiyun * @rqstp: processed RPC request, reply XDR already in ::rq_res
881*4882a593Smuzhiyun *
882*4882a593Smuzhiyun * Any resources still associated with @rqstp are released upon return.
883*4882a593Smuzhiyun * If no reply message was possible, the connection is closed.
884*4882a593Smuzhiyun *
885*4882a593Smuzhiyun * Returns:
886*4882a593Smuzhiyun * %0 if an RPC reply has been successfully posted,
887*4882a593Smuzhiyun * %-ENOMEM if a resource shortage occurred (connection is lost),
888*4882a593Smuzhiyun * %-ENOTCONN if posting failed (connection is lost).
889*4882a593Smuzhiyun */
svc_rdma_sendto(struct svc_rqst * rqstp)890*4882a593Smuzhiyun int svc_rdma_sendto(struct svc_rqst *rqstp)
891*4882a593Smuzhiyun {
892*4882a593Smuzhiyun struct svc_xprt *xprt = rqstp->rq_xprt;
893*4882a593Smuzhiyun struct svcxprt_rdma *rdma =
894*4882a593Smuzhiyun container_of(xprt, struct svcxprt_rdma, sc_xprt);
895*4882a593Smuzhiyun struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
896*4882a593Smuzhiyun __be32 *rdma_argp = rctxt->rc_recv_buf;
897*4882a593Smuzhiyun __be32 *wr_lst = rctxt->rc_write_list;
898*4882a593Smuzhiyun __be32 *rp_ch = rctxt->rc_reply_chunk;
899*4882a593Smuzhiyun struct xdr_buf *xdr = &rqstp->rq_res;
900*4882a593Smuzhiyun struct svc_rdma_send_ctxt *sctxt;
901*4882a593Smuzhiyun __be32 *p;
902*4882a593Smuzhiyun int ret;
903*4882a593Smuzhiyun
904*4882a593Smuzhiyun ret = -ENOTCONN;
905*4882a593Smuzhiyun if (svc_xprt_is_dead(xprt))
906*4882a593Smuzhiyun goto err0;
907*4882a593Smuzhiyun
908*4882a593Smuzhiyun ret = -ENOMEM;
909*4882a593Smuzhiyun sctxt = svc_rdma_send_ctxt_get(rdma);
910*4882a593Smuzhiyun if (!sctxt)
911*4882a593Smuzhiyun goto err0;
912*4882a593Smuzhiyun
913*4882a593Smuzhiyun p = xdr_reserve_space(&sctxt->sc_stream,
914*4882a593Smuzhiyun rpcrdma_fixed_maxsz * sizeof(*p));
915*4882a593Smuzhiyun if (!p)
916*4882a593Smuzhiyun goto err0;
917*4882a593Smuzhiyun *p++ = *rdma_argp;
918*4882a593Smuzhiyun *p++ = *(rdma_argp + 1);
919*4882a593Smuzhiyun *p++ = rdma->sc_fc_credits;
920*4882a593Smuzhiyun *p = rp_ch ? rdma_nomsg : rdma_msg;
921*4882a593Smuzhiyun
922*4882a593Smuzhiyun if (svc_rdma_encode_read_list(sctxt) < 0)
923*4882a593Smuzhiyun goto err0;
924*4882a593Smuzhiyun if (wr_lst) {
925*4882a593Smuzhiyun /* XXX: Presume the client sent only one Write chunk */
926*4882a593Smuzhiyun unsigned long offset;
927*4882a593Smuzhiyun unsigned int length;
928*4882a593Smuzhiyun
929*4882a593Smuzhiyun if (rctxt->rc_read_payload_length) {
930*4882a593Smuzhiyun offset = rctxt->rc_read_payload_offset;
931*4882a593Smuzhiyun length = rctxt->rc_read_payload_length;
932*4882a593Smuzhiyun } else {
933*4882a593Smuzhiyun offset = xdr->head[0].iov_len;
934*4882a593Smuzhiyun length = xdr->page_len;
935*4882a593Smuzhiyun }
936*4882a593Smuzhiyun ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
937*4882a593Smuzhiyun length);
938*4882a593Smuzhiyun if (ret < 0)
939*4882a593Smuzhiyun goto err2;
940*4882a593Smuzhiyun if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
941*4882a593Smuzhiyun goto err0;
942*4882a593Smuzhiyun } else {
943*4882a593Smuzhiyun if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
944*4882a593Smuzhiyun goto err0;
945*4882a593Smuzhiyun }
946*4882a593Smuzhiyun if (rp_ch) {
947*4882a593Smuzhiyun ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
948*4882a593Smuzhiyun if (ret < 0)
949*4882a593Smuzhiyun goto err2;
950*4882a593Smuzhiyun if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
951*4882a593Smuzhiyun goto err0;
952*4882a593Smuzhiyun } else {
953*4882a593Smuzhiyun if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
954*4882a593Smuzhiyun goto err0;
955*4882a593Smuzhiyun }
956*4882a593Smuzhiyun
957*4882a593Smuzhiyun ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
958*4882a593Smuzhiyun if (ret < 0)
959*4882a593Smuzhiyun goto err1;
960*4882a593Smuzhiyun return 0;
961*4882a593Smuzhiyun
962*4882a593Smuzhiyun err2:
963*4882a593Smuzhiyun if (ret != -E2BIG && ret != -EINVAL)
964*4882a593Smuzhiyun goto err1;
965*4882a593Smuzhiyun
966*4882a593Smuzhiyun /* Send completion releases payload pages that were part
967*4882a593Smuzhiyun * of previously posted RDMA Writes.
968*4882a593Smuzhiyun */
969*4882a593Smuzhiyun svc_rdma_save_io_pages(rqstp, sctxt);
970*4882a593Smuzhiyun svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
971*4882a593Smuzhiyun return 0;
972*4882a593Smuzhiyun
973*4882a593Smuzhiyun err1:
974*4882a593Smuzhiyun svc_rdma_send_ctxt_put(rdma, sctxt);
975*4882a593Smuzhiyun err0:
976*4882a593Smuzhiyun trace_svcrdma_send_err(rqstp, ret);
977*4882a593Smuzhiyun set_bit(XPT_CLOSE, &xprt->xpt_flags);
978*4882a593Smuzhiyun return -ENOTCONN;
979*4882a593Smuzhiyun }
980*4882a593Smuzhiyun
981*4882a593Smuzhiyun /**
982*4882a593Smuzhiyun * svc_rdma_read_payload - special processing for a READ payload
983*4882a593Smuzhiyun * @rqstp: svc_rqst to operate on
984*4882a593Smuzhiyun * @offset: payload's byte offset in @xdr
985*4882a593Smuzhiyun * @length: size of payload, in bytes
986*4882a593Smuzhiyun *
987*4882a593Smuzhiyun * Returns zero on success.
988*4882a593Smuzhiyun *
989*4882a593Smuzhiyun * For the moment, just record the xdr_buf location of the READ
990*4882a593Smuzhiyun * payload. svc_rdma_sendto will use that location later when
991*4882a593Smuzhiyun * we actually send the payload.
992*4882a593Smuzhiyun */
svc_rdma_read_payload(struct svc_rqst * rqstp,unsigned int offset,unsigned int length)993*4882a593Smuzhiyun int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
994*4882a593Smuzhiyun unsigned int length)
995*4882a593Smuzhiyun {
996*4882a593Smuzhiyun struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
997*4882a593Smuzhiyun
998*4882a593Smuzhiyun /* XXX: Just one READ payload slot for now, since our
999*4882a593Smuzhiyun * transport implementation currently supports only one
1000*4882a593Smuzhiyun * Write chunk.
1001*4882a593Smuzhiyun */
1002*4882a593Smuzhiyun rctxt->rc_read_payload_offset = offset;
1003*4882a593Smuzhiyun rctxt->rc_read_payload_length = length;
1004*4882a593Smuzhiyun
1005*4882a593Smuzhiyun return 0;
1006*4882a593Smuzhiyun }
1007