
Commit 5d252f9

chucklever authored and dledford committed

svcrdma: Add class for RDMA backwards direction transport

To support the server-side of an NFSv4.1 backchannel on RDMA
connections, add a transport class that enables backward direction
messages on an existing forward channel connection.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Acked-by: Bruce Fields <bfields@fieldses.org>
Signed-off-by: Doug Ledford <dledford@redhat.com>

1 parent 03fe993 commit 5d252f9
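For orientation: the transport class defined at the bottom of this commit
(struct xprt_class xprt_rdma_bc) only becomes reachable once it is
registered with the RPC client, and the registration site is not among
the hunks shown on this page. A minimal sketch, assuming the usual
xprt_register_transport()/xprt_unregister_transport() pattern; the
init/exit function names here are illustrative, not from this commit:

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>

extern struct xprt_class xprt_rdma_bc;	/* defined in svc_rdma_backchannel.c */

/* Illustrative registration sketch; this commit does not show where
 * the class is actually registered.
 */
static int __init bc_rdma_sketch_init(void)
{
	/* Make XPRT_TRANSPORT_BC_RDMA resolvable by xprt_create_transport() */
	return xprt_register_transport(&xprt_rdma_bc);
}

static void __exit bc_rdma_sketch_exit(void)
{
	xprt_unregister_transport(&xprt_rdma_bc);
}

module_init(bc_rdma_sketch_init);
module_exit(bc_rdma_sketch_exit);
MODULE_LICENSE("GPL");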

File tree

8 files changed: +475 additions, −15 deletions

include/linux/sunrpc/svc_rdma.h

Lines changed: 5 additions & 0 deletions

@@ -195,6 +195,11 @@ struct svcxprt_rdma {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+/* svc_rdma_backchannel.c */
+extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
+				    struct rpcrdma_msg *rmsgp,
+				    struct xdr_buf *rcvbuf);
+
 /* svc_rdma_marshal.c */
 extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
 extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,

net/sunrpc/xprt.c

Lines changed: 1 addition & 0 deletions

@@ -1425,3 +1425,4 @@ void xprt_put(struct rpc_xprt *xprt)
 	if (atomic_dec_and_test(&xprt->count))
 		xprt_destroy(xprt);
 }
+EXPORT_SYMBOL_GPL(xprt_put);
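(xprt_put() was not previously exported; the export lets module code in
rpcrdma drop rpc_xprt references, as the backchannel setup code below
does in its error path, with the final put taken in __svc_rdma_free.)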

net/sunrpc/xprtrdma/Makefile

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
 
 rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	fmr_ops.o frwr_ops.o physical_ops.o \
-	svc_rdma.o svc_rdma_transport.o \
+	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
net/sunrpc/xprtrdma/svc_rdma_backchannel.c

Lines changed: 371 additions & 0 deletions

@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2015 Oracle.  All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA (server-side).
+ */
+
+#include <linux/sunrpc/svc_rdma.h>
+#include "xprt_rdma.h"
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+#undef SVCRDMA_BACKCHANNEL_DEBUG
+
+int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+			     struct xdr_buf *rcvbuf)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct kvec *dst, *src = &rcvbuf->head[0];
+	struct rpc_rqst *req;
+	unsigned long cwnd;
+	u32 credits;
+	size_t len;
+	__be32 xid;
+	__be32 *p;
+	int ret;
+
+	p = (__be32 *)src->iov_base;
+	len = src->iov_len;
+	xid = rmsgp->rm_xid;
+
+#ifdef SVCRDMA_BACKCHANNEL_DEBUG
+	pr_info("%s: xid=%08x, length=%zu\n",
+		__func__, be32_to_cpu(xid), len);
+	pr_info("%s: RPC/RDMA: %*ph\n",
+		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+	pr_info("%s: RPC: %*ph\n",
+		__func__, (int)len, p);
+#endif
+
+	ret = -EAGAIN;
+	if (src->iov_len < 24)
+		goto out_shortreply;
+
+	spin_lock_bh(&xprt->transport_lock);
+	req = xprt_lookup_rqst(xprt, xid);
+	if (!req)
+		goto out_notfound;
+
+	dst = &req->rq_private_buf.head[0];
+	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
+	if (dst->iov_len < len)
+		goto out_unlock;
+	memcpy(dst->iov_base, p, len);
+
+	credits = be32_to_cpu(rmsgp->rm_credit);
+	if (credits == 0)
+		credits = 1;	/* don't deadlock */
+	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
+		credits = r_xprt->rx_buf.rb_bc_max_requests;
+
+	cwnd = xprt->cwnd;
+	xprt->cwnd = credits << RPC_CWNDSHIFT;
+	if (xprt->cwnd > cwnd)
+		xprt_release_rqst_cong(req->rq_task);
+
+	ret = 0;
+	xprt_complete_rqst(req->rq_task, rcvbuf->len);
+	rcvbuf->len = 0;
+
+out_unlock:
+	spin_unlock_bh(&xprt->transport_lock);
+out:
+	return ret;
+
+out_shortreply:
+	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
+		xprt, src->iov_len);
+	goto out;
+
+out_notfound:
+	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
+		xprt, be32_to_cpu(xid));
+
+	goto out_unlock;
+}
+
+/* Send a backwards direction RPC call.
+ *
+ * Caller holds the connection's mutex and has already marshaled
+ * the RPC/RDMA request.
+ *
+ * This is similar to svc_rdma_reply, but takes an rpc_rqst
+ * instead, does not support chunks, and avoids blocking memory
+ * allocation.
+ *
+ * XXX: There is still an opportunity to block in svc_rdma_send()
+ * if there are no SQ entries to post the Send. This may occur if
+ * the adapter has a small maximum SQ depth.
+ */
+static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
+			      struct rpc_rqst *rqst)
+{
+	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
+	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_req_map *vec;
+	struct ib_send_wr send_wr;
+	int ret;
+
+	vec = svc_rdma_get_req_map(rdma);
+	ret = svc_rdma_map_xdr(rdma, sndbuf, vec);
+	if (ret)
+		goto out_err;
+
+	/* Post a recv buffer to handle the reply for this request. */
+	ret = svc_rdma_post_recv(rdma, GFP_NOIO);
+	if (ret) {
+		pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
+		       ret);
+		pr_err("svcrdma: closing transport %p.\n", rdma);
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		ret = -ENOTCONN;
+		goto out_err;
+	}
+
+	ctxt = svc_rdma_get_context(rdma);
+	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
+	ctxt->count = 1;
+
+	ctxt->wr_op = IB_WR_SEND;
+	ctxt->direction = DMA_TO_DEVICE;
+	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
+	ctxt->sge[0].length = sndbuf->len;
+	ctxt->sge[0].addr =
+	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
+			    sndbuf->len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
+		ret = -EIO;
+		goto out_unmap;
+	}
+	atomic_inc(&rdma->sc_dma_used);
+
+	memset(&send_wr, 0, sizeof(send_wr));
+	send_wr.wr_id = (unsigned long)ctxt;
+	send_wr.sg_list = ctxt->sge;
+	send_wr.num_sge = 1;
+	send_wr.opcode = IB_WR_SEND;
+	send_wr.send_flags = IB_SEND_SIGNALED;
+
+	ret = svc_rdma_send(rdma, &send_wr);
+	if (ret) {
+		ret = -EIO;
+		goto out_unmap;
+	}
+
+out_err:
+	svc_rdma_put_req_map(rdma, vec);
+	dprintk("svcrdma: %s returns %d\n", __func__, ret);
+	return ret;
+
+out_unmap:
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
+	goto out_err;
+}
+
+/* Server-side transport endpoint wants a whole page for its send
+ * buffer. The client RPC code constructs the RPC header in this
+ * buffer before it invokes ->send_request.
+ *
+ * Returns NULL if there was a temporary allocation failure.
+ */
+static void *
+xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	struct svcxprt_rdma *rdma;
+	struct page *page;
+
+	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+
+	/* Prevent an infinite loop: try to make this case work */
+	if (size > PAGE_SIZE)
+		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
+			  size);
+
+	page = alloc_page(RPCRDMA_DEF_GFP);
+	if (!page)
+		return NULL;
+
+	return page_address(page);
+}
+
+static void
+xprt_rdma_bc_free(void *buffer)
+{
+	/* No-op: ctxt and page have already been freed. */
+}
+
+static int
+rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
+	int rc;
+
+	/* Space in the send buffer for an RPC/RDMA header is reserved
+	 * via xprt->tsh_size.
+	 */
+	headerp->rm_xid = rqst->rq_xid;
+	headerp->rm_vers = rpcrdma_version;
+	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
+	headerp->rm_type = rdma_msg;
+	headerp->rm_body.rm_chunks[0] = xdr_zero;
+	headerp->rm_body.rm_chunks[1] = xdr_zero;
+	headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+#ifdef SVCRDMA_BACKCHANNEL_DEBUG
+	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
+#endif
+
+	rc = svc_rdma_bc_sendto(rdma, rqst);
+	if (rc)
+		goto drop_connection;
+	return rc;
+
+drop_connection:
+	dprintk("svcrdma: failed to send bc call\n");
+	xprt_disconnect_done(xprt);
+	return -ENOTCONN;
+}
+
+/* Send an RPC call on the passive end of a transport
+ * connection.
+ */
+static int
+xprt_rdma_bc_send_request(struct rpc_task *task)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	struct svcxprt_rdma *rdma;
+	int ret;
+
+	dprintk("svcrdma: sending bc call with xid: %08x\n",
+		be32_to_cpu(rqst->rq_xid));
+
+	if (!mutex_trylock(&sxprt->xpt_mutex)) {
+		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
+		if (!mutex_trylock(&sxprt->xpt_mutex))
+			return -EAGAIN;
+		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
+	}
+
+	ret = -ENOTCONN;
+	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+		ret = rpcrdma_bc_send_request(rdma, rqst);
+
+	mutex_unlock(&sxprt->xpt_mutex);
+
+	if (ret < 0)
+		return ret;
+	return 0;
+}
+
+static void
+xprt_rdma_bc_close(struct rpc_xprt *xprt)
+{
+	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
+}
+
+static void
+xprt_rdma_bc_put(struct rpc_xprt *xprt)
+{
+	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
+
+	xprt_free(xprt);
+	module_put(THIS_MODULE);
+}
+
+static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+	.reserve_xprt		= xprt_reserve_xprt_cong,
+	.release_xprt		= xprt_release_xprt_cong,
+	.alloc_slot		= xprt_alloc_slot,
+	.release_request	= xprt_release_rqst_cong,
+	.buf_alloc		= xprt_rdma_bc_allocate,
+	.buf_free		= xprt_rdma_bc_free,
+	.send_request		= xprt_rdma_bc_send_request,
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.close			= xprt_rdma_bc_close,
+	.destroy		= xprt_rdma_bc_put,
+	.print_stats		= xprt_rdma_print_stats
+};
+
+static const struct rpc_timeout xprt_rdma_bc_timeout = {
+	.to_initval = 60 * HZ,
+	.to_maxval = 60 * HZ,
+};
+
+/* It shouldn't matter if the number of backchannel session slots
+ * doesn't match the number of RPC/RDMA credits. That just means
+ * one or the other will have extra slots that aren't used.
+ */
+static struct rpc_xprt *
+xprt_setup_rdma_bc(struct xprt_create *args)
+{
+	struct rpc_xprt *xprt;
+	struct rpcrdma_xprt *new_xprt;
+
+	if (args->addrlen > sizeof(xprt->addr)) {
+		dprintk("RPC: %s: address too large\n", __func__);
+		return ERR_PTR(-EBADF);
+	}
+
+	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
+			  RPCRDMA_MAX_BC_REQUESTS,
+			  RPCRDMA_MAX_BC_REQUESTS);
+	if (!xprt) {
+		dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
+			__func__);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	xprt->timeout = &xprt_rdma_bc_timeout;
+	xprt_set_bound(xprt);
+	xprt_set_connected(xprt);
+	xprt->bind_timeout = RPCRDMA_BIND_TO;
+	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
+	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+	xprt->ops = &xprt_rdma_bc_procs;
+
+	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
+	xprt->addrlen = args->addrlen;
+	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
+	xprt->resvport = 0;
+
+	xprt->max_payload = xprt_rdma_max_inline_read;
+
+	new_xprt = rpcx_to_rdmax(xprt);
+	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
+
+	xprt_get(xprt);
+	args->bc_xprt->xpt_bc_xprt = xprt;
+	xprt->bc_xprt = args->bc_xprt;
+
+	if (!try_module_get(THIS_MODULE))
+		goto out_fail;
+
+	/* Final put for backchannel xprt is in __svc_rdma_free */
+	xprt_get(xprt);
+	return xprt;
+
+out_fail:
+	xprt_rdma_free_addresses(xprt);
+	args->bc_xprt->xpt_bc_xprt = NULL;
+	xprt_put(xprt);
+	xprt_free(xprt);
+	return ERR_PTR(-EINVAL);
+}
+
+struct xprt_class xprt_rdma_bc = {
+	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
+	.name			= "rdma backchannel",
+	.owner			= THIS_MODULE,
+	.ident			= XPRT_TRANSPORT_BC_RDMA,
+	.setup			= xprt_setup_rdma_bc,
+};
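The commit adds svc_rdma_handle_bc_reply() but none of the hunks shown
here contain its caller in the receive path. A hedged sketch of the
expected call shape; the wrapper name and dispatch policy are purely
illustrative, and only fields already used in this diff
(sc_xprt, xpt_bc_xprt) are assumed:

/* Illustrative wrapper, not from this commit: a server receive path
 * that has already decoded the RPC/RDMA header can try the backchannel
 * first. svc_rdma_handle_bc_reply() returns 0 when the xid matched an
 * outstanding backchannel call, and -EAGAIN for short or unrecognized
 * replies, in which case the message belongs to normal forward
 * processing.
 */
static int try_bc_reply(struct svcxprt_rdma *rdma,
			struct rpcrdma_msg *rmsgp,
			struct xdr_buf *rcvbuf)
{
	struct rpc_xprt *bc_xprt = rdma->sc_xprt.xpt_bc_xprt;

	if (!bc_xprt)
		return -EAGAIN;	/* no backchannel on this connection */
	return svc_rdma_handle_bc_reply(bc_xprt, rmsgp, rcvbuf);
}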
