RPC: Support for failover to a new transport struct Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 11 +++---- include/linux/sunrpc/sched.h | 2 - include/linux/sunrpc/xprt.h | 2 - net/sunrpc/clnt.c | 65 ++++++++++++++++++++++++++++++++++--------- net/sunrpc/pmap_clnt.c | 2 - net/sunrpc/rpc_pipe.c | 6 ++- net/sunrpc/sched.c | 2 + net/sunrpc/xprt.c | 12 +++++++ 8 files changed, 78 insertions(+), 24 deletions(-) Index: linux-2.6.13-rc1/net/sunrpc/pmap_clnt.c =================================================================== --- linux-2.6.13-rc1.orig/net/sunrpc/pmap_clnt.c +++ linux-2.6.13-rc1/net/sunrpc/pmap_clnt.c @@ -39,7 +39,7 @@ void rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) { struct rpc_portmap *map = clnt->cl_pmap; - struct sockaddr_in *sap = &clnt->cl_xprt->addr; + struct sockaddr_in *sap = &task->tk_xprt->addr; struct rpc_message msg = { .rpc_proc = &pmap_procedures[PMAP_GETPORT], .rpc_argp = map, Index: linux-2.6.13-rc1/net/sunrpc/rpc_pipe.c =================================================================== --- linux-2.6.13-rc1.orig/net/sunrpc/rpc_pipe.c +++ linux-2.6.13-rc1/net/sunrpc/rpc_pipe.c @@ -291,14 +291,16 @@ static int rpc_show_info(struct seq_file *m, void *v) { struct rpc_clnt *clnt = m->private; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); seq_printf(m, "RPC server: %s\n", clnt->cl_server); seq_printf(m, "service: %s (%d) version %d\n", clnt->cl_protname, clnt->cl_prog, clnt->cl_vers); seq_printf(m, "address: %u.%u.%u.%u\n", - NIPQUAD(clnt->cl_xprt->addr.sin_addr.s_addr)); + NIPQUAD(xprt->addr.sin_addr.s_addr)); seq_printf(m, "protocol: %s\n", - clnt->cl_xprt->prot == IPPROTO_UDP ? "udp" : "tcp"); + xprt->prot == IPPROTO_UDP ? 
"udp" : "tcp"); + rpc_put_xprt(xprt); return 0; } Index: linux-2.6.13-rc1/include/linux/sunrpc/clnt.h =================================================================== --- linux-2.6.13-rc1.orig/include/linux/sunrpc/clnt.h +++ linux-2.6.13-rc1/include/linux/sunrpc/clnt.h @@ -9,12 +9,9 @@ #ifndef _LINUX_SUNRPC_CLNT_H #define _LINUX_SUNRPC_CLNT_H -#include -#include -#include #include +#include #include -#include #include #include @@ -66,7 +63,6 @@ struct rpc_clnt { struct rpc_portmap cl_pmap_default; char cl_inline_name[32]; }; -#define cl_timeout cl_xprt->timeout #define cl_prog cl_pmap->pm_prog #define cl_vers cl_pmap->pm_vers #define cl_port cl_pmap->pm_port @@ -103,7 +99,6 @@ struct rpc_procinfo { unsigned int p_timer; /* Which RTT timer to use */ }; -#define RPC_CONGESTED(clnt) (RPCXPRT_CONGESTED((clnt)->cl_xprt)) #define RPC_PEERADDR(clnt) (&(clnt)->cl_xprt->addr) #ifdef __KERNEL__ @@ -136,6 +131,10 @@ void rpc_setbufsize(struct rpc_clnt *, size_t rpc_max_payload(struct rpc_clnt *); int rpc_ping(struct rpc_clnt *clnt, int flags); +struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt); +void rpc_client_set_xprt(struct rpc_clnt *, struct rpc_xprt *); +void rpc_put_xprt(struct rpc_xprt *xprt); + static __inline__ int rpc_call(struct rpc_clnt *clnt, u32 proc, void *argp, void *resp, int flags) { Index: linux-2.6.13-rc1/include/linux/sunrpc/xprt.h =================================================================== --- linux-2.6.13-rc1.orig/include/linux/sunrpc/xprt.h +++ linux-2.6.13-rc1/include/linux/sunrpc/xprt.h @@ -9,7 +9,6 @@ #ifndef _LINUX_SUNRPC_XPRT_H #define _LINUX_SUNRPC_XPRT_H -#include #include #include #include @@ -127,6 +126,7 @@ struct rpc_rqst { #define XPRT_COPY_DATA (1 << 3) struct rpc_xprt { + atomic_t count; /* Reference counter */ struct socket * sock; /* BSD socket layer */ struct sock * inet; /* INET layer */ Index: linux-2.6.13-rc1/net/sunrpc/sched.c =================================================================== --- 
linux-2.6.13-rc1.orig/net/sunrpc/sched.c +++ linux-2.6.13-rc1/net/sunrpc/sched.c @@ -780,6 +780,7 @@ void rpc_init_task(struct rpc_task *task task->tk_flags |= RPC_TASK_SOFT; if (!clnt->cl_intr) task->tk_flags |= RPC_TASK_NOINTR; + task->tk_xprt = rpc_client_get_xprt(clnt); } #ifdef RPC_DEBUG @@ -869,6 +870,7 @@ void rpc_release_task(struct rpc_task *t rpcauth_unbindcred(task); rpc_free(task); if (task->tk_client) { + rpc_put_xprt(task->tk_xprt); rpc_release_client(task->tk_client); task->tk_client = NULL; } Index: linux-2.6.13-rc1/include/linux/sunrpc/sched.h =================================================================== --- linux-2.6.13-rc1.orig/include/linux/sunrpc/sched.h +++ linux-2.6.13-rc1/include/linux/sunrpc/sched.h @@ -43,6 +43,7 @@ struct rpc_task { #endif struct list_head tk_task; /* global list of tasks */ struct rpc_clnt * tk_client; /* RPC client */ + struct rpc_xprt * tk_xprt; /* RPC transport */ struct rpc_rqst * tk_rqstp; /* RPC request */ int tk_status; /* result of last operation */ @@ -93,7 +94,6 @@ struct rpc_task { #endif }; #define tk_auth tk_client->cl_auth -#define tk_xprt tk_client->cl_xprt /* support walking a list of tasks on a wait queue */ #define task_for_each(task, pos, head) \ Index: linux-2.6.13-rc1/net/sunrpc/clnt.c =================================================================== --- linux-2.6.13-rc1.orig/net/sunrpc/clnt.c +++ linux-2.6.13-rc1/net/sunrpc/clnt.c @@ -180,7 +180,7 @@ out_no_path: kfree(clnt->cl_server); kfree(clnt); out_err: - xprt_destroy(xprt); + rpc_put_xprt(xprt); return ERR_PTR(err); } @@ -238,7 +238,8 @@ rpc_clone_client(struct rpc_clnt *clnt) new->cl_autobind = 0; new->cl_oneshot = 0; new->cl_dead = 0; - rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval); + new->cl_xprt = rpc_client_get_xprt(clnt); + rpc_init_rtt(&new->cl_rtt_default, new->cl_xprt->timeout.to_initval); if (new->cl_auth) atomic_inc(&new->cl_auth->au_count); new->cl_pmap = &new->cl_pmap_default; @@ -298,16 +299,16 @@ 
rpc_destroy_client(struct rpc_clnt *clnt rpcauth_destroy(clnt->cl_auth); clnt->cl_auth = NULL; } + if (clnt->cl_xprt) { + rpc_put_xprt(clnt->cl_xprt); + clnt->cl_xprt = NULL; + } if (clnt->cl_parent != clnt) { rpc_destroy_client(clnt->cl_parent); goto out_free; } if (clnt->cl_pathname[0]) rpc_rmdir(clnt->cl_pathname); - if (clnt->cl_xprt) { - xprt_destroy(clnt->cl_xprt); - clnt->cl_xprt = NULL; - } if (clnt->cl_server != clnt->cl_inline_name) kfree(clnt->cl_server); out_free: @@ -453,6 +454,41 @@ out: return status; } +/** + * rpc_client_get_xprt() - Get reference to the RPC transport struct + * @clnt: pointer to RPC client + */ +struct rpc_xprt *rpc_client_get_xprt(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + + /* Synchronize w.r.t. rpc_client_set_xprt() */ + rcu_read_lock(); + xprt = rcu_dereference(clnt->cl_xprt); + atomic_inc(&xprt->count); + rcu_read_unlock(); + return xprt; +} + +/** + * rpc_client_set_xprt() - Change the transport struct pointer on an in-use RPC client + * @clnt: pointer to RPC client + * @xprt: new transport + * + * This function should be called VERY infrequently, and is designed + * to be called only in case of a failover mount. 
+ */ +void rpc_client_set_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt) +{ + struct rpc_xprt *old; + + old = xchg(&clnt->cl_xprt, xprt); + /* Wait for all reads of clnt->cl_xprt == old to complete */ + synchronize_rcu(); + rpc_put_xprt(old); +} +EXPORT_SYMBOL(rpc_client_set_xprt); + /* * New rpc_call implementation */ @@ -516,7 +552,7 @@ rpc_call_setup(struct rpc_task *task, st void rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize) { - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = rpc_client_get_xprt(clnt); xprt->sndsize = 0; if (sndsize) @@ -526,6 +562,7 @@ rpc_setbufsize(struct rpc_clnt *clnt, un xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; if (xprt_connected(xprt)) xprt_sock_setbufsize(xprt); + rpc_put_xprt(xprt); } /* @@ -538,7 +575,11 @@ rpc_setbufsize(struct rpc_clnt *clnt, un */ size_t rpc_max_payload(struct rpc_clnt *clnt) { - return clnt->cl_xprt->max_payload; + size_t res; + rcu_read_lock(); + res = rcu_dereference(clnt->cl_xprt)->max_payload; + rcu_read_unlock(); + return res; } EXPORT_SYMBOL(rpc_max_payload); @@ -734,7 +775,7 @@ static void call_bind(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? 
"is" : "is not")); @@ -754,12 +795,10 @@ call_bind(struct rpc_task *task) static void call_connect(struct rpc_task *task) { - struct rpc_clnt *clnt = task->tk_client; - dprintk("RPC: %4d call_connect status %d\n", task->tk_pid, task->tk_status); - if (xprt_connected(clnt->cl_xprt)) { + if (xprt_connected(task->tk_xprt)) { task->tk_action = call_transmit; return; } @@ -1020,7 +1059,7 @@ static u32 * call_header(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; + struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; u32 *p = req->rq_svec[0].iov_base; Index: linux-2.6.13-rc1/net/sunrpc/xprt.c =================================================================== --- linux-2.6.13-rc1.orig/net/sunrpc/xprt.c +++ linux-2.6.13-rc1/net/sunrpc/xprt.c @@ -1485,6 +1485,7 @@ xprt_setup(int proto, struct sockaddr_in if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return ERR_PTR(-ENOMEM); memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ + atomic_set(&xprt->count, 1); xprt->max_reqs = entries; slot_table_size = entries * sizeof(xprt->slot[0]); xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); @@ -1719,3 +1720,14 @@ xprt_destroy(struct rpc_xprt *xprt) return 0; } + +/** + * rpc_put_xprt() - Drop reference to the RPC transport struct + * @xprt - pointer to RPC transport + */ +void rpc_put_xprt(struct rpc_xprt *xprt) +{ + if (xprt != NULL && atomic_dec_and_test(&xprt->count)) + xprt_destroy(xprt); +} +