diff -u --recursive --new-file linux-2.4.6-rdplus/include/linux/sunrpc/xprt.h linux-2.4.6-rpc_smpfixes/include/linux/sunrpc/xprt.h --- linux-2.4.6-rdplus/include/linux/sunrpc/xprt.h Fri Jun 22 17:24:25 2001 +++ linux-2.4.6-rpc_smpfixes/include/linux/sunrpc/xprt.h Fri Jun 22 19:23:19 2001 @@ -135,15 +135,13 @@ struct rpc_wait_queue sending; /* requests waiting to send */ struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue backlog; /* waiting for slot */ - struct rpc_wait_queue reconn; /* waiting for reconnect */ struct rpc_rqst * free; /* free slots */ struct rpc_rqst slot[RPC_MAXREQS]; unsigned long sockstate; /* Socket state */ unsigned char shutdown : 1, /* being shut down */ nocong : 1, /* no congestion control */ stream : 1, /* TCP */ - tcp_more : 1, /* more record fragments */ - connecting : 1; /* being reconnected */ + tcp_more : 1; /* more record fragments */ /* * State of TCP reply receive stuff @@ -158,6 +156,8 @@ /* * Send stuff */ + rwlock_t sock_lock; /* lock socket info */ + spinlock_t xprt_lock; /* lock xprt info */ struct rpc_task * snd_task; /* Task blocked in send */ @@ -185,10 +185,9 @@ void xprt_release(struct rpc_task *); void xprt_reconnect(struct rpc_task *); int xprt_clear_backlog(struct rpc_xprt *); +int xprt_tcp_pending(void); void __rpciod_tcp_dispatcher(void); -extern struct list_head rpc_xprt_pending; - #define XPRT_WSPACE 0 #define XPRT_CONNECT 1 @@ -200,12 +199,6 @@ #define xprt_set_connected(xp) (set_bit(XPRT_CONNECT, &(xp)->sockstate)) #define xprt_test_and_set_connected(xp) (test_and_set_bit(XPRT_CONNECT, &(xp)->sockstate)) #define xprt_clear_connected(xp) (clear_bit(XPRT_CONNECT, &(xp)->sockstate)) - -static inline -int xprt_tcp_pending(void) -{ - return !list_empty(&rpc_xprt_pending); -} static inline void rpciod_tcp_dispatcher(void) diff -u --recursive --new-file linux-2.4.6-rdplus/net/sunrpc/clnt.c linux-2.4.6-rpc_smpfixes/net/sunrpc/clnt.c --- linux-2.4.6-rdplus/net/sunrpc/clnt.c Thu Apr 19 17:38:50 2001 +++ linux-2.4.6-rpc_smpfixes/net/sunrpc/clnt.c Fri Jun 22 19:23:06 2001 @@ -55,6 +55,8 @@ static void call_refreshresult(struct rpc_task *task); static void call_timeout(struct rpc_task *task); static void call_reconnect(struct rpc_task *task); +static void child_reconnect(struct rpc_task *); +static void child_reconnect_status(struct rpc_task *); static u32 * call_header(struct rpc_task *task); static u32 * call_verify(struct rpc_task *task); @@ -526,6 +528,7 @@ call_reconnect(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; + struct rpc_task *child; dprintk("RPC: %4d call_reconnect status %d\n", task->tk_pid, task->tk_status); @@ -533,8 +536,29 @@ task->tk_action = call_transmit; if (task->tk_status < 0 || !clnt->cl_xprt->stream) return; - clnt->cl_stats->netreconn++; + + /* Run as a child to ensure it runs as an rpciod task */ + child = rpc_new_child(clnt, task); + if (child) { + child->tk_action = child_reconnect; + rpc_run_child(task, child, NULL); + } +} + +static void child_reconnect(struct rpc_task *task) +{ + task->tk_client->cl_stats->netreconn++; + task->tk_status = 0; + task->tk_action = child_reconnect_status; xprt_reconnect(task); +} + +static void child_reconnect_status(struct rpc_task *task) +{ + if (task->tk_status == -EAGAIN) + task->tk_action = child_reconnect; + else + task->tk_action = NULL; } /* diff -u --recursive --new-file linux-2.4.6-rdplus/net/sunrpc/xprt.c linux-2.4.6-rpc_smpfixes/net/sunrpc/xprt.c --- linux-2.4.6-rdplus/net/sunrpc/xprt.c Thu Apr 12 21:11:39 2001 +++ linux-2.4.6-rpc_smpfixes/net/sunrpc/xprt.c Fri Jun 22 19:23:06 2001 @@ -75,10 +75,6 @@ * Local variables */ -/* Spinlock for critical sections in the code. */ -spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED; -spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED; - #ifdef RPC_DEBUG # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_XPRT @@ -177,6 +173,44 @@ } /* + * Serialize write access to sockets, in order to prevent different + * requests from interfering with each other. + * Also prevents TCP socket reconnections from colliding with writes. + */ +static int +xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) +{ + int retval; + write_lock_bh(&xprt->sock_lock); + if (!xprt->snd_task) + xprt->snd_task = task; + else if (xprt->snd_task != task) { + dprintk("RPC: %4d TCP write queue full (task %d)\n", + task->tk_pid, xprt->snd_task->tk_pid); + task->tk_timeout = 0; + task->tk_status = -EAGAIN; + rpc_sleep_on(&xprt->sending, task, NULL, NULL); + } + retval = xprt->snd_task == task; + write_unlock_bh(&xprt->sock_lock); + return retval; +} + +/* + * Releases the socket for use by other requests. + */ +static void +xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) +{ + write_lock_bh(&xprt->sock_lock); + if (xprt->snd_task == task) { + xprt->snd_task = NULL; + rpc_wake_up_next(&xprt->sending); + } + write_unlock_bh(&xprt->sock_lock); +} + +/* * Write data to socket. */ static inline int @@ -290,7 +324,10 @@ if (xprt->nocong) return; - spin_lock_bh(&xprt_sock_lock); + /* + * Note: we're in a BH context + */ + spin_lock(&xprt->xprt_lock); cwnd = xprt->cwnd; if (result >= 0) { if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime)) @@ -318,7 +355,7 @@ xprt->cwnd = cwnd; out: - spin_unlock_bh(&xprt_sock_lock); + spin_unlock(&xprt->xprt_lock); } /* @@ -399,6 +436,8 @@ /* * Reconnect a broken TCP connection. + * + * Note: This cannot collide with the TCP reads, as both run from rpciod */ void xprt_reconnect(struct rpc_task *task) @@ -421,15 +460,10 @@ return; } - spin_lock(&xprt_lock); - if (xprt->connecting) { - task->tk_timeout = 0; - rpc_sleep_on(&xprt->reconn, task, NULL, NULL); - spin_unlock(&xprt_lock); + if (!xprt_lock_write(xprt, task)) return; - } - xprt->connecting = 1; - spin_unlock(&xprt_lock); + if (xprt_connected(xprt)) + goto out_write; status = -ENOTCONN; if (!inet) { @@ -444,6 +478,7 @@ /* Reset TCP record info */ xprt->tcp_offset = 0; + xprt->tcp_reclen = 0; xprt->tcp_copied = 0; xprt->tcp_more = 0; @@ -472,24 +507,22 @@ dprintk("RPC: %4d connect status %d connected %d\n", task->tk_pid, status, xprt_connected(xprt)); - spin_lock_bh(&xprt_sock_lock); + write_lock_bh(&xprt->sock_lock); if (!xprt_connected(xprt)) { task->tk_timeout = xprt->timeout.to_maxval; - rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL); - spin_unlock_bh(&xprt_sock_lock); + rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL); + write_unlock_bh(&xprt->sock_lock); return; } - spin_unlock_bh(&xprt_sock_lock); + write_unlock_bh(&xprt->sock_lock); } defer: - spin_lock(&xprt_lock); - xprt->connecting = 0; if (status < 0) { rpc_delay(task, 5*HZ); task->tk_status = -ENOTCONN; } - rpc_wake_up(&xprt->reconn); - spin_unlock(&xprt_lock); + out_write: + xprt_release_write(xprt, task); } /* @@ -504,10 +537,7 @@ dprintk("RPC: %4d xprt_reconn_timeout %d\n", task->tk_pid, task->tk_status); - spin_lock(&xprt_lock); - xprt->connecting = 0; - rpc_wake_up(&xprt->reconn); - spin_unlock(&xprt_lock); + xprt_release_write(xprt, task); } /* @@ -704,10 +734,6 @@ struct iovec riov; int want, result; - if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) { - xprt->tcp_offset = 0; - xprt->tcp_reclen = 0; - } if (xprt->tcp_offset >= sizeof(xprt->tcp_recm)) goto done; @@ -723,10 +749,6 @@ want -= result; } while (want); - /* Is this another fragment in the last message */ - if (!xprt->tcp_more) - xprt->tcp_copied = 0; /* No, so we're reading a new message */ - /* Get the record length and mask out the last fragment bit */ xprt->tcp_reclen = ntohl(xprt->tcp_recm); xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1; @@ -848,14 +870,15 @@ /* Read in a new fragment marker if necessary */ /* Can we ever really expect to get completely empty fragments? */ - if ((result = tcp_read_fraghdr(xprt)) <= 0) + if ((result = tcp_read_fraghdr(xprt)) < 0) return result; avail = result; /* Read in the xid if necessary */ - if ((result = tcp_read_xid(xprt, avail)) <= 0) + if ((result = tcp_read_xid(xprt, avail)) < 0) return result; - avail = result; + if (!(avail = result)) + goto out_ok; /* Find and lock the request corresponding to this xid */ req = xprt_lookup_rqst(xprt, xprt->tcp_xid); @@ -873,9 +896,14 @@ if ((result = tcp_read_discard(xprt, avail)) < 0) return result; + out_ok: dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n", xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied); result = xprt->tcp_reclen; + xprt->tcp_reclen = 0; + xprt->tcp_offset = 0; + if (!xprt->tcp_more) + xprt->tcp_copied = 0; return result; } @@ -890,11 +918,19 @@ rpciod_wake_up(); } +int xprt_tcp_pending(void) +{ + int retval; + + spin_lock_bh(&rpc_queue_lock); + retval = !list_empty(&rpc_xprt_pending); + spin_unlock_bh(&rpc_queue_lock); + return retval; +} + static inline void xprt_append_pending(struct rpc_xprt *xprt) { - if (!list_empty(&xprt->rx_pending)) - return; spin_lock_bh(&rpc_queue_lock); if (list_empty(&xprt->rx_pending)) { list_add(&xprt->rx_pending, rpc_xprt_pending.prev); @@ -1008,11 +1044,10 @@ case TCP_ESTABLISHED: if (xprt_test_and_set_connected(xprt)) break; - spin_lock_bh(&xprt_sock_lock); + read_lock(&xprt->sock_lock); if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) rpc_wake_up_task(xprt->snd_task); - rpc_wake_up(&xprt->reconn); - spin_unlock_bh(&xprt_sock_lock); + read_unlock(&xprt->sock_lock); break; case TCP_SYN_SENT: case TCP_SYN_RECV: @@ -1046,10 +1081,10 @@ return; if (!xprt_test_and_set_wspace(xprt)) { - spin_lock_bh(&xprt_sock_lock); + read_lock(&xprt->sock_lock); if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) rpc_wake_up_task(xprt->snd_task); - spin_unlock_bh(&xprt_sock_lock); + read_unlock(&xprt->sock_lock); } if (test_bit(SOCK_NOSPACE, &sock->flags)) { @@ -1076,10 +1111,10 @@ return; if (!xprt_test_and_set_wspace(xprt)) { - spin_lock_bh(&xprt_sock_lock); + read_lock(&xprt->sock_lock); if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) rpc_wake_up_task(xprt->snd_task); - spin_unlock_bh(&xprt_sock_lock); + read_unlock(&xprt->sock_lock); } if (sk->sleep && waitqueue_active(sk->sleep)) @@ -1105,55 +1140,6 @@ rpc_wake_up_task(task); } - -/* - * Serialize access to sockets, in order to prevent different - * requests from interfering with each other. - */ -static int -xprt_down_transmit(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - struct rpc_rqst *req = task->tk_rqstp; - - spin_lock_bh(&xprt_sock_lock); - spin_lock(&xprt_lock); - if (xprt->snd_task && xprt->snd_task != task) { - dprintk("RPC: %4d TCP write queue full (task %d)\n", - task->tk_pid, xprt->snd_task->tk_pid); - task->tk_timeout = 0; - task->tk_status = -EAGAIN; - rpc_sleep_on(&xprt->sending, task, NULL, NULL); - } else if (!xprt->snd_task) { - xprt->snd_task = task; -#ifdef RPC_PROFILE - req->rq_xtime = jiffies; -#endif - req->rq_bytes_sent = 0; - } - spin_unlock(&xprt_lock); - spin_unlock_bh(&xprt_sock_lock); - return xprt->snd_task == task; -} - -/* - * Releases the socket for use by other requests. - */ -static inline void -xprt_up_transmit(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - - if (xprt->snd_task && xprt->snd_task == task) { - spin_lock_bh(&xprt_sock_lock); - spin_lock(&xprt_lock); - xprt->snd_task = NULL; - rpc_wake_up_next(&xprt->sending); - spin_unlock(&xprt_lock); - spin_unlock_bh(&xprt_sock_lock); - } -} - /* * Place the actual RPC call. * We have to copy the iovec because sendmsg fiddles with its contents. @@ -1187,9 +1173,12 @@ *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); } - if (!xprt_down_transmit(task)) + if (!xprt_lock_write(xprt, task)) return; +#ifdef RPC_PROFILE + req->rq_xtime = jiffies; +#endif do_xprt_transmit(task); } @@ -1257,12 +1246,12 @@ switch (status) { case -ENOMEM: /* Protect against (udp|tcp)_write_space */ - spin_lock_bh(&xprt_sock_lock); + write_lock_bh(&xprt->sock_lock); if (!xprt_wspace(xprt)) { task->tk_timeout = req->rq_timeout.to_current; rpc_sleep_on(&xprt->sending, task, NULL, NULL); } - spin_unlock_bh(&xprt_sock_lock); + write_unlock_bh(&xprt->sock_lock); return; case -EAGAIN: /* Keep holding the socket if it is blocked */ @@ -1273,6 +1262,9 @@ if (!xprt->stream) return; default: + if (xprt->stream) + xprt_disconnect(xprt); + req->rq_bytes_sent = 0; goto out_release; } @@ -1283,7 +1275,7 @@ rpc_add_timer(task, xprt_timer); rpc_unlock_task(task); out_release: - xprt_up_transmit(task); + xprt_release_write(xprt, task); } /* @@ -1318,7 +1310,7 @@ dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n", task->tk_pid, xprt->cong, xprt->cwnd); - spin_lock_bh(&xprt_sock_lock); + spin_lock_bh(&xprt->xprt_lock); xprt_reserve_status(task); if (task->tk_rqstp) { task->tk_timeout = 0; @@ -1329,7 +1321,7 @@ task->tk_status = -EAGAIN; rpc_sleep_on(&xprt->backlog, task, NULL, NULL); } - spin_unlock_bh(&xprt_sock_lock); + spin_unlock_bh(&xprt->xprt_lock); dprintk("RPC: %4d xprt_reserve returns %d\n", task->tk_pid, task->tk_status); return task->tk_status; @@ -1402,7 +1394,11 @@ struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req; - xprt_up_transmit(task); + if (xprt->snd_task == task) { + if (xprt->stream) + xprt_disconnect(xprt); + xprt_release_write(xprt, task); + } if (!(req = task->tk_rqstp)) return; task->tk_rqstp = NULL; @@ -1416,7 +1412,7 @@ rpc_remove_wait_queue(task); } - spin_lock_bh(&xprt_sock_lock); + spin_lock_bh(&xprt->xprt_lock); req->rq_next = xprt->free; xprt->free = req; @@ -1424,7 +1420,7 @@ xprt->cong -= RPC_CWNDSCALE; xprt_clear_backlog(xprt); - spin_unlock_bh(&xprt_sock_lock); + spin_unlock_bh(&xprt->xprt_lock); } /* @@ -1481,6 +1477,8 @@ } else xprt->cwnd = RPC_INITCWND; xprt->congtime = jiffies; + rwlock_init(&xprt->sock_lock); + spin_lock_init(&xprt->xprt_lock); init_waitqueue_head(&xprt->cong_wait); /* Set timeout parameters */ @@ -1494,7 +1492,6 @@ xprt->pending = RPC_INIT_WAITQ("xprt_pending"); xprt->sending = RPC_INIT_WAITQ("xprt_sending"); xprt->backlog = RPC_INIT_WAITQ("xprt_backlog"); - xprt->reconn = RPC_INIT_WAITQ("xprt_reconn"); /* initialize free list */ for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++) @@ -1630,7 +1627,6 @@ rpc_wake_up(&xprt->sending); rpc_wake_up(&xprt->pending); rpc_wake_up(&xprt->backlog); - rpc_wake_up(&xprt->reconn); if (waitqueue_active(&xprt->cong_wait)) wake_up(&xprt->cong_wait); }