From: Trond Myklebust <trond.myklebust@fys.uio.no>

RPC: Remove the rpc_queue_lock global spinlock and replace it with
per-rpc_queue spinlocks.

Signed-off-by: Trond Myklebust <trond.myklebust@fys.uio.no>
Signed-off-by: Andrew Morton <akpm@osdl.org>
---
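A minimal sketch of the resulting locking pattern, for reference while
reading the diff below.  This is illustrative only, not the patch code:
the reduced struct and example_sleep_on() are assumptions for the
example, and the real rpc_wait_queue keeps its priority arrays, cookie
and name fields.

	#include <linux/spinlock.h>
	#include <linux/list.h>

	struct rpc_wait_queue {
		spinlock_t		lock;	/* protects this queue only */
		struct list_head	tasks[1]; /* reduced to one priority level */
	};

	static void example_sleep_on(struct rpc_wait_queue *q,
				     struct list_head *entry)
	{
		/* _bh variant: RPC timers fire in softirq context and
		 * take the same per-queue lock to wake sleeping tasks. */
		spin_lock_bh(&q->lock);
		list_add_tail(entry, &q->tasks[0]);
		spin_unlock_bh(&q->lock);
	}

With this scheme, tasks sleeping on different queues no longer contend
on a single global lock; only sleepers and wakers of the same queue
serialize against each other.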

 25-akpm/include/linux/sunrpc/sched.h |    4 ++
 25-akpm/net/sunrpc/sched.c           |   69 +++++++++++++++--------------------
 2 files changed, 34 insertions(+), 39 deletions(-)

diff -puN include/linux/sunrpc/sched.h~linux-2.6.8.1-50-rpc_queue_lock include/linux/sunrpc/sched.h
--- 25/include/linux/sunrpc/sched.h~linux-2.6.8.1-50-rpc_queue_lock	2004-08-22 20:57:41.320341224 -0700
+++ 25-akpm/include/linux/sunrpc/sched.h	2004-08-22 20:57:41.325340464 -0700
@@ -11,6 +11,7 @@
 
 #include <linux/timer.h>
 #include <linux/sunrpc/types.h>
+#include <linux/spinlock.h>
 #include <linux/wait.h>
 #include <linux/workqueue.h>
 #include <linux/sunrpc/xdr.h>
@@ -185,6 +186,7 @@ typedef void			(*rpc_action)(struct rpc_
  * RPC synchronization objects
  */
 struct rpc_wait_queue {
+	spinlock_t		lock;
 	struct list_head	tasks[RPC_NR_PRIORITY];	/* task queue for each priority level */
 	unsigned long		cookie;			/* cookie of last task serviced */
 	unsigned char		maxpriority;		/* maximum priority (0 if queue is not a priority queue) */
@@ -205,6 +207,7 @@ struct rpc_wait_queue {
 
 #ifndef RPC_DEBUG
 # define RPC_WAITQ_INIT(var,qname) { \
+		.lock = SPIN_LOCK_UNLOCKED, \
 		.tasks = { \
 			[0] = LIST_HEAD_INIT(var.tasks[0]), \
 			[1] = LIST_HEAD_INIT(var.tasks[1]), \
@@ -213,6 +216,7 @@ struct rpc_wait_queue {
 	}
 #else
 # define RPC_WAITQ_INIT(var,qname) { \
+		.lock = SPIN_LOCK_UNLOCKED, \
 		.tasks = { \
 			[0] = LIST_HEAD_INIT(var.tasks[0]), \
 			[1] = LIST_HEAD_INIT(var.tasks[1]), \
diff -puN net/sunrpc/sched.c~linux-2.6.8.1-50-rpc_queue_lock net/sunrpc/sched.c
--- 25/net/sunrpc/sched.c~linux-2.6.8.1-50-rpc_queue_lock	2004-08-22 20:57:41.322340920 -0700
+++ 25-akpm/net/sunrpc/sched.c	2004-08-22 20:57:41.327340160 -0700
@@ -67,18 +67,13 @@ static unsigned int		rpciod_users;
 static struct workqueue_struct *rpciod_workqueue;
 
 /*
- * Spinlock for wait queues. Access to the latter also has to be
- * interrupt-safe in order to allow timers to wake up sleeping tasks.
- */
-static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
-/*
  * Spinlock for other critical sections of code.
  */
 static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
- * rpc_queue_lock and bh_disabled in order to avoid races within
+ * queue->lock and bh_disabled in order to avoid races within
  * rpc_run_timer().
  */
 static inline void
@@ -129,7 +124,7 @@ __rpc_add_timer(struct rpc_task *task, r
 
 /*
  * Delete any timer for the current task. Because we use del_timer_sync(),
- * this function should never be called while holding rpc_queue_lock.
+ * this function should never be called while holding queue->lock.
  */
 static inline void
 rpc_delete_timer(struct rpc_task *task)
@@ -238,6 +233,7 @@ static void __rpc_init_priority_wait_que
 {
 	int i;
 
+	spin_lock_init(&queue->lock);
 	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
 		INIT_LIST_HEAD(&queue->tasks[i]);
 	queue->maxpriority = maxprio;
@@ -328,23 +324,22 @@ static void __rpc_sleep_on(struct rpc_wa
 	__rpc_add_timer(task, timer);
 }
 
-void
-rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 				rpc_action action, rpc_action timer)
 {
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&q->lock);
 	__rpc_sleep_on(q, task, action, timer);
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&q->lock);
 }
 
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
  * @task: task to be woken up
  *
- * Caller must hold rpc_queue_lock, and have cleared the task queued flag.
+ * Caller must hold queue->lock, and have cleared the task queued flag.
  */
 static void __rpc_do_wake_up_task(struct rpc_task *task)
 {
@@ -402,9 +397,11 @@ void rpc_wake_up_task(struct rpc_task *t
 {
 	if (rpc_start_wakeup(task)) {
 		if (RPC_IS_QUEUED(task)) {
-			spin_lock_bh(&rpc_queue_lock);
+			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
+
+			spin_lock_bh(&queue->lock);
 			__rpc_do_wake_up_task(task);
-			spin_unlock_bh(&rpc_queue_lock);
+			spin_unlock_bh(&queue->lock);
 		}
 		rpc_finish_wakeup(task);
 	}
@@ -470,14 +467,14 @@ struct rpc_task * rpc_wake_up_next(struc
 	struct rpc_task	*task = NULL;
 
 	dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&queue->lock);
 	if (RPC_IS_PRIORITY(queue))
 		task = __rpc_wake_up_next_priority(queue);
 	else {
 		task_for_first(task, &queue->tasks[0])
 			__rpc_wake_up_task(task);
 	}
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&queue->lock);
 
 	return task;
 }
@@ -486,14 +483,14 @@ struct rpc_task * rpc_wake_up_next(struc
  * rpc_wake_up - wake up all rpc_tasks
  * @queue: rpc_wait_queue on which the tasks are sleeping
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
 	struct rpc_task *task;
 
 	struct list_head *head;
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
@@ -504,7 +501,7 @@ void rpc_wake_up(struct rpc_wait_queue *
 			break;
 		head--;
 	}
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&queue->lock);
 }
 
 /**
@@ -512,14 +509,14 @@ void rpc_wake_up(struct rpc_wait_queue *
  * @queue: rpc_wait_queue on which the tasks are sleeping
  * @status: status value to set
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
 	struct list_head *head;
 	struct rpc_task *task;
 
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
@@ -531,7 +528,7 @@ void rpc_wake_up_status(struct rpc_wait_
 			break;
 		head--;
 	}
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&queue->lock);
 }
 
 /*
@@ -832,8 +829,7 @@ cleanup:
 	goto out;
 }
 
-void
-rpc_release_task(struct rpc_task *task)
+void rpc_release_task(struct rpc_task *task)
 {
 	dprintk("RPC: %4d release task\n", task->tk_pid);
 
@@ -883,10 +879,9 @@ rpc_release_task(struct rpc_task *task)
  * queue 'childq'. If so returns a pointer to the parent.
  * Upon failure returns NULL.
  *
- * Caller must hold rpc_queue_lock
+ * Caller must hold childq.lock
  */
-static inline struct rpc_task *
-rpc_find_parent(struct rpc_task *child)
+static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
 {
 	struct rpc_task	*task, *parent;
 	struct list_head *le;
@@ -899,17 +894,16 @@ rpc_find_parent(struct rpc_task *child)
 	return NULL;
 }
 
-static void
-rpc_child_exit(struct rpc_task *child)
+static void rpc_child_exit(struct rpc_task *child)
 {
 	struct rpc_task	*parent;
 
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&childq.lock);
 	if ((parent = rpc_find_parent(child)) != NULL) {
 		parent->tk_status = child->tk_status;
 		__rpc_wake_up_task(parent);
 	}
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&childq.lock);
 }
 
 /*
@@ -932,22 +926,20 @@ fail:
 	return NULL;
 }
 
-void
-rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
+void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
 {
-	spin_lock_bh(&rpc_queue_lock);
+	spin_lock_bh(&childq.lock);
 	/* N.B. Is it possible for the child to have already finished? */
 	__rpc_sleep_on(&childq, task, func, NULL);
 	rpc_schedule_run(child);
-	spin_unlock_bh(&rpc_queue_lock);
+	spin_unlock_bh(&childq.lock);
 }
 
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
  */
-void
-rpc_killall_tasks(struct rpc_clnt *clnt)
+void rpc_killall_tasks(struct rpc_clnt *clnt)
 {
 	struct rpc_task	*rovr;
 	struct list_head *le;
@@ -969,8 +961,7 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
 
 static DECLARE_MUTEX_LOCKED(rpciod_running);
 
-static void
-rpciod_killall(void)
+static void rpciod_killall(void)
 {
 	unsigned long flags;
 
_
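
A sketch of the two initialization paths the per-queue lock now has to
cover.  "my_waitq" and example_dynamic_init() are illustrative names,
not part of the patch; RPC_WAITQ, RPC_WAITQ_INIT and
rpc_init_wait_queue() are the existing sunrpc interfaces touched above.

	/* Statically defined queue: the lock comes from the
	 * RPC_WAITQ_INIT initializer (SPIN_LOCK_UNLOCKED was the
	 * idiom of the 2.6.8 era). */
	static RPC_WAITQ(my_waitq, "my_waitq");

	/* Dynamically initialized queue:
	 * __rpc_init_priority_wait_queue() now calls
	 * spin_lock_init() on queue->lock. */
	static void example_dynamic_init(struct rpc_wait_queue *q)
	{
		rpc_init_wait_queue(q, "dynamic");
	}

A queue set up by either path can then be passed to rpc_sleep_on() and
the rpc_wake_up*() helpers, which take q->lock with spin_lock_bh().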