RPC: Support for failover to a new transport struct Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 11 ++++----- include/linux/sunrpc/sched.h | 2 - include/linux/sunrpc/xprt.h | 2 - net/sunrpc/clnt.c | 50 ++++++++++++++++++++++++++++++++++++------- net/sunrpc/pmap_clnt.c | 2 - net/sunrpc/rpc_pipe.c | 6 +++-- net/sunrpc/sched.c | 2 + net/sunrpc/xprt.c | 12 ++++++++++ 8 files changed, 68 insertions(+), 19 deletions(-) Index: linux-2.6.12-rc1/net/sunrpc/pmap_clnt.c =================================================================== --- linux-2.6.12-rc1.orig/net/sunrpc/pmap_clnt.c +++ linux-2.6.12-rc1/net/sunrpc/pmap_clnt.c @@ -39,7 +39,7 @@ void rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) { struct rpc_portmap *map = clnt->cl_pmap; - struct sockaddr_in *sap = &clnt->cl_xprt->addr; + struct sockaddr_in *sap = &task->tk_xprt->addr; struct rpc_message msg = { .rpc_proc = &pmap_procedures[PMAP_GETPORT], .rpc_argp = map, Index: linux-2.6.12-rc1/net/sunrpc/rpc_pipe.c =================================================================== --- linux-2.6.12-rc1.orig/net/sunrpc/rpc_pipe.c +++ linux-2.6.12-rc1/net/sunrpc/rpc_pipe.c @@ -291,14 +291,16 @@ static int rpc_show_info(struct seq_file *m, void *v) { struct rpc_clnt *clnt = m->private; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); seq_printf(m, "RPC server: %s\n", clnt->cl_server); seq_printf(m, "service: %s (%d) version %d\n", clnt->cl_protname, clnt->cl_prog, clnt->cl_vers); seq_printf(m, "address: %u.%u.%u.%u\n", - NIPQUAD(clnt->cl_xprt->addr.sin_addr.s_addr)); + NIPQUAD(xprt->addr.sin_addr.s_addr)); seq_printf(m, "protocol: %s\n", - clnt->cl_xprt->prot == IPPROTO_UDP ? "udp" : "tcp"); + xprt->prot == IPPROTO_UDP ? 
"udp" : "tcp"); + rpc_put_xprt(xprt); return 0; } Index: linux-2.6.12-rc1/include/linux/sunrpc/clnt.h =================================================================== --- linux-2.6.12-rc1.orig/include/linux/sunrpc/clnt.h +++ linux-2.6.12-rc1/include/linux/sunrpc/clnt.h @@ -9,12 +9,9 @@ #ifndef _LINUX_SUNRPC_CLNT_H #define _LINUX_SUNRPC_CLNT_H -#include -#include -#include #include +#include #include -#include #include #include @@ -66,7 +63,6 @@ struct rpc_clnt { struct rpc_portmap cl_pmap_default; char cl_inline_name[32]; }; -#define cl_timeout cl_xprt->timeout #define cl_prog cl_pmap->pm_prog #define cl_vers cl_pmap->pm_vers #define cl_port cl_pmap->pm_port @@ -103,7 +99,6 @@ struct rpc_procinfo { unsigned int p_timer; /* Which RTT timer to use */ }; -#define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt)) #define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr) #ifdef __KERNEL__ @@ -136,6 +131,10 @@ void rpc_setbufsize(struct rpc_clnt *, size_t rpc_max_payload(struct rpc_clnt *); int rpc_ping(struct rpc_clnt *clnt, int flags); +struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt); +void rpc_client_set_xprt(struct rpc_clnt *, struct rpc_xprt *); +void rpc_put_xprt(struct rpc_xprt *xprt); + static __inline__ int rpc_call(struct rpc_clnt *clnt, u32 proc, void *argp, void *resp, int flags) { Index: linux-2.6.12-rc1/include/linux/sunrpc/xprt.h =================================================================== --- linux-2.6.12-rc1.orig/include/linux/sunrpc/xprt.h +++ linux-2.6.12-rc1/include/linux/sunrpc/xprt.h @@ -9,7 +9,6 @@ #ifndef _LINUX_SUNRPC_XPRT_H #define _LINUX_SUNRPC_XPRT_H -#include #include #include #include @@ -127,6 +126,7 @@ struct rpc_rqst { #define XPRT_COPY_DATA (1 << 3) struct rpc_xprt { + atomic_t count; /* Reference counter */ struct socket * sock; /* BSD socket layer */ struct sock * inet; /* INET layer */ Index: linux-2.6.12-rc1/net/sunrpc/sched.c =================================================================== --- 
linux-2.6.12-rc1.orig/net/sunrpc/sched.c +++ linux-2.6.12-rc1/net/sunrpc/sched.c @@ -773,6 +773,7 @@ void rpc_init_task(struct rpc_task *task task->tk_flags |= RPC_TASK_SOFT; if (!clnt->cl_intr) task->tk_flags |= RPC_TASK_NOINTR; + task->tk_xprt = rpc_client_get_xprt(clnt); } #ifdef RPC_DEBUG @@ -862,6 +863,7 @@ void rpc_release_task(struct rpc_task *t rpcauth_unbindcred(task); rpc_free(task); if (task->tk_client) { + rpc_put_xprt(task->tk_xprt); rpc_release_client(task->tk_client); task->tk_client = NULL; } Index: linux-2.6.12-rc1/include/linux/sunrpc/sched.h =================================================================== --- linux-2.6.12-rc1.orig/include/linux/sunrpc/sched.h +++ linux-2.6.12-rc1/include/linux/sunrpc/sched.h @@ -43,6 +43,7 @@ struct rpc_task { #endif struct list_head tk_task; /* global list of tasks */ struct rpc_clnt * tk_client; /* RPC client */ + struct rpc_xprt * tk_xprt; /* RPC transport (reference held by the task) */ struct rpc_rqst * tk_rqstp; /* RPC request */ int tk_status; /* result of last operation */ @@ -93,7 +94,6 @@ struct rpc_task { #endif }; #define tk_auth tk_client->cl_auth -#define tk_xprt tk_client->cl_xprt /* support walking a list of tasks on a wait queue */ #define task_for_each(task, pos, head) \ Index: linux-2.6.12-rc1/net/sunrpc/clnt.c =================================================================== --- linux-2.6.12-rc1.orig/net/sunrpc/clnt.c +++ linux-2.6.12-rc1/net/sunrpc/clnt.c @@ -180,7 +180,7 @@ out_no_path: kfree(clnt->cl_server); kfree(clnt); out_err: - xprt_destroy(xprt); + rpc_put_xprt(xprt); return ERR_PTR(err); } @@ -305,7 +305,7 @@ rpc_destroy_client(struct rpc_clnt *clnt if (clnt->cl_pathname[0]) rpc_rmdir(clnt->cl_pathname); if (clnt->cl_xprt) { - xprt_destroy(clnt->cl_xprt); + rpc_put_xprt(clnt->cl_xprt); clnt->cl_xprt = NULL; } if (clnt->cl_server != clnt->cl_inline_name) @@ -450,6 +450,41 @@ out: return status; } +/** + * rpc_client_get_xprt() - Get a counted reference to the client's current RPC transport struct + * @clnt: pointer to RPC client + */ 
+struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + + /* Synchronize w.r.t. rpc_client_set_xprt(); NOTE(review): cl_xprt is loaded without rcu_dereference() - confirm */ + rcu_read_lock(); + xprt = clnt->cl_xprt; + atomic_inc(&xprt->count); + rcu_read_unlock(); + return xprt; +} + +/** + * rpc_client_set_xprt() - Change the transport struct pointer on an in-use RPC client + * @clnt: pointer to RPC client + * @xprt: new transport (one reference on the old transport is dropped) + * + * This function should be called VERY infrequently, and is designed + * to be called only in case of a failover mount. + */ +void rpc_client_set_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) +{ + struct rpc_xprt *old; + + old = xchg(&clnt->cl_xprt, xprt); + /* Wait for all reads of clnt->cl_xprt == old to complete */ + synchronize_kernel(); + rpc_put_xprt(old); +} +EXPORT_SYMBOL(rpc_client_set_xprt); + /* * New rpc_call implementation */ @@ -513,7 +548,7 @@ rpc_call_setup(struct rpc_task *task, st void rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize) { - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); xprt->sndsize = 0; if (sndsize) @@ -523,6 +558,7 @@ rpc_setbufsize(struct rpc_clnt *clnt, un xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; if (xprt_connected(xprt)) xprt_sock_setbufsize(xprt); + rpc_put_xprt(xprt); } /* @@ -731,7 +767,7 @@ static void call_bind(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? 
"is" : "is not")); @@ -751,12 +787,10 @@ call_bind(struct rpc_task *task) static void call_connect(struct rpc_task *task) { - struct rpc_clnt *clnt = task->tk_client; - dprintk("RPC: %4d call_connect status %d\n", task->tk_pid, task->tk_status); - if (xprt_connected(clnt->cl_xprt)) { + if (xprt_connected(task->tk_xprt)) { task->tk_action = call_transmit; return; } @@ -1017,7 +1051,7 @@ static u32 * call_header(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; u32 *p = req->rq_svec[0].iov_base; Index: linux-2.6.12-rc1/net/sunrpc/xprt.c =================================================================== --- linux-2.6.12-rc1.orig/net/sunrpc/xprt.c +++ linux-2.6.12-rc1/net/sunrpc/xprt.c @@ -1464,6 +1464,7 @@ xprt_setup(int proto, struct sockaddr_in if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return ERR_PTR(-ENOMEM); memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ + atomic_set(&xprt->count, 1); xprt->max_reqs = entries; slot_table_size = entries * sizeof(xprt->slot[0]); xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); @@ -1694,3 +1695,14 @@ xprt_destroy(struct rpc_xprt *xprt) return 0; } + +/** + * rpc_put_xprt() - Drop a reference to the RPC transport struct + * @xprt: pointer to RPC transport, may be NULL; destroyed via xprt_destroy() when the count reaches zero + */ +void rpc_put_xprt(struct rpc_xprt *xprt) +{ + if (xprt != NULL && atomic_dec_and_test(&xprt->count)) + xprt_destroy(xprt); +} +