[pnfs] [PATCH] xs_tcp_read_request: unlock transport_lock only for RPC_REPLY
Benny Halevy
bhalevy at panasas.com
Wed Nov 28 10:57:13 EST 2007
On Nov. 28, 2007, 16:56 +0200, Trond Myklebust <trond.myklebust at fys.uio.no> wrote:
> On Wed, 2007-11-28 at 16:22 +0200, Benny Halevy wrote:
>> The lock is taken only in the (calldir == RPC_REPLY) path
>>
>> Signed-off-by: Benny Halevy <bhalevy at panasas.com>
>> ---
>> net/sunrpc/xprtsock.c | 6 ++++--
>> 1 files changed, 4 insertions(+), 2 deletions(-)
>>
>> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
>> index 883a134..3247521 100644
>> --- a/net/sunrpc/xprtsock.c
>> +++ b/net/sunrpc/xprtsock.c
>> @@ -1065,13 +1065,15 @@ out:
>> wake_up(&xprt->serv->sv_cb_waitq);
>> }
>> }
>> - spin_unlock(&xprt->transport_lock);
>> + if (calldir == RPC_REPLY)
>> + spin_unlock(&xprt->transport_lock);
>> xs_tcp_check_fraghdr(transport);
>> return;
>>
>> error:
>> transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
>> - spin_unlock(&xprt->transport_lock);
>> + if (calldir == RPC_REPLY)
>> + spin_unlock(&xprt->transport_lock);
>> return;
>> }
>
> This sort of conditional spinlock leaves my stomach quesy. I think it
> would be much better to split the routine up. Move the common code into
> one routine, then define xs_tcp_read_reply() (which takes the spinlock,
> calls xprt_lookup_rqst() and then calls the common code) to handle the
> RPC_REPLY case and xs_tcp_read_callback() (which allocates the request
> buffer, then calls the common routine) to handles the RPC_CALL case.
>
> Cheers
> Trond
>
Something like the following?
notes:
1. transport->tcp_flags &= ~TCP_RCV_COPY_DATA is done outside the transport_lock
in the error case. I think that's ok since it seems like the lock is protecting
the request handling in the xprt_lookup_rqst .. xprt_complete_rqst section
rather than transport which should be handled single threadedly.
(please correct me otherwise it's needed also for the callback request case)
2. previously the common code would have called xprt_complete_rqst()
in the r != len case. This seems like a bug to me as the request will be waken
up with a partially read reply and it could contain junk.
This implementation returns an error internally and drops the reply.
3. common tail handling is done in the main function based on
xs_tcp_read_{reply,callback}'s status.
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 883a134..c783ae2 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -956,43 +956,13 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, struct xdr_s
xs_tcp_check_fraghdr(transport);
}
-static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
+static inline int xs_tcp_read_common(struct rpc_xprt *xprt,
+ struct xdr_skb_reader *desc, struct rpc_rqst *req)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
- struct rpc_rqst *req;
struct xdr_buf *rcvbuf;
size_t len;
ssize_t r;
- u32 calldir;
-
- calldir = ntohl(transport->tcp_calldir);
- if (calldir == RPC_REPLY) {
- /* Find and lock the request corresponding to this xid */
- spin_lock(&xprt->transport_lock);
- req = xprt_lookup_rqst(xprt, transport->tcp_xid);
- if (!req) {
- dprintk("RPC: XID %08x request not found!\n",
- ntohl(transport->tcp_xid));
- goto error;
- }
- } else {
- /* RPC_CALL */
- if (xprt->bc_mempool) {
- req = xprt_alloc_bc_request(xprt);
- req->rq_xid = transport->tcp_xid;
- } else {
- dprintk("RPC: Unexpected callback dropped\n");
- goto error;
- }
- if (req == NULL) {
- /*
- * Drop the callback.
- * XXX Should we instead terminate the connection?
- */
- dprintk("RPC: Couldn't get rpc_rqst for the callback! Dropping callback...\n");
- goto error;
- }
- }
rcvbuf = &req->rq_private_buf;
len = desc->count;
@@ -1024,14 +994,13 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
* Any remaining data from this record will
* be discarded.
*/
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
dprintk("RPC: XID %08x truncated request\n",
ntohl(transport->tcp_xid));
dprintk("RPC: xprt = %p, tcp_copied = %lu, "
"tcp_offset = %u, tcp_reclen = %u\n",
xprt, transport->tcp_copied,
transport->tcp_offset, transport->tcp_reclen);
- goto out;
+ return -1;
}
dprintk("RPC: XID %08x read %Zd bytes\n",
@@ -1046,33 +1015,89 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
}
+ return 0;
+}
-out:
- if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
- if (calldir == RPC_REPLY) {
- xprt_complete_rqst(req->rq_task, transport->tcp_copied);
- } else {
- /*
- * Add callback request to callback list. The callback
- * service sleeps on the sv_cb_waitq waiting for new
- * requests. Wake it up after adding enqueing the
- * request.
- */
- dprintk("RPC: add callback request to list\n");
- spin_lock(&xprt->serv->sv_cb_lock);
- list_add(&req->rq_list, &xprt->serv->sv_cb_list);
- spin_unlock(&xprt->serv->sv_cb_lock);
- wake_up(&xprt->serv->sv_cb_waitq);
- }
+static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
+ struct xdr_skb_reader *desc)
+{
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ struct rpc_rqst *req;
+
+ /* Find and lock the request corresponding to this xid */
+ spin_lock(&xprt->transport_lock);
+ req = xprt_lookup_rqst(xprt, transport->tcp_xid);
+ if (!req) {
+ dprintk("RPC: XID %08x request not found!\n",
+ ntohl(transport->tcp_xid));
+ spin_unlock(&xprt->transport_lock);
+ return -1;
}
- spin_unlock(&xprt->transport_lock);
- xs_tcp_check_fraghdr(transport);
- return;
-error:
- transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
+ if (xs_tcp_read_common(xprt, desc, req))
+ return -1;
+
+ if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
+ xprt_complete_rqst(req->rq_task, transport->tcp_copied);
spin_unlock(&xprt->transport_lock);
- return;
+ return 0;
+}
+
+static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
+ struct xdr_skb_reader *desc)
+{
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ struct rpc_rqst *req;
+
+ if (xprt->bc_mempool) {
+ req = xprt_alloc_bc_request(xprt);
+ req->rq_xid = transport->tcp_xid;
+ } else {
+ dprintk("RPC: Unexpected callback dropped\n");
+ return -1;
+ }
+ if (req == NULL) {
+ /*
+ * Drop the callback.
+ * XXX Should we instead terminate the connection?
+ */
+ dprintk("RPC: Couldn't get rpc_rqst for the callback! Dropping callback...\n");
+ return -1;
+ }
+
+ if (xs_tcp_read_common(xprt, desc, req))
+ return -1;
+
+ if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
+ /*
+ * Add callback request to callback list. The callback
+ * service sleeps on the sv_cb_waitq waiting for new
+ * requests. Wake it up after adding enqueing the
+ * request.
+ */
+ dprintk("RPC: add callback request to list\n");
+ spin_lock(&xprt->serv->sv_cb_lock);
+ list_add(&req->rq_list, &xprt->serv->sv_cb_list);
+ spin_unlock(&xprt->serv->sv_cb_lock);
+ wake_up(&xprt->serv->sv_cb_waitq);
+ }
+ return 0;
+}
+
+static inline void xs_tcp_read_data(struct rpc_xprt *xprt,
+ struct xdr_skb_reader *desc)
+{
+ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+ int status;
+
+ status = (ntohl(transport->tcp_calldir) == RPC_REPLY) ?
+ xs_tcp_read_reply(xprt, desc) :
+ xs_tcp_read_callback(xprt, desc);
+
+ if (!status)
+ xs_tcp_check_fraghdr(transport);
+ else
+ transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
}
static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
@@ -1117,9 +1142,9 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
xs_tcp_read_calldir(transport, &desc);
continue;
}
- /* Read in the request data */
+ /* Read in the operation data */
if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
- xs_tcp_read_request(xprt, &desc);
+ xs_tcp_read_data(xprt, &desc);
continue;
}
/* Skip over any trailing bytes on short reads */
More information about the pNFS
mailing list