diff -u --recursive --new-file linux-2.4.19-fix_kmap3/include/linux/sunrpc/xprt.h linux-2.4.19-rpc_rep/include/linux/sunrpc/xprt.h
--- linux-2.4.19-fix_kmap3/include/linux/sunrpc/xprt.h	Thu Aug  1 11:08:29 2002
+++ linux-2.4.19-rpc_rep/include/linux/sunrpc/xprt.h	Thu Aug  1 11:08:58 2002
@@ -83,7 +83,9 @@
 	struct rpc_task *	rq_task;	/* RPC task data */
 	__u32			rq_xid;		/* request XID */
 	struct rpc_rqst *	rq_next;	/* free list */
-	volatile unsigned char	rq_received : 1;/* receive completed */
+	int			rq_received;	/* receive completed */
+
+	struct list_head	rq_list;
 
 	/*
 	 * For authentication (e.g. auth_des)
@@ -149,6 +151,8 @@
 	spinlock_t		xprt_lock;	/* lock xprt info */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
 
+	struct list_head	recv;
+
 	void			(*old_data_ready)(struct sock *, int);
 	void			(*old_state_change)(struct sock *);
diff -u --recursive --new-file linux-2.4.19-fix_kmap3/net/sunrpc/clnt.c linux-2.4.19-rpc_rep/net/sunrpc/clnt.c
--- linux-2.4.19-fix_kmap3/net/sunrpc/clnt.c	Thu Aug  1 11:07:56 2002
+++ linux-2.4.19-rpc_rep/net/sunrpc/clnt.c	Thu Aug  1 11:09:08 2002
@@ -593,19 +593,22 @@
 {
 	struct rpc_clnt	*clnt = task->tk_client;
 	struct rpc_xprt	*xprt = clnt->cl_xprt;
-	struct rpc_rqst	*req;
-	int		status = task->tk_status;
+	struct rpc_rqst	*req = task->tk_rqstp;
+	int		status;
+
+	if (req->rq_received != 0)
+		task->tk_status = req->rq_received;
 
 	dprintk("RPC: %4d call_status (status %d)\n",
 				task->tk_pid, task->tk_status);
 
+	status = task->tk_status;
 	if (status >= 0) {
 		task->tk_action = call_decode;
 		return;
 	}
 
 	task->tk_status = 0;
-	req = task->tk_rqstp;
 	switch(status) {
 	case -ETIMEDOUT:
 		task->tk_action = call_timeout;
diff -u --recursive --new-file linux-2.4.19-fix_kmap3/net/sunrpc/xprt.c linux-2.4.19-rpc_rep/net/sunrpc/xprt.c
--- linux-2.4.19-fix_kmap3/net/sunrpc/xprt.c	Thu Aug  1 11:08:29 2002
+++ linux-2.4.19-rpc_rep/net/sunrpc/xprt.c	Thu Aug  1 11:11:18 2002
@@ -67,8 +67,6 @@
 
 #include <asm/uaccess.h>
 
-extern spinlock_t rpc_queue_lock;
-
 /*
  * Local variables
  */
@@ -135,10 +133,8 @@
  * Also prevents TCP socket reconnections from colliding with writes.
  */
 static int
-xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	int retval;
-	spin_lock_bh(&xprt->sock_lock);
 	if (!xprt->snd_task)
 		xprt->snd_task = task;
 	else if (xprt->snd_task != task) {
@@ -148,7 +144,15 @@
 		task->tk_status = -EAGAIN;
 		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 	}
-	retval = xprt->snd_task == task;
+	return xprt->snd_task == task;
+}
+
+static inline int
+xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	int retval;
+	spin_lock_bh(&xprt->sock_lock);
+	retval = __xprt_lock_write(xprt, task);
 	spin_unlock_bh(&xprt->sock_lock);
 	return retval;
 }
@@ -464,30 +468,16 @@
 static inline struct rpc_rqst *
 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
 {
-	struct rpc_task	*head, *task;
-	struct rpc_rqst	*req;
-	int		safe = 0;
+	struct list_head *pos;
+	struct rpc_rqst	*req = NULL;
 
-	spin_lock_bh(&rpc_queue_lock);
-	if ((head = xprt->pending.task) != NULL) {
-		task = head;
-		do {
-			if ((req = task->tk_rqstp) && req->rq_xid == xid)
-				goto out;
-			task = task->tk_next;
-			if (++safe > 100) {
-				printk("xprt_lookup_rqst: loop in Q!\n");
-				goto out_bad;
-			}
-		} while (task != head);
-	}
-	dprintk("RPC: unknown XID %08x in reply.\n", xid);
- out_bad:
-	req = NULL;
- out:
-	if (req && !__rpc_lock_task(req->rq_task))
-		req = NULL;
-	spin_unlock_bh(&rpc_queue_lock);
+	list_for_each(pos, &xprt->recv) {
+		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
+		if (entry->rq_xid == xid) {
+			req = entry;
+			break;
+		}
+	}
 	return req;
 }
 
@@ -523,8 +513,8 @@
 #endif
 
 	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
-	task->tk_status = copied;
-	req->rq_received = 1;
+	req->rq_received = copied;
+	list_del_init(&req->rq_list);
 
 	/* ... and wake up the process. */
 	rpc_wake_up_task(task);
@@ -622,9 +612,10 @@
 	}
 
 	/* Look up and lock the request corresponding to the given XID */
+	spin_lock(&xprt->sock_lock);
 	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
 	if (!rovr)
