[pnfs] [PATCH 09/10] nfs41: New xs_tcp_read_data()

Ricardo Labiaga ricardo.labiaga at netapp.com
Wed Jan 2 20:11:27 EST 2008


On Wed, 2008-01-02 at 19:28 -0500, J. Bruce Fields wrote:
> On Wed, Jan 02, 2008 at 04:01:50PM -0800, Ricardo Labiaga wrote:
> > Handles RPC replies and backchannel callbacks.  Traditionally the NFS
> > client has only expected RPC replies on its open connections.  With
> 
> Replacing "only expected" by "expected only" might make that a little
> clearer.
> 

Sounds good, will do.

- ricardo


> --b.
> 
> > NFSv4.1, callbacks can arrive over an existing open connection.
> > 
> > This patch refactors the old xs_tcp_read_request() into an RPC reply handler:
> > xs_tcp_read_reply(), a new backchannel callback handler: xs_tcp_read_callback(),
> > and a common routine to read the data off the transport: xs_tcp_read_common().
> > The new xs_tcp_read_callback() queues callback requests onto a queue where
> > the callback service (a separate thread) is listening for the processing.
> > 
> > This patch incorporates work and suggestions from Rahul Iyer (iyer at netapp.com)
> > and Benny Halevy (bhalevy at panasas.com).
> > 
> > Signed-off-by: Ricardo Labiaga <ricardo.labiaga at netapp.com>
> > ---
> >  net/sunrpc/xprtsock.c |  130 ++++++++++++++++++++++++++++++++++++++++++-------
> >  1 files changed, 112 insertions(+), 18 deletions(-)
> > 
> > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> > index 703b9ed..9b35536 100644
> > --- a/net/sunrpc/xprtsock.c
> > +++ b/net/sunrpc/xprtsock.c
> > @@ -35,6 +35,9 @@
> >  #include <linux/sunrpc/svcsock.h>
> >  #include <linux/sunrpc/xprtsock.h>
> >  #include <linux/file.h>
> > +#ifdef CONFIG_NFS_V4_1
> > +#include <linux/sunrpc/bc_xprt.h>
> > +#endif
> >  
> >  #include <net/sock.h>
> >  #include <net/checksum.h>
> > @@ -953,25 +956,16 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, struct xdr_s
> >  	xs_tcp_check_fraghdr(transport);
> >  }
> >  
> > -static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
> > +static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
> > +				     struct xdr_skb_reader *desc,
> > +				     struct rpc_rqst *req)
> >  {
> > -	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
> > -	struct rpc_rqst *req;
> > +	struct sock_xprt *transport =
> > +				container_of(xprt, struct sock_xprt, xprt);
> >  	struct xdr_buf *rcvbuf;
> >  	size_t len;
> >  	ssize_t r;
> >  
> > -	/* Find and lock the request corresponding to this xid */
> > -	spin_lock(&xprt->transport_lock);
> > -	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
> > -	if (!req) {
> > -		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
> > -		dprintk("RPC:       XID %08x request not found!\n",
> > -				ntohl(transport->tcp_xid));
> > -		spin_unlock(&xprt->transport_lock);
> > -		return;
> > -	}
> > -
> >  	rcvbuf = &req->rq_private_buf;
> >  	len = desc->count;
> >  	if (len > transport->tcp_reclen - transport->tcp_offset) {
> > @@ -1009,7 +1003,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
> >  				"tcp_offset = %u, tcp_reclen = %u\n",
> >  				xprt, transport->tcp_copied,
> >  				transport->tcp_offset, transport->tcp_reclen);
> > -		goto out;
> > +		return;
> >  	}
> >  
> >  	dprintk("RPC:       XID %08x read %Zd bytes\n",
> > @@ -1025,11 +1019,111 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
> >  			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
> >  	}
> >  
> > -out:
> > +	return;
> > +}
> > +
> > +/*
> > + * Finds the request corresponding to the RPC xid and invokes the common
> > + * tcp read code to read the data.
> > + */
> > +static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
> > +				    struct xdr_skb_reader *desc)
> > +{
> > +	struct sock_xprt *transport =
> > +				container_of(xprt, struct sock_xprt, xprt);
> > +	struct rpc_rqst *req;
> > +
> > +	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
> > +
> > +	/* Find and lock the request corresponding to this xid */
> > +	spin_lock(&xprt->transport_lock);
> > +	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
> > +	if (!req) {
> > +		dprintk("RPC:       XID %08x request not found!\n",
> > +				ntohl(transport->tcp_xid));
> > +		spin_unlock(&xprt->transport_lock);
> > +		return -1;
> > +	}
> > +
> > +	xs_tcp_read_common(xprt, desc, req);
> > +
> >  	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
> >  		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
> > +
> >  	spin_unlock(&xprt->transport_lock);
> > -	xs_tcp_check_fraghdr(transport);
> > +	return 0;
> > +}
> > +
> > +/*
> > + * Allocates an rpc_rqst from the bc_mempool and invokes the common
> > + * tcp read code to read the data.  The result is placed in the callback
> > + * queue.
> > + */
> > +static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
> > +				       struct xdr_skb_reader *desc)
> > +{
> > +	struct sock_xprt *transport =
> > +				container_of(xprt, struct sock_xprt, xprt);
> > +	struct rpc_rqst *req;
> > +
> > +	req = xprt_alloc_bc_request(xprt);
> > +	req->rq_xid = transport->tcp_xid;
> > +	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
> > +	if (req == NULL) {
> > +		/*
> > +		 * Drop the callback.
> > +		 * The caller may choose to retransmit by first dropping the
> > +		 * connection if using NFSv4.1.
> > +		 */
> > +		dprintk("RPC:       Couldn't get rpc_rqst for the callback! Dropping callback...\n");
> > +		return -1;
> > +	}
> > +
> > +	xs_tcp_read_common(xprt, desc, req);
> > +
> > +	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
> > +		/*
> > +		 * Add callback request to callback list.  The callback
> > +		 * service sleeps on the sv_cb_waitq waiting for new
> > +		 * requests.  Wake it up after adding enqueing the
> > +		 * request.
> > +		 */
> > +		dprintk("RPC:       add callback request to list\n");
> > +		spin_lock(&xprt->serv->sv_cb_lock);
> > +		list_add(&req->rq_list, &xprt->serv->sv_cb_list);
> > +		spin_unlock(&xprt->serv->sv_cb_lock);
> > +		wake_up(&xprt->serv->sv_cb_waitq);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> > +/*
> > + * Read data off the transport.  This can be either an RPC_CALL or an
> > + * RPC_REPLY.  Relay the processing to helper functions.
> > + */
> > +static void xs_tcp_read_data(struct rpc_xprt *xprt,
> > +				    struct xdr_skb_reader *desc)
> > +{
> > +	struct sock_xprt *transport =
> > +				container_of(xprt, struct sock_xprt, xprt);
> > +	int status;
> > +
> > +	status = (ntohl(transport->tcp_calldir) == RPC_REPLY) ?
> > +		xs_tcp_read_reply(xprt, desc) :
> > +		xs_tcp_read_callback(xprt, desc);
> > +
> > +	if (status == 0)
> > +		xs_tcp_check_fraghdr(transport);
> > +	else {
> > +		/*
> > +		 * The transport_lock protects the request handling.
> > +		 * There's no need to hold it to update the tcp_flags.
> > +		 */
> > +		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
> > +	}
> > +
> > +	return;
> >  }
> >  
> >  static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
> > @@ -1076,7 +1170,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
> >  		}
> >  		/* Read in the request data */
> >  		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
> > -			xs_tcp_read_request(xprt, &desc);
> > +			xs_tcp_read_data(xprt, &desc);
> >  			continue;
> >  		}
> >  		/* Skip over any trailing bytes on short reads */
> > -- 
> > 1.5.3.3
> > _______________________________________________
> > pNFS mailing list
> > pNFS at linux-nfs.org
> > http://linux-nfs.org/cgi-bin/mailman/listinfo/pnfs


More information about the pNFS mailing list