From: Trond Myklebust <trond.myklebust@fys.uio.no>

Convert the RPC scheduler to use workqueues: replace the rpciod kernel
thread and its private schedq with a per-task work_struct that is queued
on a workqueue (normally rpciod's own, via tk_workqueue).  Sleep and
wake-up are now tracked by the RPC_TASK_RUNNING, RPC_TASK_QUEUED and
RPC_TASK_WAKEUP bits in tk_runstate, so wait queue membership can be
tested without taking rpc_queue_lock on the fast paths.

Signed-off-by: Trond Myklebust <trond.myklebust@fys.uio.no>
Signed-off-by: Andrew Morton <akpm@osdl.org>
---

 25-akpm/fs/nfs/unlink.c              |    3 
 25-akpm/include/linux/sunrpc/sched.h |   61 +++-
 25-akpm/net/sunrpc/sched.c           |  467 ++++++++++-------------------------
 25-akpm/net/sunrpc/xprt.c            |    2 
 4 files changed, 183 insertions(+), 350 deletions(-)

diff -puN fs/nfs/unlink.c~linux-2.6.8.1-49-rpc_workqueue fs/nfs/unlink.c
--- 25/fs/nfs/unlink.c~linux-2.6.8.1-49-rpc_workqueue	2004-08-23 21:59:32.133415400 -0700
+++ 25-akpm/fs/nfs/unlink.c	2004-08-23 21:59:32.141414184 -0700
@@ -215,7 +215,6 @@ nfs_complete_unlink(struct dentry *dentr
 	spin_lock(&dentry->d_lock);
 	dentry->d_flags &= ~DCACHE_NFSFS_RENAMED;
 	spin_unlock(&dentry->d_lock);
-	if (data->task.tk_rpcwait == &nfs_delete_queue)
-		rpc_wake_up_task(&data->task);
+	rpc_wake_up_task(&data->task);
 	nfs_put_unlinkdata(data);
 }
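
Note on the hunk above: the open-coded test against nfs_delete_queue can go
because rpc_wake_up_task() now performs its own lockless membership check,
so waking a task that is not asleep is a harmless no-op.  A sketch of the
new fast path, mirroring the sched.c changes later in this patch (no new
API is assumed here):

	void rpc_wake_up_task(struct rpc_task *task)
	{
		if (rpc_start_wakeup(task)) {		/* claim RPC_TASK_WAKEUP */
			if (RPC_IS_QUEUED(task)) {	/* still on a wait queue? */
				spin_lock_bh(&rpc_queue_lock);
				__rpc_do_wake_up_task(task);
				spin_unlock_bh(&rpc_queue_lock);
			}
			rpc_finish_wakeup(task);	/* release the wake-up claim */
		}
	}
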
diff -puN include/linux/sunrpc/sched.h~linux-2.6.8.1-49-rpc_workqueue include/linux/sunrpc/sched.h
--- 25/include/linux/sunrpc/sched.h~linux-2.6.8.1-49-rpc_workqueue	2004-08-23 21:59:32.135415096 -0700
+++ 25-akpm/include/linux/sunrpc/sched.h	2004-08-23 21:59:32.142414032 -0700
@@ -12,6 +12,7 @@
 #include <linux/timer.h>
 #include <linux/sunrpc/types.h>
 #include <linux/wait.h>
+#include <linux/workqueue.h>
 #include <linux/sunrpc/xdr.h>
 
 /*
@@ -25,11 +26,18 @@ struct rpc_message {
 	struct rpc_cred *	rpc_cred;	/* Credentials */
 };
 
+struct rpc_wait_queue;
+struct rpc_wait {
+	struct list_head	list;		/* wait queue links */
+	struct list_head	links;		/* Links to related tasks */
+	wait_queue_head_t	waitq;		/* sync: sleep on this q */
+	struct rpc_wait_queue *	rpc_waitq;	/* RPC wait queue we're on */
+};
+
 /*
  * This is the RPC task struct
  */
 struct rpc_task {
-	struct list_head	tk_list;	/* wait queue links */
 #ifdef RPC_DEBUG
 	unsigned long		tk_magic;	/* 0xf00baa */
 #endif
@@ -37,7 +45,6 @@ struct rpc_task {
 	struct rpc_clnt *	tk_client;	/* RPC client */
 	struct rpc_rqst *	tk_rqstp;	/* RPC request */
 	int			tk_status;	/* result of last operation */
-	struct rpc_wait_queue *	tk_rpcwait;	/* RPC wait queue we're on */
 
 	/*
 	 * RPC call state
@@ -70,13 +77,18 @@ struct rpc_task {
 	 * you have a pathological interest in kernel oopses.
 	 */
 	struct timer_list	tk_timer;	/* kernel timer */
-	wait_queue_head_t	tk_wait;	/* sync: sleep on this q */
 	unsigned long		tk_timeout;	/* timeout for rpc_sleep() */
 	unsigned short		tk_flags;	/* misc flags */
 	unsigned char		tk_active   : 1;/* Task has been activated */
 	unsigned char		tk_priority : 2;/* Task priority */
 	unsigned long		tk_runstate;	/* Task run status */
-	struct list_head	tk_links;	/* links to related tasks */
+	struct workqueue_struct	*tk_workqueue;	/* Normally rpciod, but could
+						 * be any workqueue
+						 */
+	union {
+		struct work_struct	tk_work;	/* Async task work queue */
+		struct rpc_wait		tk_wait;	/* RPC wait */
+	} u;
 #ifdef RPC_DEBUG
 	unsigned short		tk_pid;		/* debugging aid */
 #endif
@@ -87,11 +99,11 @@ struct rpc_task {
 /* support walking a list of tasks on a wait queue */
 #define	task_for_each(task, pos, head) \
 	list_for_each(pos, head) \
-		if ((task=list_entry(pos, struct rpc_task, tk_list)),1)
+		if ((task=list_entry(pos, struct rpc_task, u.tk_wait.list)),1)
 
 #define	task_for_first(task, head) \
 	if (!list_empty(head) &&  \
-	    ((task=list_entry((head)->next, struct rpc_task, tk_list)),1))
+	    ((task=list_entry((head)->next, struct rpc_task, u.tk_wait.list)),1))
 
 /* .. and walking list of all tasks */
 #define	alltask_for_each(task, pos, head) \
@@ -126,22 +138,38 @@ typedef void			(*rpc_action)(struct rpc_
 #define RPC_IS_SOFT(t)		((t)->tk_flags & RPC_TASK_SOFT)
 #define RPC_TASK_UNINTERRUPTIBLE(t) ((t)->tk_flags & RPC_TASK_NOINTR)
 
-#define RPC_TASK_SLEEPING	0
-#define RPC_TASK_RUNNING	1
-#define RPC_IS_SLEEPING(t)	(test_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate))
-#define RPC_IS_RUNNING(t)	(test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
+#define RPC_TASK_RUNNING	0
+#define RPC_TASK_QUEUED		1
+#define RPC_TASK_WAKEUP		2
 
+#define RPC_IS_RUNNING(t)	(test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
 #define rpc_set_running(t)	(set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
-#define rpc_clear_running(t)	(clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
+#define rpc_test_and_set_running(t) \
+				(test_and_set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
+#define rpc_clear_running(t)	\
+	do { \
+		smp_mb__before_clear_bit(); \
+		clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate); \
+		smp_mb__after_clear_bit(); \
+	} while (0)
 
-#define rpc_set_sleeping(t)	(set_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate))
+#define RPC_IS_QUEUED(t)	(test_bit(RPC_TASK_QUEUED, &(t)->tk_runstate))
+#define rpc_set_queued(t)	(set_bit(RPC_TASK_QUEUED, &(t)->tk_runstate))
+#define rpc_clear_queued(t)	\
+	do { \
+		smp_mb__before_clear_bit(); \
+		clear_bit(RPC_TASK_QUEUED, &(t)->tk_runstate); \
+		smp_mb__after_clear_bit(); \
+	} while (0)
 
-#define rpc_clear_sleeping(t) \
+#define rpc_start_wakeup(t) \
+	(test_and_set_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate) == 0)
+#define rpc_finish_wakeup(t) \
 	do { \
 		smp_mb__before_clear_bit(); \
-		clear_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate); \
+		clear_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate); \
 		smp_mb__after_clear_bit(); \
-	} while(0)
+	} while (0)
 
 /*
  * Task priorities.
@@ -209,13 +237,10 @@ void		rpc_killall_tasks(struct rpc_clnt 
 int		rpc_execute(struct rpc_task *);
 void		rpc_run_child(struct rpc_task *parent, struct rpc_task *child,
 					rpc_action action);
-int		rpc_add_wait_queue(struct rpc_wait_queue *, struct rpc_task *);
-void		rpc_remove_wait_queue(struct rpc_task *);
 void		rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *);
 void		rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
 void		rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
 					rpc_action action, rpc_action timer);
-void		rpc_add_timer(struct rpc_task *, rpc_action);
 void		rpc_wake_up_task(struct rpc_task *);
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
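
The union of tk_work and tk_wait above is safe because a task is in exactly
one of two states at any time: a runnable async task is owned by the
workqueue and needs only tk_work, while a sleeping task needs only the wait
queue linkage in struct rpc_wait.  The RUNNING/QUEUED/WAKEUP bits, with
explicit memory barriers around each clear_bit(), are what let the
scheduler test queue membership without taking rpc_queue_lock.  A hedged
sketch of the sync-task sleeper side, as __rpc_execute() in sched.c below
uses these helpers:

	if (RPC_IS_QUEUED(task)) {
		rpc_clear_running(task);	/* barriers order this against the waker */
		__wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
		rpc_set_running(task);		/* we own the task again */
	}
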
diff -puN net/sunrpc/sched.c~linux-2.6.8.1-49-rpc_workqueue net/sunrpc/sched.c
--- 25/net/sunrpc/sched.c~linux-2.6.8.1-49-rpc_workqueue	2004-08-23 21:59:32.137414792 -0700
+++ 25-akpm/net/sunrpc/sched.c	2004-08-23 21:59:32.147413272 -0700
@@ -41,13 +41,7 @@ static mempool_t	*rpc_buffer_mempool;
 
 static void			__rpc_default_timer(struct rpc_task *task);
 static void			rpciod_killall(void);
-
-/*
- * When an asynchronous RPC task is activated within a bottom half
- * handler, or while executing another RPC task, it is put on
- * schedq, and rpciod is woken up.
- */
-static RPC_WAITQ(schedq, "schedq");
+static void			rpc_async_schedule(void *);
 
 /*
  * RPC tasks that create another task (e.g. for contacting the portmapper)
@@ -68,12 +62,9 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
-static DECLARE_COMPLETION(rpciod_killer);
 static DECLARE_MUTEX(rpciod_sema);
 static unsigned int		rpciod_users;
-static pid_t			rpciod_pid;
-static int			rpc_inhibit;
+static struct workqueue_struct *rpciod_workqueue;
 
 /*
  * Spinlock for wait queues. Access to the latter also has to be
@@ -105,16 +96,13 @@ __rpc_disable_timer(struct rpc_task *tas
  * without calling del_timer_sync(). The latter could cause a
  * deadlock if called while we're holding spinlocks...
  */
-static void
-rpc_run_timer(struct rpc_task *task)
+static void rpc_run_timer(struct rpc_task *task)
 {
 	void (*callback)(struct rpc_task *);
 
-	spin_lock_bh(&rpc_queue_lock);
 	callback = task->tk_timeout_fn;
 	task->tk_timeout_fn = NULL;
-	spin_unlock_bh(&rpc_queue_lock);
-	if (callback) {
+	if (callback && RPC_IS_QUEUED(task)) {
 		dprintk("RPC: %4d running timer\n", task->tk_pid);
 		callback(task);
 	}
@@ -140,17 +128,6 @@ __rpc_add_timer(struct rpc_task *task, r
 }
 
 /*
- * Set up a timer for an already sleeping task.
- */
-void rpc_add_timer(struct rpc_task *task, rpc_action timer)
-{
-	spin_lock_bh(&rpc_queue_lock);
-	if (!RPC_IS_RUNNING(task))
-		__rpc_add_timer(task, timer);
-	spin_unlock_bh(&rpc_queue_lock);
-}
-
-/*
  * Delete any timer for the current task. Because we use del_timer_sync(),
  * this function should never be called while holding rpc_queue_lock.
  */
@@ -169,16 +146,17 @@ static void __rpc_add_wait_queue_priorit
 	struct list_head *q;
 	struct rpc_task *t;
 
+	INIT_LIST_HEAD(&task->u.tk_wait.links);
 	q = &queue->tasks[task->tk_priority];
 	if (unlikely(task->tk_priority > queue->maxpriority))
 		q = &queue->tasks[queue->maxpriority];
-	list_for_each_entry(t, q, tk_list) {
+	list_for_each_entry(t, q, u.tk_wait.list) {
 		if (t->tk_cookie == task->tk_cookie) {
-			list_add_tail(&task->tk_list, &t->tk_links);
+			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
 			return;
 		}
 	}
-	list_add_tail(&task->tk_list, q);
+	list_add_tail(&task->u.tk_wait.list, q);
 }
 
 /*
@@ -189,37 +167,21 @@ static void __rpc_add_wait_queue_priorit
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (task->tk_rpcwait == queue)
-		return 0;
+	BUG_ON(RPC_IS_QUEUED(task));
 
-	if (task->tk_rpcwait) {
-		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
-		return -EWOULDBLOCK;
-	}
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_add_wait_queue_priority(queue, task);
 	else if (RPC_IS_SWAPPER(task))
-		list_add(&task->tk_list, &queue->tasks[0]);
+		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
 	else
-		list_add_tail(&task->tk_list, &queue->tasks[0]);
-	task->tk_rpcwait = queue;
+		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
+	task->u.tk_wait.rpc_waitq = queue;
+	rpc_set_queued(task);
 
 	dprintk("RPC: %4d added to queue %p \"%s\"\n",
 				task->tk_pid, queue, rpc_qname(queue));
-
-	return 0;
-}
-
-int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
-{
-	int		result;
-
-	spin_lock_bh(&rpc_queue_lock);
-	result = __rpc_add_wait_queue(q, task);
-	spin_unlock_bh(&rpc_queue_lock);
-	return result;
 }
 
 /*
@@ -229,12 +191,12 @@ static void __rpc_remove_wait_queue_prio
 {
 	struct rpc_task *t;
 
-	if (!list_empty(&task->tk_links)) {
-		t = list_entry(task->tk_links.next, struct rpc_task, tk_list);
-		list_move(&t->tk_list, &task->tk_list);
-		list_splice_init(&task->tk_links, &t->tk_links);
+	if (!list_empty(&task->u.tk_wait.links)) {
+		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
+		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
+		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
 	}
-	list_del(&task->tk_list);
+	list_del(&task->u.tk_wait.list);
 }
 
 /*
@@ -243,31 +205,17 @@ static void __rpc_remove_wait_queue_prio
  */
 static void __rpc_remove_wait_queue(struct rpc_task *task)
 {
-	struct rpc_wait_queue *queue = task->tk_rpcwait;
-
-	if (!queue)
-		return;
+	struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
 
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_remove_wait_queue_priority(task);
 	else
-		list_del(&task->tk_list);
-	task->tk_rpcwait = NULL;
-
+		list_del(&task->u.tk_wait.list);
 	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
 				task->tk_pid, queue, rpc_qname(queue));
 }
 
-void
-rpc_remove_wait_queue(struct rpc_task *task)
-{
-	if (!task->tk_rpcwait)
-		return;
-	spin_lock_bh(&rpc_queue_lock);
-	__rpc_remove_wait_queue(task);
-	spin_unlock_bh(&rpc_queue_lock);
-}
-
 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
 {
 	queue->priority = priority;
@@ -316,34 +264,31 @@ EXPORT_SYMBOL(rpc_init_wait_queue);
  * Note: If the task is ASYNC, this must be called with 
  * the spinlock held to protect the wait queue operation.
  */
-static inline void
-rpc_make_runnable(struct rpc_task *task)
+static void rpc_make_runnable(struct rpc_task *task)
 {
-	if (task->tk_timeout_fn) {
-		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
+	int do_ret;
+
+	BUG_ON(task->tk_timeout_fn);
+	do_ret = rpc_test_and_set_running(task);
+	rpc_clear_queued(task);
+	if (do_ret)
 		return;
-	}
-	rpc_set_running(task);
 	if (RPC_IS_ASYNC(task)) {
-		if (RPC_IS_SLEEPING(task)) {
-			int status;
-			status = __rpc_add_wait_queue(&schedq, task);
-			if (status < 0) {
-				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-				task->tk_status = status;
-				return;
-			}
-			rpc_clear_sleeping(task);
-			wake_up(&rpciod_idle);
+		int status;
+
+		INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
+		status = queue_work(task->tk_workqueue, &task->u.tk_work);
+		if (status < 0) {
+			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+			task->tk_status = status;
+			return;
 		}
-	} else {
-		rpc_clear_sleeping(task);
-		wake_up(&task->tk_wait);
-	}
+	} else
+		wake_up(&task->u.tk_wait.waitq);
 }
 
 /*
- * Place a newly initialized task on the schedq.
+ * Place a newly initialized task on the workqueue.
  */
 static inline void
 rpc_schedule_run(struct rpc_task *task)
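
With the hunk above, making an async task runnable no longer means parking
it on schedq and waking rpciod: the task itself becomes a work item.  The
handoff uses the three-argument INIT_WORK() of this kernel generation:

	/* tk_work overlays tk_wait in the union, so the task must already
	 * be off its wait queue when INIT_WORK() clobbers the linkage --
	 * rpc_clear_queued() above and the callers guarantee that. */
	INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
	queue_work(task->tk_workqueue, &task->u.tk_work);
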
@@ -352,33 +297,18 @@ rpc_schedule_run(struct rpc_task *task)
 	if (RPC_IS_ACTIVATED(task))
 		return;
 	task->tk_active = 1;
-	rpc_set_sleeping(task);
 	rpc_make_runnable(task);
 }
 
 /*
- *	For other people who may need to wake the I/O daemon
- *	but should (for now) know nothing about its innards
- */
-void rpciod_wake_up(void)
-{
-	if(rpciod_pid==0)
-		printk(KERN_ERR "rpciod: wot no daemon?\n");
-	wake_up(&rpciod_idle);
-}
-
-/*
  * Prepare for sleeping on a wait queue.
  * By always appending tasks to the list we ensure FIFO behavior.
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void
-__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 			rpc_action action, rpc_action timer)
 {
-	int status;
-
 	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
 				rpc_qname(q), jiffies);
 
@@ -388,24 +318,14 @@ __rpc_sleep_on(struct rpc_wait_queue *q,
 	}
 
 	/* Mark the task as being activated if so needed */
-	if (!RPC_IS_ACTIVATED(task)) {
+	if (!RPC_IS_ACTIVATED(task))
 		task->tk_active = 1;
-		rpc_set_sleeping(task);
-	}
 
-	status = __rpc_add_wait_queue(q, task);
-	if (status) {
-		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-		task->tk_status = status;
-	} else {
-		rpc_clear_running(task);
-		if (task->tk_callback) {
-			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
-			BUG();
-		}
-		task->tk_callback = action;
-		__rpc_add_timer(task, timer);
-	}
+	__rpc_add_wait_queue(q, task);
+
+	BUG_ON(task->tk_callback != NULL);
+	task->tk_callback = action;
+	__rpc_add_timer(task, timer);
 }
 
 void
@@ -421,16 +341,14 @@ rpc_sleep_on(struct rpc_wait_queue *q, s
 }
 
 /**
- * __rpc_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task - wake up a single rpc_task
  * @task: task to be woken up
  *
- * Caller must hold rpc_queue_lock
+ * Caller must hold rpc_queue_lock, and must have claimed the wake-up
+ * with rpc_start_wakeup().
  */
-static void
-__rpc_wake_up_task(struct rpc_task *task)
+static void __rpc_do_wake_up_task(struct rpc_task *task)
 {
-	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
-					task->tk_pid, jiffies, rpc_inhibit);
+	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);
 
 #ifdef RPC_DEBUG
 	if (task->tk_magic != 0xf00baa) {
@@ -445,12 +363,9 @@ __rpc_wake_up_task(struct rpc_task *task
 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
 		return;
 	}
-	if (RPC_IS_RUNNING(task))
-		return;
 
 	__rpc_disable_timer(task);
-	if (task->tk_rpcwait != &schedq)
-		__rpc_remove_wait_queue(task);
+	__rpc_remove_wait_queue(task);
 
 	rpc_make_runnable(task);
 
@@ -458,6 +373,18 @@ __rpc_wake_up_task(struct rpc_task *task
 }
 
 /*
+ * Wake up the specified task
+ */
+static void __rpc_wake_up_task(struct rpc_task *task)
+{
+	if (rpc_start_wakeup(task)) {
+		if (RPC_IS_QUEUED(task))
+			__rpc_do_wake_up_task(task);
+		rpc_finish_wakeup(task);
+	}
+}
+
+/*
  * Default timeout handler if none specified by user
  */
 static void
@@ -471,14 +398,16 @@ __rpc_default_timer(struct rpc_task *tas
 /*
  * Wake up the specified task
  */
-void
-rpc_wake_up_task(struct rpc_task *task)
+void rpc_wake_up_task(struct rpc_task *task)
 {
-	if (RPC_IS_RUNNING(task))
-		return;
-	spin_lock_bh(&rpc_queue_lock);
-	__rpc_wake_up_task(task);
-	spin_unlock_bh(&rpc_queue_lock);
+	if (rpc_start_wakeup(task)) {
+		if (RPC_IS_QUEUED(task)) {
+			spin_lock_bh(&rpc_queue_lock);
+			__rpc_do_wake_up_task(task);
+			spin_unlock_bh(&rpc_queue_lock);
+		}
+		rpc_finish_wakeup(task);
+	}
 }
 
 /*
@@ -494,11 +423,11 @@ static struct rpc_task * __rpc_wake_up_n
 	 */
 	q = &queue->tasks[queue->priority];
 	if (!list_empty(q)) {
-		task = list_entry(q->next, struct rpc_task, tk_list);
+		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
 		if (queue->cookie == task->tk_cookie) {
 			if (--queue->nr)
 				goto out;
-			list_move_tail(&task->tk_list, q);
+			list_move_tail(&task->u.tk_wait.list, q);
 		}
 		/*
 		 * Check if we need to switch queues.
@@ -516,7 +445,7 @@ static struct rpc_task * __rpc_wake_up_n
 		else
 			q = q - 1;
 		if (!list_empty(q)) {
-			task = list_entry(q->next, struct rpc_task, tk_list);
+			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
 			goto new_queue;
 		}
 	} while (q != &queue->tasks[queue->priority]);
@@ -568,7 +497,7 @@ void rpc_wake_up(struct rpc_wait_queue *
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
-			task = list_entry(head->next, struct rpc_task, tk_list);
+			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
 			__rpc_wake_up_task(task);
 		}
 		if (head == &queue->tasks[0])
@@ -594,7 +523,7 @@ void rpc_wake_up_status(struct rpc_wait_
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
-			task = list_entry(head->next, struct rpc_task, tk_list);
+			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
 			task->tk_status = status;
 			__rpc_wake_up_task(task);
 		}
@@ -626,22 +555,23 @@ __rpc_atrun(struct rpc_task *task)
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
-static int
-__rpc_execute(struct rpc_task *task)
+static int __rpc_execute(struct rpc_task *task)
 {
 	int		status = 0;
 
 	dprintk("RPC: %4d rpc_execute flgs %x\n",
 				task->tk_pid, task->tk_flags);
 
-	if (!RPC_IS_RUNNING(task)) {
-		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
-		return 0;
-	}
+	BUG_ON(RPC_IS_QUEUED(task));
 
  restarted:
 	while (1) {
 		/*
+		 * Garbage collection of pending timers...
+		 */
+		rpc_delete_timer(task);
+
+		/*
 		 * Execute any pending callback.
 		 */
 		if (RPC_DO_CALLBACK(task)) {
@@ -657,7 +587,9 @@ __rpc_execute(struct rpc_task *task)
 			 */
 			save_callback=task->tk_callback;
 			task->tk_callback=NULL;
+			lock_kernel();
 			save_callback(task);
+			unlock_kernel();
 		}
 
 		/*
@@ -665,43 +597,35 @@ __rpc_execute(struct rpc_task *task)
 		 * tk_action may be NULL when the task has been killed
 		 * by someone else.
 		 */
-		if (RPC_IS_RUNNING(task)) {
-			/*
-			 * Garbage collection of pending timers...
-			 */
-			rpc_delete_timer(task);
+		if (!RPC_IS_QUEUED(task)) {
 			if (!task->tk_action)
 				break;
+			lock_kernel();
 			task->tk_action(task);
-			/* micro-optimization to avoid spinlock */
-			if (RPC_IS_RUNNING(task))
-				continue;
+			unlock_kernel();
 		}
 
 		/*
-		 * Check whether task is sleeping.
+		 * Lockless check for whether task is sleeping or not.
 		 */
-		spin_lock_bh(&rpc_queue_lock);
-		if (!RPC_IS_RUNNING(task)) {
-			rpc_set_sleeping(task);
-			if (RPC_IS_ASYNC(task)) {
-				spin_unlock_bh(&rpc_queue_lock);
+		if (!RPC_IS_QUEUED(task))
+			continue;
+		rpc_clear_running(task);
+		if (RPC_IS_ASYNC(task)) {
+			/* Careful! we may have raced... */
+			if (RPC_IS_QUEUED(task))
 				return 0;
-			}
+			if (rpc_test_and_set_running(task))
+				return 0;
+			continue;
 		}
-		spin_unlock_bh(&rpc_queue_lock);
 
-		if (!RPC_IS_SLEEPING(task))
-			continue;
 		/* sync task: sleep here */
 		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-		if (current->pid == rpciod_pid)
-			printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
-
 		if (RPC_TASK_UNINTERRUPTIBLE(task)) {
-			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
+			__wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
 		} else {
-			__wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status);
+			__wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
 			/*
 			 * When a sync task receives a signal, it exits with
 			 * -ERESTARTSYS. In order to catch any callbacks that
@@ -715,11 +639,14 @@ __rpc_execute(struct rpc_task *task)
 				rpc_wake_up_task(task);
 			}
 		}
+		rpc_set_running(task);
 		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
 	}
 
 	if (task->tk_exit) {
+		lock_kernel();
 		task->tk_exit(task);
+		unlock_kernel();
 		/* If tk_action is non-null, the user wants us to restart */
 		if (task->tk_action) {
 			if (!RPC_ASSASSINATED(task)) {
@@ -738,7 +665,6 @@ __rpc_execute(struct rpc_task *task)
 
 	/* Release all resources associated with the task */
 	rpc_release_task(task);
-
 	return status;
 }
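
The subtle part of the reworked loop above is the lockless handling of the
case where an async task goes to sleep just as a wake-up arrives.  A hedged
restatement of that path, with the waker's side of the protocol (see
rpc_make_runnable(): it sets RUNNING before clearing QUEUED) in comments:

	rpc_clear_running(task);
	if (RPC_IS_ASYNC(task)) {
		if (RPC_IS_QUEUED(task))
			return 0;	/* still asleep; the waker will requeue the work */
		/* QUEUED is clear, so a wake-up raced with us.  A waker that
		 * saw RUNNING still set did not call queue_work(), so the
		 * winner of this test-and-set must keep executing. */
		if (rpc_test_and_set_running(task))
			return 0;	/* the waker requeued the work item */
		continue;		/* the wake-up landed on us; carry on inline */
	}
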
 
@@ -754,57 +680,16 @@ __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-	int status = -EIO;
-	if (rpc_inhibit) {
-		printk(KERN_INFO "RPC: execution inhibited!\n");
-		goto out_release;
-	}
-
-	status = -EWOULDBLOCK;
-	if (task->tk_active) {
-		printk(KERN_ERR "RPC: active task was run twice!\n");
-		goto out_err;
-	}
+	BUG_ON(task->tk_active);
 
 	task->tk_active = 1;
 	rpc_set_running(task);
 	return __rpc_execute(task);
- out_release:
-	rpc_release_task(task);
- out_err:
-	return status;
 }
 
-/*
- * This is our own little scheduler for async RPC tasks.
- */
-static void
-__rpc_schedule(void)
+static void rpc_async_schedule(void *arg)
 {
-	struct rpc_task	*task;
-	int		count = 0;
-
-	dprintk("RPC:      rpc_schedule enter\n");
-	while (1) {
-
-		task_for_first(task, &schedq.tasks[0]) {
-			__rpc_remove_wait_queue(task);
-			spin_unlock_bh(&rpc_queue_lock);
-
-			__rpc_execute(task);
-			spin_lock_bh(&rpc_queue_lock);
-		} else {
-			break;
-		}
-
-		if (++count >= 200 || need_resched()) {
-			count = 0;
-			spin_unlock_bh(&rpc_queue_lock);
-			schedule();
-			spin_lock_bh(&rpc_queue_lock);
-		}
-	}
-	dprintk("RPC:      rpc_schedule leave\n");
+	__rpc_execute((struct rpc_task *)arg);
 }
 
 /*
@@ -862,7 +747,6 @@ void rpc_init_task(struct rpc_task *task
 	task->tk_client = clnt;
 	task->tk_flags  = flags;
 	task->tk_exit   = callback;
-	init_waitqueue_head(&task->tk_wait);
 	if (current->uid != current->fsuid || current->gid != current->fsgid)
 		task->tk_flags |= RPC_TASK_SETUID;
 
@@ -873,7 +757,11 @@ void rpc_init_task(struct rpc_task *task
 
 	task->tk_priority = RPC_PRIORITY_NORMAL;
 	task->tk_cookie = (unsigned long)current;
-	INIT_LIST_HEAD(&task->tk_links);
+
+	/* Initialize workqueue for async tasks */
+	task->tk_workqueue = rpciod_workqueue;
+	if (!RPC_IS_ASYNC(task))
+		init_waitqueue_head(&task->u.tk_wait.waitq);
 
 	/* Add to global list of all tasks */
 	spin_lock(&rpc_sched_lock);
@@ -963,19 +851,9 @@ rpc_release_task(struct rpc_task *task)
 	list_del(&task->tk_task);
 	spin_unlock(&rpc_sched_lock);
 
-	/* Protect the execution below. */
-	spin_lock_bh(&rpc_queue_lock);
-
-	/* Disable timer to prevent zombie wakeup */
-	__rpc_disable_timer(task);
-
-	/* Remove from any wait queue we're still on */
-	__rpc_remove_wait_queue(task);
-
+	BUG_ON(RPC_IS_QUEUED(task));
 	task->tk_active = 0;
 
-	spin_unlock_bh(&rpc_queue_lock);
-
 	/* Synchronously delete any running timer */
 	rpc_delete_timer(task);
 
@@ -1091,82 +969,6 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
 
-static DECLARE_MUTEX_LOCKED(rpciod_running);
 
-static inline int
-rpciod_task_pending(void)
-{
-	return !list_empty(&schedq.tasks[0]);
-}
-
-
-/*
- * This is the rpciod kernel thread
- */
-static int
-rpciod(void *ptr)
-{
-	int		rounds = 0;
-
-	lock_kernel();
-	/*
-	 * Let our maker know we're running ...
-	 */
-	rpciod_pid = current->pid;
-	up(&rpciod_running);
-
-	daemonize("rpciod");
-	allow_signal(SIGKILL);
-
-	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
-	spin_lock_bh(&rpc_queue_lock);
-	while (rpciod_users) {
-		DEFINE_WAIT(wait);
-		if (signalled()) {
-			spin_unlock_bh(&rpc_queue_lock);
-			rpciod_killall();
-			flush_signals(current);
-			spin_lock_bh(&rpc_queue_lock);
-		}
-		__rpc_schedule();
-		if (current->flags & PF_FREEZE) {
-			spin_unlock_bh(&rpc_queue_lock);
-			refrigerator(PF_FREEZE);
-			spin_lock_bh(&rpc_queue_lock);
-		}
-
-		if (++rounds >= 64) {	/* safeguard */
-			spin_unlock_bh(&rpc_queue_lock);
-			schedule();
-			rounds = 0;
-			spin_lock_bh(&rpc_queue_lock);
-		}
-
-		dprintk("RPC: rpciod back to sleep\n");
-		prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE);
-		if (!rpciod_task_pending() && !signalled()) {
-			spin_unlock_bh(&rpc_queue_lock);
-			schedule();
-			rounds = 0;
-			spin_lock_bh(&rpc_queue_lock);
-		}
-		finish_wait(&rpciod_idle, &wait);
-		dprintk("RPC: switch to rpciod\n");
-	}
-	spin_unlock_bh(&rpc_queue_lock);
-
-	dprintk("RPC: rpciod shutdown commences\n");
-	if (!list_empty(&all_tasks)) {
-		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
-		rpciod_killall();
-	}
-
-	dprintk("RPC: rpciod exiting\n");
-	unlock_kernel();
-
-	rpciod_pid = 0;
-	complete_and_exit(&rpciod_killer, 0);
-	return 0;
-}
-
 static void
 rpciod_killall(void)
 {
@@ -1175,9 +977,7 @@ rpciod_killall(void)
 	while (!list_empty(&all_tasks)) {
 		clear_thread_flag(TIF_SIGPENDING);
 		rpc_killall_tasks(NULL);
-		spin_lock_bh(&rpc_queue_lock);
-		__rpc_schedule();
-		spin_unlock_bh(&rpc_queue_lock);
+		flush_workqueue(rpciod_workqueue);
 		if (!list_empty(&all_tasks)) {
 			dprintk("rpciod_killall: waiting for tasks to exit\n");
 			yield();
@@ -1195,28 +995,30 @@ rpciod_killall(void)
 int
 rpciod_up(void)
 {
+	struct workqueue_struct *wq;
 	int error = 0;
 
 	down(&rpciod_sema);
-	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
+	dprintk("rpciod_up: users %d\n", rpciod_users);
 	rpciod_users++;
-	if (rpciod_pid)
+	if (rpciod_workqueue)
 		goto out;
 	/*
-	 * If there's no pid, we should be the first user.
+	 * If there's no workqueue, we should be the first user.
 	 */
 	if (rpciod_users > 1)
-		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
+		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
 	/*
-	 * Create the rpciod thread and wait for it to start.
+	 * Create the rpciod workqueue.
 	 */
-	error = kernel_thread(rpciod, NULL, 0);
-	if (error < 0) {
-		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
+	error = -ENOMEM;
+	wq = create_workqueue("rpciod");
+	if (wq == NULL) {
+		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
 		rpciod_users--;
 		goto out;
 	}
-	down(&rpciod_running);
+	rpciod_workqueue = wq;
 	error = 0;
 out:
 	up(&rpciod_sema);
@@ -1227,20 +1029,21 @@ void
 rpciod_down(void)
 {
 	down(&rpciod_sema);
-	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
+	dprintk("rpciod_down sema %d\n", rpciod_users);
 	if (rpciod_users) {
 		if (--rpciod_users)
 			goto out;
 	} else
-		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);
+		printk(KERN_WARNING "rpciod_down: no users??\n");
 
-	if (!rpciod_pid) {
+	if (!rpciod_workqueue) {
 		dprintk("rpciod_down: Nothing to do!\n");
 		goto out;
 	}
+	rpciod_killall();
 
-	kill_proc(rpciod_pid, SIGKILL, 1);
-	wait_for_completion(&rpciod_killer);
+	destroy_workqueue(rpciod_workqueue);
+	rpciod_workqueue = NULL;
  out:
 	up(&rpciod_sema);
 }
@@ -1258,7 +1061,12 @@ void rpc_show_tasks(void)
 	}
 	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
 		"-rpcwait -action- --exit--\n");
-	alltask_for_each(t, le, &all_tasks)
+	alltask_for_each(t, le, &all_tasks) {
+		const char *rpc_waitq = "none";
+
+		if (RPC_IS_QUEUED(t))
+			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+
 		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
 			t->tk_pid,
 			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
@@ -1266,8 +1074,9 @@ void rpc_show_tasks(void)
 			t->tk_client,
 			(t->tk_client ? t->tk_client->cl_prog : 0),
 			t->tk_rqstp, t->tk_timeout,
-			rpc_qname(t->tk_rpcwait),
+			rpc_waitq,
 			t->tk_action, t->tk_exit);
+	}
 	spin_unlock(&rpc_sched_lock);
 }
 #endif
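
For reference, a self-contained sketch of the 2.6.8-era workqueue API that
rpciod_up()/rpciod_down() now build on; all names here are illustrative and
not part of the patch:

	#include <linux/workqueue.h>

	static struct workqueue_struct *example_wq;
	static struct work_struct example_work;

	static void example_fn(void *arg)
	{
		/* runs in process context, much like an async RPC task */
	}

	static int example_up(void)
	{
		example_wq = create_workqueue("example");	/* cf. "rpciod" */
		if (example_wq == NULL)
			return -ENOMEM;
		INIT_WORK(&example_work, example_fn, NULL);	/* 3-arg form */
		queue_work(example_wq, &example_work);
		return 0;
	}

	static void example_down(void)
	{
		flush_workqueue(example_wq);	/* cf. rpciod_killall() */
		destroy_workqueue(example_wq);
		example_wq = NULL;
	}
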
diff -puN net/sunrpc/xprt.c~linux-2.6.8.1-49-rpc_workqueue net/sunrpc/xprt.c
--- 25/net/sunrpc/xprt.c~linux-2.6.8.1-49-rpc_workqueue	2004-08-23 21:59:32.138414640 -0700
+++ 25-akpm/net/sunrpc/xprt.c	2004-08-23 21:59:32.154412208 -0700
@@ -1101,7 +1101,7 @@ xprt_write_space(struct sock *sk)
 		goto out;
 
 	spin_lock_bh(&xprt->sock_lock);
-	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
+	if (xprt->snd_task)
 		rpc_wake_up_task(xprt->snd_task);
 	spin_unlock_bh(&xprt->sock_lock);
 out:
_