From: Trond Myklebust Date: Tue, 15 Jul 2008 17:58:17 -0400 SUNRPC: Add connected sockets for UDP Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 149 +++++++++++++++++++++++++++++-------------------- 1 files changed, 88 insertions(+), 61 deletions(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 4486c59..2e49f5a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -580,8 +580,8 @@ static int xs_udp_send_request(struct rpc_task *task) req->rq_svec->iov_len); status = xs_sendpages(transport->sock, - xs_addr(xprt), - xprt->addrlen, xdr, + NULL, + 0, xdr, req->rq_bytes_sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", @@ -1445,13 +1445,13 @@ static inline void xs_reclassify_socket6(struct socket *sock) } #endif -static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) +static int xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct sock *sk = sock->sk; + int ret; if (!transport->inet) { - struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); sk->sk_user_data = xprt; @@ -1463,8 +1463,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_no_check = UDP_CSUM_NORCV; sk->sk_allocation = GFP_ATOMIC; - xprt_set_connected(xprt); - /* Reset to new socket */ transport->sock = sock; transport->inet = sk; @@ -1472,6 +1470,39 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) write_unlock_bh(&sk->sk_callback_lock); } xs_udp_do_set_buffer_size(xprt); + ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); + + if (ret == 0) { + spin_lock_bh(&xprt->transport_lock); + if (sk->sk_state == TCP_ESTABLISHED) + xprt_set_connected(xprt); + spin_unlock_bh(&xprt->transport_lock); + } + return ret; +} + +/* + * We need to preserve the port number so the reply cache on the server can + * find our cached RPC replies when we get around to reconnecting. + */ +static void xs_sock_reuse_connection(struct rpc_xprt *xprt) +{ + int result; + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct sockaddr any; + + dprintk("RPC: disconnecting xprt %p to reuse port\n", xprt); + + /* + * Disconnect the transport socket by doing a connect operation + * with AF_UNSPEC. This should return immediately... + */ + memset(&any, 0, sizeof(any)); + any.sa_family = AF_UNSPEC; + result = kernel_connect(transport->sock, &any, sizeof(any), 0); + if (result) + dprintk("RPC: AF_UNSPEC connect return code %d\n", + result); } /** @@ -1491,25 +1522,35 @@ static void xs_udp_connect_worker4(struct work_struct *work) if (xprt->shutdown || !xprt_bound(xprt)) goto out; - /* Start by resetting any existing state */ - xs_close(xprt); - - if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { - dprintk("RPC: can't create UDP transport socket (%d).\n", -err); - goto out; - } - xs_reclassify_socket4(sock); + if (!sock) { + if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { + dprintk("RPC: can't create UDP transport socket (%d).\n", -err); + goto out; + } + xs_reclassify_socket4(sock); - if (xs_bind4(transport, sock)) { - sock_release(sock); - goto out; - } + if (xs_bind4(transport, sock)) { + sock_release(sock); + goto out; + } + } else + xs_sock_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]); - xs_udp_finish_connecting(xprt, sock); - status = 0; + status = xs_udp_finish_connecting(xprt, sock); + if (status < 0) { + switch (status) { + case -ECONNREFUSED: + case -ECONNRESET: + /* retry with existing socket, after a delay */ + break; + default: + /* get rid of existing socket, and retry */ + xs_close(xprt); + } + } out: xprt_wake_pending_tasks(xprt, status); xprt_clear_connecting(xprt); @@ -1532,54 +1573,40 @@ static void xs_udp_connect_worker6(struct work_struct *work) if (xprt->shutdown || !xprt_bound(xprt)) goto out; - /* Start by resetting any existing state */ - xs_close(xprt); - - if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { - dprintk("RPC: can't create UDP transport socket (%d).\n", -err); - goto out; - } - xs_reclassify_socket6(sock); + if (!sock) { + if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { + dprintk("RPC: can't create UDP transport socket (%d).\n", -err); + goto out; + } + xs_reclassify_socket6(sock); - if (xs_bind6(transport, sock) < 0) { - sock_release(sock); - goto out; - } + if (xs_bind6(transport, sock) < 0) { + sock_release(sock); + goto out; + } + } else + xs_sock_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]); - xs_udp_finish_connecting(xprt, sock); - status = 0; + status = xs_udp_finish_connecting(xprt, sock); + if (status < 0) { + switch (status) { + case -ECONNREFUSED: + case -ECONNRESET: + /* retry with existing socket, after a delay */ + break; + default: + /* get rid of existing socket, and retry */ + xs_close(xprt); + } + } out: xprt_wake_pending_tasks(xprt, status); xprt_clear_connecting(xprt); } -/* - * We need to preserve the port number so the reply cache on the server can - * find our cached RPC replies when we get around to reconnecting. - */ -static void xs_tcp_reuse_connection(struct rpc_xprt *xprt) -{ - int result; - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct sockaddr any; - - dprintk("RPC: disconnecting xprt %p to reuse port\n", xprt); - - /* - * Disconnect the transport socket by doing a connect operation - * with AF_UNSPEC. This should return immediately... - */ - memset(&any, 0, sizeof(any)); - any.sa_family = AF_UNSPEC; - result = kernel_connect(transport->sock, &any, sizeof(any), 0); - if (result) - dprintk("RPC: AF_UNSPEC connect return code %d\n", - result); -} - static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -1650,7 +1677,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work) } } else /* "close" the socket, preserving the local port */ - xs_tcp_reuse_connection(xprt); + xs_sock_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]); @@ -1710,7 +1737,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work) } } else /* "close" the socket, preserving the local port */ - xs_tcp_reuse_connection(xprt); + xs_sock_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]);