-		goto dropit;
+		goto out_unlock;
 	task = rovr->rq_task;
 
 	dprintk("RPC: %4d received reply\n", task->tk_pid);
@@ -644,8 +635,7 @@
 	xprt_complete_rqst(xprt, rovr, copied);
 
  out_unlock:
-	rpc_unlock_task(task);
-
+	spin_unlock(&xprt->sock_lock);
 dropit:
 	skb_free_datagram(sk, skb);
 out:
@@ -747,11 +737,13 @@
 	size_t	len;
 
 	/* Find and lock the request corresponding to this xid */
+	spin_lock(&xprt->sock_lock);
 	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
 	if (!req) {
 		xprt->tcp_flags &= ~XPRT_COPY_DATA;
 		dprintk("RPC: XID %08x request not found!\n",
				xprt->tcp_xid);
+		spin_unlock(&xprt->sock_lock);
 		return;
 	}
 
@@ -785,7 +777,7 @@
 				req->rq_task->tk_pid);
 		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
 	}
-	rpc_unlock_task(req->rq_task);
+	spin_unlock(&xprt->sock_lock);
 	tcp_check_recm(xprt);
 }
 
@@ -942,16 +934,21 @@
 xprt_timer(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
+	struct rpc_xprt	*xprt = req->rq_xprt;
 
-	if (req)
-		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
+	spin_lock(&xprt->sock_lock);
+	if (req->rq_received)
+		goto out;
+	xprt_adjust_cwnd(xprt, -ETIMEDOUT);
 
 	dprintk("RPC: %4d xprt_timer (%s request)\n",
 		task->tk_pid, req ? "pending" : "backlogged");
 
 	task->tk_status  = -ETIMEDOUT;
+out:
 	task->tk_timeout = 0;
 	rpc_wake_up_task(task);
+	spin_unlock(&xprt->sock_lock);
 }
 
 /*
@@ -987,8 +984,16 @@
 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
-	if (!xprt_lock_write(xprt, task))
+	spin_lock_bh(&xprt->sock_lock);
+	if (!__xprt_lock_write(xprt, task)) {
+		spin_unlock_bh(&xprt->sock_lock);
 		return;
+	}
+	if (list_empty(&req->rq_list)) {
+		list_add_tail(&req->rq_list, &xprt->recv);
+		req->rq_received = 0;
+	}
+	spin_unlock_bh(&xprt->sock_lock);
 
 #ifdef RPC_PROFILE
 	req->rq_xtime = jiffies;
@@ -1004,14 +1009,6 @@
 
 	int status, retry = 0;
 
-	/* For fast networks/servers we have to put the request on
-	 * the pending list now:
-	 * Note that we don't want the task timing out during the
-	 * call to xprt_sendmsg(), so we initially disable the timeout,
-	 * and then reset it later...
-	 */
-	xprt_receive(task);
-
 	/* Continue transmitting the packet/record. We must be careful
 	 * to cope with writespace callbacks arriving _after_ we have
 	 * called xprt_sendmsg().
@@ -1043,15 +1040,11 @@
 		if (retry++ > 50)
 			break;
 	}
-	rpc_unlock_task(task);
 
 	/* Note: at this point, task->tk_sleeping has not yet been set,
 	 *	 hence there is no danger of the waking up task being put on
 	 *	 schedq, and being picked up by a parallel run of rpciod().
 	 */
-	rpc_wake_up_task(task);
-	if (!RPC_IS_RUNNING(task))
-		goto out_release;
 	if (req->rq_received)
 		goto out_release;
 
@@ -1086,31 +1079,15 @@
 	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
 	/* Set the task's receive timeout value */
 	task->tk_timeout = req->rq_timeout.to_current;
-	rpc_add_timer(task, xprt_timer);
-	rpc_unlock_task(task);
+	spin_lock_bh(&xprt->sock_lock);
+	if (!req->rq_received)
+		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+	spin_unlock_bh(&xprt->sock_lock);
 out_release:
 	xprt_release_write(xprt, task);
 }
 
 /*
- * Queue the task for a reply to our call.
- * When the callback is invoked, the congestion window should have
- * been updated already.
- */
-void
-xprt_receive(struct rpc_task *task)
-{
-	struct rpc_rqst	*req = task->tk_rqstp;
-	struct rpc_xprt	*xprt = req->rq_xprt;
-
-	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
-
-	req->rq_received = 0;
-	task->tk_timeout = 0;
-	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
-}
-
-/*
  * Reserve an RPC call slot.
  */
 int
@@ -1197,6 +1174,7 @@
 	req->rq_xid     = xid++;
 	if (!xid)
 		xid++;
+	INIT_LIST_HEAD(&req->rq_list);
 }
 
 /*
@@ -1215,6 +1193,10 @@
 	}
 	if (!(req = task->tk_rqstp))
 		return;
+	spin_lock_bh(&xprt->sock_lock);
+	if (!list_empty(&req->rq_list))
+		list_del(&req->rq_list);
+	spin_unlock_bh(&xprt->sock_lock);
 	task->tk_rqstp = NULL;
 	memset(req, 0, sizeof(*req));	/* mark unused */
 
@@ -1289,6 +1271,8 @@
 	spin_lock_init(&xprt->xprt_lock);
 	init_waitqueue_head(&xprt->cong_wait);
 
+	INIT_LIST_HEAD(&xprt->recv);
+
 	/* Set timeout parameters */
 	if (to) {
 		xprt->timeout = *to;