RPC: Support for failover to a new transport struct Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 11 ++++----- include/linux/sunrpc/sched.h | 2 - include/linux/sunrpc/xprt.h | 2 - net/sunrpc/clnt.c | 48 ++++++++++++++++++++++++++++++++++++------- net/sunrpc/pmap_clnt.c | 2 - net/sunrpc/rpc_pipe.c | 6 +++-- net/sunrpc/sched.c | 2 + net/sunrpc/xprt.c | 12 ++++++++++ 8 files changed, 67 insertions(+), 18 deletions(-) Index: linux-2.6.11-rc5/net/sunrpc/pmap_clnt.c =================================================================== --- linux-2.6.11-rc5.orig/net/sunrpc/pmap_clnt.c +++ linux-2.6.11-rc5/net/sunrpc/pmap_clnt.c @@ -39,7 +39,7 @@ void rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) { struct rpc_portmap *map = clnt->cl_pmap->pm_parent; - struct sockaddr_in *sap = &clnt->cl_xprt->addr; + struct sockaddr_in *sap = &task->tk_xprt->addr; struct rpc_message msg = { .rpc_proc = &pmap_procedures[PMAP_GETPORT], .rpc_argp = map, Index: linux-2.6.11-rc5/net/sunrpc/rpc_pipe.c =================================================================== --- linux-2.6.11-rc5.orig/net/sunrpc/rpc_pipe.c +++ linux-2.6.11-rc5/net/sunrpc/rpc_pipe.c @@ -291,14 +291,16 @@ static int rpc_show_info(struct seq_file *m, void *v) { struct rpc_clnt *clnt = m->private; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); seq_printf(m, "RPC server: %s\n", clnt->cl_server); seq_printf(m, "service: %s (%d) version %d\n", clnt->cl_protname, clnt->cl_prog, clnt->cl_vers); seq_printf(m, "address: %u.%u.%u.%u\n", - NIPQUAD(clnt->cl_xprt->addr.sin_addr.s_addr)); + NIPQUAD(xprt->addr.sin_addr.s_addr)); seq_printf(m, "protocol: %s\n", - clnt->cl_xprt->prot == IPPROTO_UDP ? "udp" : "tcp"); + xprt->prot == IPPROTO_UDP ? 
"udp" : "tcp"); + rpc_put_xprt(xprt); return 0; } Index: linux-2.6.11-rc5/include/linux/sunrpc/clnt.h =================================================================== --- linux-2.6.11-rc5.orig/include/linux/sunrpc/clnt.h +++ linux-2.6.11-rc5/include/linux/sunrpc/clnt.h @@ -9,12 +9,9 @@ #ifndef _LINUX_SUNRPC_CLNT_H #define _LINUX_SUNRPC_CLNT_H -#include -#include -#include #include +#include #include -#include #include #include @@ -67,7 +64,6 @@ struct rpc_clnt { struct rpc_portmap cl_pmap_default; char cl_inline_name[32]; }; -#define cl_timeout cl_xprt->timeout #define cl_prog cl_pmap->pm_prog #define cl_vers cl_pmap->pm_vers #define cl_port cl_pmap->pm_port @@ -104,7 +100,6 @@ struct rpc_procinfo { unsigned int p_timer; /* Which RTT timer to use */ }; -#define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt)) #define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr) #ifdef __KERNEL__ @@ -133,6 +128,10 @@ void rpc_clnt_sigunmask(struct rpc_clnt void rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int); size_t rpc_max_payload(struct rpc_clnt *); +struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt); +void rpc_client_set_xprt(struct rpc_clnt *, struct rpc_xprt *); +void rpc_put_xprt(struct rpc_xprt *xprt); + static __inline__ int rpc_call(struct rpc_clnt *clnt, u32 proc, void *argp, void *resp, int flags) { Index: linux-2.6.11-rc5/include/linux/sunrpc/xprt.h =================================================================== --- linux-2.6.11-rc5.orig/include/linux/sunrpc/xprt.h +++ linux-2.6.11-rc5/include/linux/sunrpc/xprt.h @@ -9,7 +9,6 @@ #ifndef _LINUX_SUNRPC_XPRT_H #define _LINUX_SUNRPC_XPRT_H -#include #include #include #include @@ -127,6 +126,7 @@ struct rpc_rqst { #define XPRT_COPY_DATA (1 << 3) struct rpc_xprt { + atomic_t count; /* Reference counter */ struct socket * sock; /* BSD socket layer */ struct sock * inet; /* INET layer */ Index: linux-2.6.11-rc5/net/sunrpc/sched.c 
=================================================================== --- linux-2.6.11-rc5.orig/net/sunrpc/sched.c +++ linux-2.6.11-rc5/net/sunrpc/sched.c @@ -768,6 +768,7 @@ void rpc_init_task(struct rpc_task *task task->tk_flags |= RPC_TASK_SOFT; if (!clnt->cl_intr) task->tk_flags |= RPC_TASK_NOINTR; + task->tk_xprt = rpc_client_get_xprt(clnt); } #ifdef RPC_DEBUG @@ -857,6 +858,7 @@ void rpc_release_task(struct rpc_task *t rpcauth_unbindcred(task); rpc_free(task); if (task->tk_client) { + rpc_put_xprt(task->tk_xprt); rpc_release_client(task->tk_client); task->tk_client = NULL; } Index: linux-2.6.11-rc5/include/linux/sunrpc/sched.h =================================================================== --- linux-2.6.11-rc5.orig/include/linux/sunrpc/sched.h +++ linux-2.6.11-rc5/include/linux/sunrpc/sched.h @@ -44,6 +44,7 @@ struct rpc_task { #endif struct list_head tk_task; /* global list of tasks */ struct rpc_clnt * tk_client; /* RPC client */ + struct rpc_xprt * tk_xprt; /* RPC transport */ struct rpc_rqst * tk_rqstp; /* RPC request */ int tk_status; /* result of last operation */ @@ -94,7 +95,6 @@ struct rpc_task { #endif }; #define tk_auth tk_client->cl_auth -#define tk_xprt tk_client->cl_xprt /* support walking a list of tasks on a wait queue */ #define task_for_each(task, pos, head) \ Index: linux-2.6.11-rc5/net/sunrpc/clnt.c =================================================================== --- linux-2.6.11-rc5.orig/net/sunrpc/clnt.c +++ linux-2.6.11-rc5/net/sunrpc/clnt.c @@ -274,7 +274,7 @@ rpc_destroy_client(struct rpc_clnt *clnt if (clnt->cl_pathname[0]) rpc_rmdir(clnt->cl_pathname); if (clnt->cl_xprt) { - xprt_destroy(clnt->cl_xprt); + rpc_put_xprt(clnt->cl_xprt); clnt->cl_xprt = NULL; } if (clnt->cl_server != clnt->cl_inline_name) @@ -400,6 +400,41 @@ out: return status; } +/** + * rpc_client_get_xprt() - Get reference to the RPC transport struct + * @clnt: pointer to RPC client + */ +struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt) +{ + struct 
rpc_xprt *xprt; + + /* Synchronize w.r.t. rpc_client_set_xprt() */ + rcu_read_lock(); + xprt = clnt->cl_xprt; + atomic_inc(&xprt->count); + rcu_read_unlock(); + return xprt; +} + +/** + * rpc_client_set_xprt() - Change the transport struct pointer on an in-use RPC client + * @clnt: pointer to RPC client + * @xprt: new transport + * + * This function should be called VERY infrequently, and is designed + * to be called only in case of a failover mount. + */ +void rpc_client_set_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) +{ + struct rpc_xprt *old; + + old = xchg(&clnt->cl_xprt, xprt); + /* Wait for all reads of clnt->cl_xprt == old to complete */ + synchronize_kernel(); + rpc_put_xprt(old); +} +EXPORT_SYMBOL(rpc_client_set_xprt); + /* * New rpc_call implementation */ @@ -463,7 +498,7 @@ rpc_call_setup(struct rpc_task *task, st void rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize) { - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); xprt->sndsize = 0; if (sndsize) @@ -473,6 +508,7 @@ rpc_setbufsize(struct rpc_clnt *clnt, un xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; if (xprt_connected(xprt)) xprt_sock_setbufsize(xprt); + rpc_put_xprt(xprt); } /* @@ -681,7 +717,7 @@ static void call_bind(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? 
"is" : "is not")); @@ -701,12 +737,10 @@ call_bind(struct rpc_task *task) static void call_connect(struct rpc_task *task) { - struct rpc_clnt *clnt = task->tk_client; - dprintk("RPC: %4d call_connect status %d\n", task->tk_pid, task->tk_status); - if (xprt_connected(clnt->cl_xprt)) { + if (xprt_connected(task->tk_xprt)) { task->tk_action = call_transmit; return; } @@ -967,7 +1001,7 @@ static u32 * call_header(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; u32 *p = req->rq_svec[0].iov_base; Index: linux-2.6.11-rc5/net/sunrpc/xprt.c =================================================================== --- linux-2.6.11-rc5.orig/net/sunrpc/xprt.c +++ linux-2.6.11-rc5/net/sunrpc/xprt.c @@ -1463,6 +1463,7 @@ xprt_setup(int proto, struct sockaddr_in if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return ERR_PTR(-ENOMEM); memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ + atomic_set(&xprt->count, 1); xprt->max_reqs = entries; slot_table_size = entries * sizeof(xprt->slot[0]); xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); @@ -1693,3 +1694,14 @@ xprt_destroy(struct rpc_xprt *xprt) return 0; } + +/** + * rpc_put_xprt() - Drop reference to the RPC transport struct + * @xprt - pointer to RPC transport + */ +void rpc_put_xprt(struct rpc_xprt *xprt) +{ + if (xprt != NULL && atomic_dec_and_test(&xprt->count)) + xprt_destroy(xprt); +} +