From: Chris Mason <mason@suse.com>

Rework of the reiserfs logging code, making things much faster for small
transactions.  Metadata buffers are now dirtied as soon as they are safe
to write, so the normal kernel writeback mechanisms can contribute to
cleaning the log.
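
In outline: once a transaction's commit block is safely on disk, its
metadata buffers need no private kupdate-style flushing and can simply
be marked dirty for ordinary writeback.  A minimal sketch of that idea,
simplified from dirty_one_transaction() in the journal.c changes below
(the prepared-buffer case and error handling omitted):

	/* sketch: hand a committed transaction's metadata to the VM */
	static void dirty_transaction_sketch(struct reiserfs_journal_list *jl)
	{
		struct reiserfs_journal_cnode *cn = jl->j_realblock;

		jl->j_state |= LIST_DIRTY;
		while (cn) {
			/* only the newest transaction to log a block may
			 * let it be written back */
			if (!find_newer_jl_for_cn(cn) && cn->bh &&
			    buffer_journal_dirty(cn->bh))
				mark_buffer_dirty(cn->bh);
			cn = cn->next;
		}
	}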


---

 25-akpm/fs/reiserfs/do_balan.c         |   25 
 25-akpm/fs/reiserfs/fix_node.c         |   34 
 25-akpm/fs/reiserfs/ibalance.c         |    2 
 25-akpm/fs/reiserfs/inode.c            |    4 
 25-akpm/fs/reiserfs/journal.c          | 1616 ++++++++++++++++++---------------
 25-akpm/fs/reiserfs/objectid.c         |    3 
 25-akpm/fs/reiserfs/procfs.c           |    5 
 25-akpm/fs/reiserfs/super.c            |   31 
 25-akpm/include/linux/reiserfs_fs.h    |   29 
 25-akpm/include/linux/reiserfs_fs_i.h  |    4 
 25-akpm/include/linux/reiserfs_fs_sb.h |   70 -
 11 files changed, 972 insertions(+), 851 deletions(-)

diff -puN fs/reiserfs/do_balan.c~reiserfs-logging fs/reiserfs/do_balan.c
--- 25/fs/reiserfs/do_balan.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/do_balan.c	Wed Mar 24 15:14:39 2004
@@ -30,32 +30,11 @@ struct tree_balance * cur_tb = NULL; /* 
                                         is interrupting do_balance */
 #endif
 
-/*
- * AKPM: The __mark_buffer_dirty() call here will not
- * put the buffer on the dirty buffer LRU because we've just
- * set BH_Dirty.  That's a thinko in reiserfs.
- *
- * I'm reluctant to "fix" this bug because that would change
- * behaviour.  Using mark_buffer_dirty() here would make the
- * buffer eligible for VM and periodic writeback, which may
- * violate ordering constraints.  I'll just leave the code
- * as-is by removing the __mark_buffer_dirty call altogether.
- *
- * Chris says this code has "probably never been run" anyway.
- * It is due to go away.
- */
-
 inline void do_balance_mark_leaf_dirty (struct tree_balance * tb, 
 					struct buffer_head * bh, int flag)
 {
-    if (reiserfs_dont_log(tb->tb_sb)) {
-	if (!test_set_buffer_dirty(bh)) {
-//	    __mark_buffer_dirty(bh) ;
-	    tb->need_balance_dirty = 1;
-	}
-    } else {
-	journal_mark_dirty(tb->transaction_handle, tb->transaction_handle->t_super, bh) ;
-    }
+    journal_mark_dirty(tb->transaction_handle,
+                       tb->transaction_handle->t_super, bh) ;
 }
 
 #define do_balance_mark_internal_dirty do_balance_mark_leaf_dirty
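
With the reiserfs_dont_log() special case gone, every dirty made during
balancing goes through the log.  The expected calling pattern, as a
rough sketch (bh and tb named as in the surrounding code, inside an
active transaction):

	reiserfs_prepare_for_journal(tb->tb_sb, bh, 1);
	/* ... modify bh ... */
	do_balance_mark_leaf_dirty(tb, bh, 0);	/* -> journal_mark_dirty() */
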
diff -puN fs/reiserfs/fix_node.c~reiserfs-logging fs/reiserfs/fix_node.c
--- 25/fs/reiserfs/fix_node.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/fix_node.c	Wed Mar 24 15:14:39 2004
@@ -2106,9 +2106,9 @@ static void tb_buffer_sanity_check (stru
 {;}
 #endif
 
-static void clear_all_dirty_bits(struct super_block *s, 
+static int clear_all_dirty_bits(struct super_block *s,
                                  struct buffer_head *bh) {
-  reiserfs_prepare_for_journal(s, bh, 0) ;
+  return reiserfs_prepare_for_journal(s, bh, 0) ;
 }
 
 static int wait_tb_buffers_until_unlocked (struct tree_balance * p_s_tb)
@@ -2137,11 +2137,11 @@ static int wait_tb_buffers_until_unlocke
 					    p_s_tb->tb_path->path_length - i);
 		}
 #endif
-		clear_all_dirty_bits(p_s_tb->tb_sb, 
-				     PATH_OFFSET_PBUFFER (p_s_tb->tb_path, i)) ;
-
-		if ( buffer_locked (PATH_OFFSET_PBUFFER (p_s_tb->tb_path, i)) )
+		if (!clear_all_dirty_bits(p_s_tb->tb_sb,
+				     PATH_OFFSET_PBUFFER (p_s_tb->tb_path, i)))
+		{
 		    locked = PATH_OFFSET_PBUFFER (p_s_tb->tb_path, i);
+		}
 	    }
 	}
 
@@ -2151,22 +2151,19 @@ static int wait_tb_buffers_until_unlocke
 
 		if ( p_s_tb->L[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->L[i], "L", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->L[i]) ;
-		    if ( buffer_locked (p_s_tb->L[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->L[i]))
 			locked = p_s_tb->L[i];
 		}
 
 		if ( !locked && p_s_tb->FL[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->FL[i], "FL", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FL[i]) ;
-		    if ( buffer_locked (p_s_tb->FL[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FL[i]))
 			locked = p_s_tb->FL[i];
 		}
 
 		if ( !locked && p_s_tb->CFL[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->CFL[i], "CFL", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->CFL[i]) ;
-		    if ( buffer_locked (p_s_tb->CFL[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->CFL[i]))
 			locked = p_s_tb->CFL[i];
 		}
 
@@ -2176,23 +2173,20 @@ static int wait_tb_buffers_until_unlocke
 
 		if ( p_s_tb->R[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->R[i], "R", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->R[i]) ;
-		    if ( buffer_locked (p_s_tb->R[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->R[i]))
 			locked = p_s_tb->R[i];
 		}
 
        
 		if ( !locked && p_s_tb->FR[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->FR[i], "FR", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FR[i]) ;
-		    if ( buffer_locked (p_s_tb->FR[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FR[i]))
 			locked = p_s_tb->FR[i];
 		}
 
 		if ( !locked && p_s_tb->CFR[i] ) {
 		    tb_buffer_sanity_check (p_s_tb->tb_sb, p_s_tb->CFR[i], "CFR", i);
-		    clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->CFR[i]) ;
-		    if ( buffer_locked (p_s_tb->CFR[i]) )
+		    if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->CFR[i]))
 			locked = p_s_tb->CFR[i];
 		}
 	    }
@@ -2207,10 +2201,8 @@ static int wait_tb_buffers_until_unlocke
 	*/
 	for ( i = 0; !locked && i < MAX_FEB_SIZE; i++ ) { 
 	    if ( p_s_tb->FEB[i] ) {
-		clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FEB[i]) ;
-		if (buffer_locked(p_s_tb->FEB[i])) {
+		if (!clear_all_dirty_bits(p_s_tb->tb_sb, p_s_tb->FEB[i]))
 		    locked = p_s_tb->FEB[i] ;
-		}
 	    }
 	}
 
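The fix_node.c changes rely on a new return-value contract:
clear_all_dirty_bits() now propagates the result of
reiserfs_prepare_for_journal(), and callers treat 0 as "buffer was busy,
remember it and wait for it later".  A simplified sketch of the assumed
contract (not the full implementation):

	/* assumed: with wait == 0 the prepare must not sleep, and it
	 * reports whether it won the buffer lock */
	int reiserfs_prepare_for_journal(struct super_block *s,
					 struct buffer_head *bh, int wait)
	{
		if (test_set_buffer_locked(bh)) {
			if (!wait)
				return 0;	/* busy, caller will wait */
			lock_buffer(bh);
		}
		set_bit(BH_JPrepared, &bh->b_state);
		unlock_buffer(bh);
		return 1;
	}
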
diff -puN fs/reiserfs/ibalance.c~reiserfs-logging fs/reiserfs/ibalance.c
--- 25/fs/reiserfs/ibalance.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/ibalance.c	Wed Mar 24 15:14:39 2004
@@ -633,7 +633,6 @@ static void balance_internal_when_delete
 		/* use check_internal if new root is an internal node */
 		check_internal (new_root);
 	    /*&&&&&&&&&&&&&&&&&&&&&&*/
-	    tb->tb_sb->s_dirt = 1;
 
 	    /* do what is needed for buffer thrown from tree */
 	    reiserfs_invalidate_buffer(tb, tbSh);
@@ -951,7 +950,6 @@ int balance_internal (struct tree_balanc
         PUT_SB_ROOT_BLOCK( tb->tb_sb, tbSh->b_blocknr );
         PUT_SB_TREE_HEIGHT( tb->tb_sb, SB_TREE_HEIGHT(tb->tb_sb) + 1 );
 	do_balance_mark_sb_dirty (tb, REISERFS_SB(tb->tb_sb)->s_sbh, 1);
-	tb->tb_sb->s_dirt = 1;
     }
 	
     if ( tb->blknum[h] == 2 ) {
diff -puN fs/reiserfs/inode.c~reiserfs-logging fs/reiserfs/inode.c
--- 25/fs/reiserfs/inode.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/inode.c	Wed Mar 24 15:14:39 2004
@@ -964,7 +964,7 @@ static void init_inode (struct inode * i
     REISERFS_I(inode)->i_prealloc_block = 0;
     REISERFS_I(inode)->i_prealloc_count = 0;
     REISERFS_I(inode)->i_trans_id = 0;
-    REISERFS_I(inode)->i_trans_index = 0;
+    REISERFS_I(inode)->i_jl = NULL;
 
     if (stat_data_v1 (ih)) {
 	struct stat_data_v1 * sd = (struct stat_data_v1 *)B_I_PITEM (bh, ih);
@@ -1621,7 +1621,7 @@ int reiserfs_new_inode (struct reiserfs_
     REISERFS_I(inode)->i_prealloc_block = 0;
     REISERFS_I(inode)->i_prealloc_count = 0;
     REISERFS_I(inode)->i_trans_id = 0;
-    REISERFS_I(inode)->i_trans_index = 0;
+    REISERFS_I(inode)->i_jl = NULL;
     REISERFS_I(inode)->i_attrs =
 	REISERFS_I(dir)->i_attrs & REISERFS_INHERIT_MASK;
     sd_attrs_to_i_attrs( REISERFS_I(inode) -> i_attrs, inode );
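
The i_jl pointer replacing i_trans_index above appears intended to let
fsync-style paths flush exactly the journal list that last logged the
inode, rather than searching by index.  A hedged sketch of that use
(helper name hypothetical, not part of this patch):

	/* hypothetical: flush the list that last logged this inode */
	static void commit_inode_sketch(struct inode *inode)
	{
		struct reiserfs_journal_list *jl = REISERFS_I(inode)->i_jl;

		if (jl && REISERFS_I(inode)->i_trans_id)
			flush_commit_list(inode->i_sb, jl, 1);
	}
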
diff -puN fs/reiserfs/journal.c~reiserfs-logging fs/reiserfs/journal.c
--- 25/fs/reiserfs/journal.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/journal.c	Wed Mar 24 15:14:39 2004
@@ -32,13 +32,6 @@
 **                      around too long.
 **		     -- Note, if you call this as an immediate flush from 
 **		        from within kupdate, it will ignore the immediate flag
-**
-** The commit thread -- a writer process for async commits.  It allows a 
-**                      a process to request a log flush on a task queue.
-**                      the commit will happen once the commit thread wakes up.
-**                      The benefit here is the writer (with whatever
-**                      related locks it has) doesn't have to wait for the
-**                      log blocks to hit disk if it doesn't want to.
 */
 
 #include <linux/config.h>
@@ -60,6 +53,14 @@
 #include <linux/suspend.h>
 #include <linux/buffer_head.h>
 #include <linux/workqueue.h>
+#include <linux/writeback.h>
+
+
+/* gets a struct reiserfs_journal_list * from a list head */
+#define JOURNAL_LIST_ENTRY(h) (list_entry((h), struct reiserfs_journal_list, \
+                               j_list))
+#define JOURNAL_WORK_ENTRY(h) (list_entry((h), struct reiserfs_journal_list, \
+                               j_working_list))
 
 /* the number of mounted filesystems.  This is used to decide when to
 ** start and kill the commit workqueue
@@ -78,6 +79,12 @@ static struct workqueue_struct *commit_w
 #define BLOCK_FREED_HOLDER 3    /* this block was freed during this transaction, and can't be written */
 
 #define BLOCK_NEEDS_FLUSH 4	/* used in flush_journal_list */
+#define BLOCK_DIRTIED 5
+
+
+/* journal list state bits */
+#define LIST_TOUCHED 1
+#define LIST_DIRTY   2
 
 /* flags for do_journal_end */
 #define FLUSH_ALL   1		/* flush commit and real blocks */
@@ -86,6 +93,9 @@ static struct workqueue_struct *commit_w
 
 /* state bits for the journal */
 #define WRITERS_BLOCKED 1      /* set when new writers not allowed */
+#define WRITERS_QUEUED 2       /* set when log is full due to too many
+				* writers
+				*/
 
 static int do_journal_end(struct reiserfs_transaction_handle *,struct super_block *,unsigned long nblocks,int flags) ;
 static int flush_journal_list(struct super_block *s, struct reiserfs_journal_list *jl, int flushall) ;
@@ -94,6 +104,9 @@ static int can_dirty(struct reiserfs_jou
 static int journal_join(struct reiserfs_transaction_handle *th, struct super_block *p_s_sb, unsigned long nblocks);
 static int release_journal_dev( struct super_block *super,
 				struct reiserfs_journal *journal );
+static int dirty_one_transaction(struct super_block *s,
+                                 struct reiserfs_journal_list *jl);
+static void flush_async_commits(void *p);
 
 static void init_journal_hash(struct super_block *p_s_sb) {
   memset(SB_JOURNAL(p_s_sb)->j_hash_table, 0, JOURNAL_HASH_SIZE * sizeof(struct reiserfs_journal_cnode *)) ;
@@ -105,8 +118,10 @@ static void init_journal_hash(struct sup
 ** more details.
 */
 static int reiserfs_clean_and_file_buffer(struct buffer_head *bh) {
-  if (bh)
+  if (bh) {
     clear_buffer_dirty(bh);
+    clear_bit(BH_JTest, &bh->b_state);
+  }
   return 0 ;
 }
 
@@ -367,6 +382,7 @@ static void free_cnode(struct super_bloc
 
 static int clear_prepared_bits(struct buffer_head *bh) {
   clear_bit(BH_JPrepared, &bh->b_state) ;
+  clear_bit(BH_JRestore_dirty, &bh->b_state) ;
   return 0 ;
 }
 
@@ -471,11 +487,6 @@ int reiserfs_in_journal(struct super_blo
 
   *next_zero_bit = 0 ; /* always start this at zero. */
 
-  /* we aren't logging all blocks are safe for reuse */
-  if (reiserfs_dont_log(p_s_sb)) {
-    return 0 ;
-  }
-
   PROC_INFO_INC( p_s_sb, journal.in_journal );
   /* If we aren't doing a search_all, this is a metablock, and it will be logged before use.
   ** if we crash before the transaction that freed it commits,  this transaction won't
@@ -503,6 +514,7 @@ int reiserfs_in_journal(struct super_blo
 
   /* is it in the current transaction.  This should never happen */
   if ((cn = get_journal_hash_dev(p_s_sb, SB_JOURNAL(p_s_sb)->j_hash_table, bl))) {
+    BUG();
     return 1; 
   }
 
@@ -527,18 +539,30 @@ inline void insert_journal_hash(struct r
 
 /* lock the current transaction */
 inline static void lock_journal(struct super_block *p_s_sb) {
-  PROC_INFO_INC( p_s_sb, journal.lock_journal );
-  while(atomic_read(&(SB_JOURNAL(p_s_sb)->j_wlock)) > 0) {
-    PROC_INFO_INC( p_s_sb, journal.lock_journal_wait );
-    sleep_on(&(SB_JOURNAL(p_s_sb)->j_wait)) ;
-  }
-  atomic_set(&(SB_JOURNAL(p_s_sb)->j_wlock), 1) ;
+    PROC_INFO_INC( p_s_sb, journal.lock_journal );
+    down(&SB_JOURNAL(p_s_sb)->j_lock);
 }
 
 /* unlock the current transaction */
 inline static void unlock_journal(struct super_block *p_s_sb) {
-  atomic_dec(&(SB_JOURNAL(p_s_sb)->j_wlock)) ;
-  wake_up(&(SB_JOURNAL(p_s_sb)->j_wait)) ;
+    up(&SB_JOURNAL(p_s_sb)->j_lock);
+}
+
+static inline void get_journal_list(struct reiserfs_journal_list *jl)
+{
+    jl->j_refcount++;
+}
+
+static inline void put_journal_list(struct super_block *s,
+                                   struct reiserfs_journal_list *jl)
+{
+    if (jl->j_refcount < 1) {
+        printk("trans id %lu, refcount at %d\n", jl->j_trans_id,
+	                                         jl->j_refcount);
+        BUG();
+    }
+    if (--jl->j_refcount == 0)
+        reiserfs_kfree(jl, sizeof(struct reiserfs_journal_list), s);
 }
 
 /*
@@ -556,6 +580,83 @@ static void cleanup_freed_for_journal_li
   jl->j_list_bitmap = NULL ;
 }
 
+static int journal_list_still_alive(struct super_block *s,
+                                    unsigned long trans_id)
+{
+    struct list_head *entry = &SB_JOURNAL(s)->j_journal_list;
+    struct reiserfs_journal_list *jl;
+
+    if (!list_empty(entry)) {
+        jl = JOURNAL_LIST_ENTRY(entry->next);
+	if (jl->j_trans_id <= trans_id) {
+	    return 1;
+	}
+    }
+    return 0;
+}
+
+static int flush_older_commits(struct super_block *s, struct reiserfs_journal_list *jl) {
+    struct reiserfs_journal_list *other_jl;
+    struct reiserfs_journal_list *first_jl;
+    struct list_head *entry;
+    unsigned long trans_id = jl->j_trans_id;
+    unsigned long other_trans_id;
+    unsigned long first_trans_id;
+
+find_first:
+    /*
+     * first we walk backwards to find the oldest uncommitted transaction
+     */
+    first_jl = jl;
+    entry = jl->j_list.prev;
+    while(1) {
+	other_jl = JOURNAL_LIST_ENTRY(entry);
+	if (entry == &SB_JOURNAL(s)->j_journal_list ||
+	    atomic_read(&other_jl->j_older_commits_done))
+	    break;
+
+        first_jl = other_jl;
+	entry = other_jl->j_list.prev;
+    }
+
+    /* if we didn't find any older uncommitted transactions, return now */
+    if (first_jl == jl) {
+        return 0;
+    }
+
+    first_trans_id = first_jl->j_trans_id;
+
+    entry = &first_jl->j_list;
+    while(1) {
+	other_jl = JOURNAL_LIST_ENTRY(entry);
+	other_trans_id = other_jl->j_trans_id;
+
+	if (other_trans_id < trans_id) {
+	    if (atomic_read(&other_jl->j_commit_left) != 0) {
+		flush_commit_list(s, other_jl, 0);
+
+		/* list we were called with is gone, return */
+		if (!journal_list_still_alive(s, trans_id))
+		    return 1;
+
+		/* the one we just flushed is gone, this means all
+		 * older lists are also gone, so first_jl is no longer
+		 * valid either.  Go back to the beginning.
+		 */
+		if (!journal_list_still_alive(s, other_trans_id)) {
+		    goto find_first;
+		}
+	    }
+	    entry = entry->next;
+	    if (entry == &SB_JOURNAL(s)->j_journal_list)
+		return 0;
+	} else {
+	    return 0;
+	}
+    }
+    return 0;
+}
+
 /*
 ** if this journal list still has commit blocks unflushed, send them to disk.
 **
@@ -564,13 +665,10 @@ static void cleanup_freed_for_journal_li
 **
 */
 static int flush_commit_list(struct super_block *s, struct reiserfs_journal_list *jl, int flushall) {
-  int i, count ;
-  int index = 0 ;
+  int i;
   int bn ;
-  int retry_count = 0 ;
-  int orig_commit_left = 0 ;
   struct buffer_head *tbh = NULL ;
-  struct reiserfs_journal_list *other_jl ;
+  unsigned long trans_id = jl->j_trans_id;
 
   reiserfs_check_lock_depth("flush_commit_list") ;
 
@@ -581,133 +679,100 @@ static int flush_commit_list(struct supe
   /* before we can put our commit blocks on disk, we have to make sure everyone older than
   ** us is on disk too
   */
-  if (jl->j_len <= 0) {
-    return 0 ;
-  }
+  if (jl->j_len <= 0)
+    BUG();
+  if (trans_id == SB_JOURNAL(s)->j_trans_id)
+    BUG();
+
+  get_journal_list(jl);
   if (flushall) {
-    /* we _must_ make sure the transactions are committed in order.  Start with the
-    ** index after this one, wrap all the way around 
-    */
-    index = (jl - SB_JOURNAL_LIST(s)) + 1 ;
-    for (i = 0 ; i < JOURNAL_LIST_COUNT ; i++) {
-      other_jl = SB_JOURNAL_LIST(s) + ( (index + i) % JOURNAL_LIST_COUNT) ;
-      if (other_jl && other_jl != jl && other_jl->j_len > 0 && other_jl->j_trans_id > 0 && 
-          other_jl->j_trans_id <= jl->j_trans_id && (atomic_read(&(jl->j_older_commits_done)) == 0)) {
-        flush_commit_list(s, other_jl, 0) ;
-      }
+    if (flush_older_commits(s, jl) == 1) {
+      /* list disappeared during flush_older_commits.  return */
+      goto put_jl;
     }
   }
 
-  count = 0 ;
-  /* don't flush the commit list for the current transactoin */
-  if (jl == ((SB_JOURNAL_LIST(s) + SB_JOURNAL_LIST_INDEX(s)))) {
-    return 0 ;
-  }
-
   /* make sure nobody is trying to flush this one at the same time */
-  if (atomic_read(&(jl->j_commit_flushing))) {
-    sleep_on(&(jl->j_commit_wait)) ;
-    if (flushall) {
-      atomic_set(&(jl->j_older_commits_done), 1) ;
-    }
-    return 0 ;
+  down(&jl->j_commit_lock);
+  if (!journal_list_still_alive(s, trans_id)) {
+    up(&jl->j_commit_lock);
+    goto put_jl;
   }
-  
+  if (jl->j_trans_id == 0)
+    BUG();
+
   /* this commit is done, exit */
   if (atomic_read(&(jl->j_commit_left)) <= 0) {
     if (flushall) {
       atomic_set(&(jl->j_older_commits_done), 1) ;
     }
-    return 0 ;
+    up(&jl->j_commit_lock);
+    goto put_jl;
   }
-  /* keeps others from flushing while we are flushing */
-  atomic_set(&(jl->j_commit_flushing), 1) ; 
-
 
-  if (jl->j_len > SB_JOURNAL_TRANS_MAX(s)) {
-    reiserfs_panic(s, "journal-512: flush_commit_list: length is %lu, list number %d\n", jl->j_len, jl - SB_JOURNAL_LIST(s)) ;
-    return 0 ;
-  }
-
-  orig_commit_left = atomic_read(&(jl->j_commit_left)) ; 
-
-  /* start by checking all the commit blocks in this transaction.  
-  ** Add anyone not on disk into tbh.  Stop checking once commit_left <= 1, because that means we
-  ** only have the commit block left 
-  */
-retry:
-  count = 0 ;
-  for (i = 0 ; atomic_read(&(jl->j_commit_left)) > 1 && i < (jl->j_len + 1) ; i++) {  /* everything but commit_bh */
-    bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) + (jl->j_start+i) %  SB_ONDISK_JOURNAL_SIZE(s);
+  /*
+   * for the description block and all the log blocks, submit any buffers
+   * that haven't already reached the disk
+   */
+  for (i = 0 ; i < (jl->j_len + 1) ; i++) {
+    bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) + (jl->j_start+i) %
+         SB_ONDISK_JOURNAL_SIZE(s);
     tbh = journal_find_get_block(s, bn) ;
-
-/* kill this sanity check */
-if (count > (orig_commit_left + 2)) {
-reiserfs_panic(s, "journal-539: flush_commit_list: BAD count(%d) > orig_commit_left(%d)!\n", count, orig_commit_left) ;
-}
-    if (tbh) {
-      if (buffer_locked(tbh)) { /* wait on it, redo it just to make sure */
-	wait_on_buffer(tbh) ;
-	if (!buffer_uptodate(tbh)) {
-	  reiserfs_panic(s, "journal-584, buffer write failed\n") ;
-	}
-      } 
-      if (buffer_dirty(tbh)) {
-	printk("journal-569: flush_commit_list, block already dirty!\n") ;
-      } else {				
-	mark_buffer_dirty(tbh) ;
-      }
-      ll_rw_block(WRITE, 1, &tbh) ;
-      count++ ;
-      put_bh(tbh) ; /* once for our get_hash */
-    } 
+    wait_on_buffer(tbh) ;
+    ll_rw_block(WRITE, 1, &tbh) ;
+    put_bh(tbh) ;
   }
 
-  /* wait on everyone in tbh before writing commit block*/
-  if (count > 0) {
-    for (i = 0 ; atomic_read(&(jl->j_commit_left)) > 1 && 
-                 i < (jl->j_len + 1) ; i++) {  /* everything but commit_bh */
-      bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) + (jl->j_start + i) % SB_ONDISK_JOURNAL_SIZE(s) ;
-      tbh = journal_find_get_block(s, bn) ;
+  /* wait on everything written so far before writing the commit */
+  for (i = 0 ;  i < (jl->j_len + 1) ; i++) {
+    bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) +
+	 (jl->j_start + i) % SB_ONDISK_JOURNAL_SIZE(s) ;
+    tbh = journal_find_get_block(s, bn) ;
 
-      wait_on_buffer(tbh) ;
-      if (!buffer_uptodate(tbh)) {
-	reiserfs_panic(s, "journal-601, buffer write failed\n") ;
-      }
-      put_bh(tbh) ; /* once for our get_hash */
-      bforget(tbh) ;    /* once due to original getblk in do_journal_end */
-      atomic_dec(&(jl->j_commit_left)) ;
-    }
+    wait_on_buffer(tbh) ;
+    if (buffer_dirty(tbh))
+      BUG();
+    if (!buffer_uptodate(tbh)) {
+      reiserfs_panic(s, "journal-601, buffer write failed\n") ;
+    }
+    put_bh(tbh) ; /* once for journal_find_get_block */
+    put_bh(tbh) ;    /* once due to original getblk in do_journal_end */
+    atomic_dec(&(jl->j_commit_left)) ;
   }
 
-  if (atomic_read(&(jl->j_commit_left)) != 1) { /* just the commit_bh left, flush it without calling getblk for everyone */
-    if (retry_count < 2) {
-      printk("journal-582: flush_commit_list, not all log blocks on disk yet, trying again\n") ;
-      retry_count++ ;
-      goto retry;
-    }
-    reiserfs_panic(s, "journal-563: flush_commit_list: BAD, j_commit_left is %u, should be 1\n", 
-		   atomic_read(&(jl->j_commit_left)));
-  }
+  if (atomic_read(&(jl->j_commit_left)) != 1)
+    BUG();
 
+  if (buffer_dirty(jl->j_commit_bh))
+    BUG();
   mark_buffer_dirty(jl->j_commit_bh) ;
   sync_dirty_buffer(jl->j_commit_bh) ;
   if (!buffer_uptodate(jl->j_commit_bh)) {
     reiserfs_panic(s, "journal-615: buffer write failed\n") ;
   }
-  atomic_dec(&(jl->j_commit_left)) ;
   bforget(jl->j_commit_bh) ;
+  if (SB_JOURNAL(s)->j_last_commit_id != 0 &&
+     (jl->j_trans_id - SB_JOURNAL(s)->j_last_commit_id) != 1) {
+      reiserfs_warning("clm-2200: last commit %lu, current %lu\n",
+                       SB_JOURNAL(s)->j_last_commit_id,
+		       jl->j_trans_id);
+  }
+  SB_JOURNAL(s)->j_last_commit_id = jl->j_trans_id;
 
   /* now, every commit block is on the disk.  It is safe to allow blocks freed during this transaction to be reallocated */
   cleanup_freed_for_journal_list(s, jl) ;
 
+  /* mark the metadata dirty */
+  dirty_one_transaction(s, jl);
+  atomic_dec(&(jl->j_commit_left)) ;
+
   if (flushall) {
     atomic_set(&(jl->j_older_commits_done), 1) ;
   }
-  atomic_set(&(jl->j_commit_flushing), 0) ;
-  wake_up(&(jl->j_commit_wait)) ;
+  up(&jl->j_commit_lock);
+put_jl:
+  put_journal_list(s, jl);
 
-  s->s_dirt = 1 ;
   return 0 ;
 }
 
@@ -804,22 +869,27 @@ static int update_journal_header_block(s
 ** flush any and all journal lists older than you are 
 ** can only be called from flush_journal_list
 */
-static int flush_older_journal_lists(struct super_block *p_s_sb, struct reiserfs_journal_list *jl, unsigned long trans_id) {
-  int i, index ;
-  struct reiserfs_journal_list *other_jl ;
-
-  index = jl - SB_JOURNAL_LIST(p_s_sb) ;
-  for (i = 0 ; i < JOURNAL_LIST_COUNT ; i++) {
-    other_jl = SB_JOURNAL_LIST(p_s_sb) + ((index + i) % JOURNAL_LIST_COUNT) ;
-    if (other_jl && other_jl->j_len > 0 && 
-        other_jl->j_trans_id > 0 && 
-	other_jl->j_trans_id < trans_id && 
-        other_jl != jl) {
-      /* do not flush all */
-      flush_journal_list(p_s_sb, other_jl, 0) ; 
+static int flush_older_journal_lists(struct super_block *p_s_sb,
+                                     struct reiserfs_journal_list *jl)
+{
+    struct list_head *entry;
+    struct reiserfs_journal_list *other_jl ;
+    unsigned long trans_id = jl->j_trans_id;
+
+    /* we know we are the only ones flushing things, no extra race
+     * protection is required.
+     */
+restart:
+    entry = SB_JOURNAL(p_s_sb)->j_journal_list.next;
+    other_jl = JOURNAL_LIST_ENTRY(entry);
+    if (other_jl->j_trans_id < trans_id) {
+	/* do not flush all */
+	flush_journal_list(p_s_sb, other_jl, 0) ;
+
+	/* other_jl is now deleted from the list */
+	goto restart;
     }
-  }
-  return 0 ;
+    return 0 ;
 }
 
 static void reiserfs_end_buffer_io_sync(struct buffer_head *bh, int uptodate) {
@@ -836,15 +906,27 @@ static void reiserfs_end_buffer_io_sync(
     unlock_buffer(bh) ;
     put_bh(bh) ;
 }
+
 static void submit_logged_buffer(struct buffer_head *bh) {
-    lock_buffer(bh) ;
     get_bh(bh) ;
     bh->b_end_io = reiserfs_end_buffer_io_sync ;
     mark_buffer_notjournal_new(bh) ;
     clear_buffer_dirty(bh) ;
+    if (!test_and_clear_bit(BH_JTest, &bh->b_state))
+        BUG();
+    if (!buffer_uptodate(bh))
+        BUG();
     submit_bh(WRITE, bh) ;
 }
 
+static void del_from_work_list(struct super_block *s,
+                               struct reiserfs_journal_list *jl) {
+    if (!list_empty(&jl->j_working_list)) {
+	list_del_init(&jl->j_working_list);
+	SB_JOURNAL(s)->j_num_work_lists--;
+    }
+}
+
 /* flush a journal list, both commit and real blocks
 **
 ** always set flushall to 1, unless you are calling from inside
@@ -865,29 +947,26 @@ static int flush_journal_list(struct sup
   unsigned long j_len_saved = jl->j_len ;
 
   if (j_len_saved <= 0) {
-    return 0 ;
+    BUG();
   }
 
   if (atomic_read(&SB_JOURNAL(s)->j_wcount) != 0) {
     reiserfs_warning("clm-2048: flush_journal_list called with wcount %d\n",
                       atomic_read(&SB_JOURNAL(s)->j_wcount)) ;
   }
-  /* if someone is getting the commit list, we must wait for them */
-  while (atomic_read(&(jl->j_commit_flushing))) { 
-    sleep_on(&(jl->j_commit_wait)) ;
-  }
-  /* if someone is flushing this list, we must wait for them */
-  while (atomic_read(&(jl->j_flushing))) {
-    sleep_on(&(jl->j_flush_wait)) ;
-  }
+  if (jl->j_trans_id == 0)
+    BUG();
 
-  /* this list is now ours, we can change anything we want */
-  atomic_set(&(jl->j_flushing), 1) ;
+  /* if flushall == 0, the lock is already held */
+  if (flushall) {
+      down(&SB_JOURNAL(s)->j_flush_sem);
+  } else if (!down_trylock(&SB_JOURNAL(s)->j_flush_sem)) {
+      BUG();
+  }
 
   count = 0 ;
   if (j_len_saved > SB_JOURNAL_TRANS_MAX(s)) {
-    reiserfs_panic(s, "journal-715: flush_journal_list, length is %lu, list number %d\n", j_len_saved, jl - SB_JOURNAL_LIST(s)) ;
-    atomic_dec(&(jl->j_flushing)) ;
+    reiserfs_panic(s, "journal-715: flush_journal_list, length is %lu, trans id %lu\n", j_len_saved, jl->j_trans_id);
     return 0 ;
   }
 
@@ -902,6 +981,9 @@ static int flush_journal_list(struct sup
   */
   flush_commit_list(s, jl, 1) ;
 
+  if (!(jl->j_state & LIST_DIRTY))
+      BUG();
+
   /* are we done now? */
   if (atomic_read(&(jl->j_nonzerolen)) <= 0 && 
       atomic_read(&(jl->j_commit_left)) <= 0) {
@@ -937,13 +1019,13 @@ static int flush_journal_list(struct sup
       get_bh(saved_bh) ;
 
       if (buffer_journal_dirty(saved_bh)) {
+	if (!can_dirty(cn))
+	  BUG();
         was_jwait = 1 ;
-	mark_buffer_notjournal_dirty(saved_bh) ;
-        /* undo the inc from journal_mark_dirty */
-	put_bh(saved_bh) ;
-      }
-      if (can_dirty(cn)) {
         was_dirty = 1 ;
+      } else if (can_dirty(cn)) {
+        /* everything with !pjl && jwait should be writable */
+	BUG();
       }
     }
 
@@ -951,7 +1033,8 @@ static int flush_journal_list(struct sup
     ** sure they are commited, and don't try writing it to disk
     */
     if (pjl) {
-      flush_commit_list(s, pjl, 1) ;
+      if (atomic_read(&pjl->j_commit_left))
+        flush_commit_list(s, pjl, 1) ;
       goto free_cnode ;
     }
 
@@ -970,22 +1053,17 @@ static int flush_journal_list(struct sup
 printk("journal-813: BAD! buffer %llu %cdirty %cjwait, not in a newer tranasction\n", (unsigned long long)saved_bh->b_blocknr,
         was_dirty ? ' ' : '!', was_jwait ? ' ' : '!') ;
     }
-    /* kupdate_one_transaction waits on the buffers it is writing, so we
-    ** should never see locked buffers here
-    */
-    if (buffer_locked(saved_bh)) {
-      printk("clm-2083: locked buffer %llu in flush_journal_list\n", 
-              (unsigned long long)saved_bh->b_blocknr) ;
-      wait_on_buffer(saved_bh) ;
-      if (!buffer_uptodate(saved_bh)) {
-        reiserfs_panic(s, "journal-923: buffer write failed\n") ;
-      }
-    } 
     if (was_dirty) { 
       /* we inc again because saved_bh gets decremented at free_cnode */
       get_bh(saved_bh) ;
       set_bit(BLOCK_NEEDS_FLUSH, &cn->state) ;
-      submit_logged_buffer(saved_bh) ;
+      lock_buffer(saved_bh);
+      if (cn->blocknr != saved_bh->b_blocknr)
+        BUG();
+      if (buffer_dirty(saved_bh))
+        submit_logged_buffer(saved_bh) ;
+      else
+        unlock_buffer(saved_bh);
       count++ ;
     } else {
       printk("clm-2082: Unable to flush buffer %llu in flush_journal_list\n",
@@ -1016,6 +1094,14 @@ free_cnode:
 	if (!buffer_uptodate(cn->bh)) {
 	  reiserfs_panic(s, "journal-949: buffer write failed\n") ;
 	}
+	/* note, we must clear the JDirty_wait bit after the up to date
+	** check, otherwise we race against our flushpage routine
+	*/
+	if (!test_and_clear_bit(BH_JDirty_wait, &cn->bh->b_state))
+	    BUG();
+
+        /* undo the inc from journal_mark_dirty */
+	put_bh(cn->bh) ;
         brelse(cn->bh) ;
       }
       cn = cn->next ;
@@ -1029,7 +1115,7 @@ flush_older_and_return:
   ** replayed after a crash
   */
   if (flushall) {
-    flush_older_journal_lists(s, jl, jl->j_trans_id) ;
+    flush_older_journal_lists(s, jl);
   } 
   
   /* before we can remove everything from the hash tables for this 
@@ -1044,181 +1130,246 @@ flush_older_and_return:
     update_journal_header_block(s, (jl->j_start + jl->j_len + 2) % SB_ONDISK_JOURNAL_SIZE(s), jl->j_trans_id) ;
   }
   remove_all_from_journal_list(s, jl, 0) ;
+  list_del(&jl->j_list);
+  SB_JOURNAL(s)->j_num_lists--;
+  del_from_work_list(s, jl);
+
+  if (SB_JOURNAL(s)->j_last_flush_id != 0 &&
+     (jl->j_trans_id - SB_JOURNAL(s)->j_last_flush_id) != 1) {
+      reiserfs_warning("clm-2201: last flush %lu, current %lu\n",
+                       SB_JOURNAL(s)->j_last_flush_id,
+		       jl->j_trans_id);
+  }
+  SB_JOURNAL(s)->j_last_flush_id = jl->j_trans_id;
+
+  /* not strictly required since we are freeing the list, but it should
+   * help find code using dead lists later on
+   */
   jl->j_len = 0 ;
   atomic_set(&(jl->j_nonzerolen), 0) ;
   jl->j_start = 0 ;
   jl->j_realblock = NULL ;
   jl->j_commit_bh = NULL ;
   jl->j_trans_id = 0 ;
-  atomic_dec(&(jl->j_flushing)) ;
-  wake_up(&(jl->j_flush_wait)) ;
+  jl->j_state = 0;
+  put_journal_list(s, jl);
+  if (flushall)
+    up(&SB_JOURNAL(s)->j_flush_sem);
   return 0 ;
 } 
 
-
-static int kupdate_one_transaction(struct super_block *s,
-                                    struct reiserfs_journal_list *jl) 
+#define CHUNK_SIZE 32
+struct buffer_chunk {
+    struct buffer_head *bh[CHUNK_SIZE];
+    int nr;
+};
+
+static void write_chunk(struct buffer_chunk *chunk) {
+    int i;
+    for (i = 0; i < chunk->nr ; i++) {
+	submit_logged_buffer(chunk->bh[i]) ;
+    }
+    chunk->nr = 0;
+}
+
+static void add_to_chunk(struct buffer_chunk *chunk, struct buffer_head *bh) {
+    if (chunk->nr >= CHUNK_SIZE)
+        BUG();
+    chunk->bh[chunk->nr++] = bh;
+    if (chunk->nr >= CHUNK_SIZE)
+        write_chunk(chunk);
+}
+
+static int write_one_transaction(struct super_block *s,
+                                 struct reiserfs_journal_list *jl,
+				 struct buffer_chunk *chunk)
 {
-    struct reiserfs_journal_list *pjl ; /* previous list for this cn */
-    struct reiserfs_journal_cnode *cn, *walk_cn ;
-    b_blocknr_t blocknr ;
-    int run = 0 ;
-    int orig_trans_id = jl->j_trans_id ;
-    struct buffer_head *saved_bh ; 
+    struct reiserfs_journal_cnode *cn;
     int ret = 0 ;
 
-    /* if someone is getting the commit list, we must wait for them */
-    while (atomic_read(&(jl->j_commit_flushing))) {
-        sleep_on(&(jl->j_commit_wait)) ;
-    }
-    /* if someone is flushing this list, we must wait for them */
-    while (atomic_read(&(jl->j_flushing))) {
-        sleep_on(&(jl->j_flush_wait)) ;
-    }
-    /* was it flushed while we slept? */
-    if (jl->j_len <= 0 || jl->j_trans_id != orig_trans_id) {
-        return 0 ;
+    jl->j_state |= LIST_TOUCHED;
+    del_from_work_list(s, jl);
+    if (jl->j_len == 0 || atomic_read(&jl->j_nonzerolen) == 0) {
+        return 0;
     }
 
-    /* this list is now ours, we can change anything we want */
-    atomic_set(&(jl->j_flushing), 1) ;
-
-loop_start:
     cn = jl->j_realblock ;
     while(cn) {
-        saved_bh = NULL ;
         /* if the blocknr == 0, this has been cleared from the hash,
         ** skip it
         */
         if (cn->blocknr == 0) {
             goto next ;
         }
+        if (cn->bh && can_dirty(cn) && buffer_dirty(cn->bh)) {
+	    struct buffer_head *tmp_bh;
+	    /* we can race against journal_mark_freed when we try
+	     * to lock_buffer(cn->bh), so we have to inc the buffer
+	     * count, and recheck things after locking
+	     */
+	    tmp_bh = cn->bh;
+	    get_bh(tmp_bh);
+	    lock_buffer(tmp_bh);
+	    if (cn->bh && can_dirty(cn) && buffer_dirty(tmp_bh)) {
+		if (!buffer_journal_dirty(tmp_bh) ||
+		    reiserfs_buffer_prepared(tmp_bh))
+		    BUG();
+		add_to_chunk(chunk, tmp_bh);
+		ret++;
+	    } else {
+		/* note, cn->bh might be null now */
+		unlock_buffer(tmp_bh);
+	    }
+	    put_bh(tmp_bh);
+        }
+next:
+        cn = cn->next ;
+	cond_resched();
+    }
+    return ret ;
+}
+
+/* used by flush_commit_list */
+static int dirty_one_transaction(struct super_block *s,
+                                 struct reiserfs_journal_list *jl)
+{
+    struct reiserfs_journal_cnode *cn;
+    struct reiserfs_journal_list *pjl;
+    int ret = 0 ;
+
+    jl->j_state |= LIST_DIRTY;
+    cn = jl->j_realblock ;
+    while(cn) {
         /* look for a more recent transaction that logged this
         ** buffer.  Only the most recent transaction with a buffer in
         ** it is allowed to send that buffer to disk
         */
-        pjl = find_newer_jl_for_cn(cn) ;
-        if (run == 0 && !pjl && cn->bh && buffer_journal_dirty(cn->bh) &&
-            can_dirty(cn)) 
-        {
-            if (!test_bit(BH_JPrepared, &cn->bh->b_state)) {
-                set_bit(BLOCK_NEEDS_FLUSH, &cn->state) ;
-		submit_logged_buffer(cn->bh) ;
-            } else {
-                /* someone else is using this buffer.  We can't 
-                ** send it to disk right now because they might
-                ** be changing/logging it.
-                */
-                ret = 1 ;
-            }
-        } else if (test_bit(BLOCK_NEEDS_FLUSH, &cn->state)) {
-            clear_bit(BLOCK_NEEDS_FLUSH, &cn->state) ;
-            if (!pjl && cn->bh) {
-                wait_on_buffer(cn->bh) ;
-            }
-            /* check again, someone could have logged while we scheduled */
-            pjl = find_newer_jl_for_cn(cn) ;
-
-            /* before the JDirty_wait bit is set, the 
-            ** buffer is added to the hash list.  So, if we are
-            ** run in the middle of a do_journal_end, we will notice
-            ** if this buffer was logged and added from the latest
-            ** transaction.  In this case, we don't want to decrement
-            ** b_count
-            */
-            if (!pjl && cn->bh && buffer_journal_dirty(cn->bh)) {
-                blocknr = cn->blocknr ;
-                walk_cn = cn ;
-                saved_bh= cn->bh ;
-                /* update all older transactions to show this block
-                ** was flushed
-                */
-                mark_buffer_notjournal_dirty(cn->bh) ;
-                while(walk_cn) {
-                    if (walk_cn->bh && walk_cn->blocknr == blocknr && 
-                         walk_cn->sb == cn->sb) {
-                        if (walk_cn->jlist) {
-                            atomic_dec(&(walk_cn->jlist->j_nonzerolen)) ;
-                        }
-                        walk_cn->bh = NULL ;
-                    }
-                    walk_cn = walk_cn->hnext ;
-                }
-                if (atomic_read(&saved_bh->b_count) < 1) {
-                    reiserfs_warning("clm-2081: bad count on %lu\n", 
-                                      saved_bh->b_blocknr) ;
-                }
-                brelse(saved_bh) ;
-            }
-        }
-        /*
-        ** if the more recent transaction is committed to the log,
-        ** this buffer can be considered flushed.  Decrement our
-        ** counters to reflect one less buffer that needs writing.
-        **
-        ** note, this relies on all of the above code being
-        ** schedule free once pjl comes back non-null.
-        */
-        if (pjl && cn->bh && atomic_read(&pjl->j_commit_left) == 0) {
-            atomic_dec(&cn->jlist->j_nonzerolen) ;
-            cn->bh = NULL ;
+	pjl = find_newer_jl_for_cn(cn) ;
+        if (!pjl && cn->blocknr && cn->bh && buffer_journal_dirty(cn->bh))
+	{
+	    if (!can_dirty(cn))
+	        BUG();
+	    /* if the buffer is prepared, it will either be logged
+	     * or restored.  If restored, we need to make sure
+	     * it actually gets marked dirty
+	     */
+	    mark_buffer_notjournal_new(cn->bh) ;
+	    if (test_bit(BH_JPrepared, &cn->bh->b_state)) {
+	        set_bit(BH_JRestore_dirty, &cn->bh->b_state);
+	    } else {
+	        set_bit(BH_JTest, &cn->bh->b_state);
+	        mark_buffer_dirty(cn->bh);
+	    }
         } 
-next:
         cn = cn->next ;
     }
-    /* the first run through the loop sends all the dirty buffers to
-    ** ll_rw_block.
-    ** the second run through the loop does all the accounting
-    */
-    if (run++ == 0) {
-        goto loop_start ;
-    }
-
-    atomic_set(&(jl->j_flushing), 0) ;
-    wake_up(&(jl->j_flush_wait)) ;
     return ret ;
 }
-/* since we never give dirty buffers to bdflush/kupdate, we have to
-** flush them ourselves.  This runs through the journal lists, finds
-** old metadata in need of flushing and sends it to disk.
-** this does not end transactions, commit anything, or free
-** cnodes.
-**
-** returns the highest transaction id that was flushed last time
-*/
-static unsigned long reiserfs_journal_kupdate(struct super_block *s) {
-    struct reiserfs_journal_list *jl ;
-    int i ;
-    int start ;
-    time_t age ;
-    int ret = 0 ;
 
-    start = SB_JOURNAL_LIST_INDEX(s) ;
+static int kupdate_transactions(struct super_block *s,
+                                   struct reiserfs_journal_list *jl,
+				   struct reiserfs_journal_list **next_jl,
+				   unsigned long *next_trans_id,
+				   int num_blocks,
+				   int num_trans) {
+    int ret = 0;
+    int written = 0 ;
+    int transactions_flushed = 0;
+    unsigned long orig_trans_id = jl->j_trans_id;
+    struct buffer_chunk chunk;
+    struct list_head *entry;
+    chunk.nr = 0;
+
+    down(&SB_JOURNAL(s)->j_flush_sem);
+    if (!journal_list_still_alive(s, orig_trans_id)) {
+	goto done;
+    }
+
+    /* we've got j_flush_sem held, nobody is going to delete any
+     * of these lists out from underneath us
+     */
+    while((num_trans && transactions_flushed < num_trans) ||
+          (!num_trans && written < num_blocks)) {
+
+	if (jl->j_len == 0 || (jl->j_state & LIST_TOUCHED) ||
+	    atomic_read(&jl->j_commit_left))
+	{
+	    del_from_work_list(s, jl);
+	    break;
+	}
+	ret = write_one_transaction(s, jl, &chunk);
 
-    /* safety check to prevent flush attempts during a mount */
-    if (start < 0) {
-        return 0 ;
-    }
-    i = (start + 1) % JOURNAL_LIST_COUNT ;
-    while(i != start) {
-        jl = SB_JOURNAL_LIST(s) + i  ;
-        age = get_seconds() - jl->j_timestamp ;
-        if (jl->j_len > 0 && // age >= (JOURNAL_MAX_COMMIT_AGE * 2) && 
-            atomic_read(&(jl->j_nonzerolen)) > 0 &&
-            atomic_read(&(jl->j_commit_left)) == 0) {
-
-            if (jl->j_trans_id == SB_JOURNAL(s)->j_trans_id) {
-                break ;
-            }
-            /* if ret was already 1, we want to preserve that */
-            ret |= kupdate_one_transaction(s, jl) ;
-        } 
-        if (atomic_read(&(jl->j_nonzerolen)) > 0) {
-            ret |= 1 ;
+	if (ret < 0)
+	    goto done;
+	transactions_flushed++;
+	written += ret;
+	entry = jl->j_list.next;
+
+	/* did we wrap? */
+	if (entry == &SB_JOURNAL(s)->j_journal_list) {
+	    break;
         }
-        i = (i + 1) % JOURNAL_LIST_COUNT ;
+	jl = JOURNAL_LIST_ENTRY(entry);
+
+	/* don't bother with older transactions */
+	if (jl->j_trans_id <= orig_trans_id)
+	    break;
     }
-    return ret ;
+    if (chunk.nr) {
+        write_chunk(&chunk);
+    }
+
+done:
+    up(&SB_JOURNAL(s)->j_flush_sem);
+    return ret;
+}
+
+/* O_SYNC and fsync-heavy applications tend to use up
+** all the journal list slots with tiny transactions.  These
+** trigger lots and lots of calls to update the header block, which
+** adds seeks and slows things down.
+**
+** This function tries to clear out a large chunk of the journal lists
+** at once, which makes everything faster since only the newest journal
+** list updates the header block
+*/
+static int flush_used_journal_lists(struct super_block *s,
+                                    struct reiserfs_journal_list *jl) {
+    unsigned long len = 0;
+    unsigned long cur_len;
+    int ret;
+    int i;
+    struct reiserfs_journal_list *tjl;
+    struct reiserfs_journal_list *flush_jl;
+    unsigned long trans_id;
+
+    flush_jl = tjl = jl;
+
+    /* flush for 256 transactions or 256 blocks, whichever comes first */
+    for(i = 0 ; i < 256 && len < 256 ; i++) {
+	if (atomic_read(&tjl->j_commit_left) ||
+	    tjl->j_trans_id < jl->j_trans_id) {
+	    break;
+	}
+	cur_len = atomic_read(&tjl->j_nonzerolen);
+	if (cur_len > 0) {
+	    tjl->j_state &= ~LIST_TOUCHED;
+	}
+	len += cur_len;
+	flush_jl = tjl;
+	if (tjl->j_list.next == &SB_JOURNAL(s)->j_journal_list)
+	    break;
+	tjl = JOURNAL_LIST_ENTRY(tjl->j_list.next);
+    }
+    /* try to find a group of blocks we can flush across all the
+    ** transactions, but only bother if we've actually spanned
+    ** across multiple lists
+    */
+    if (flush_jl != jl) {
+        ret = kupdate_transactions(s, jl, &tjl, &trans_id, len, i);
+    }
+    flush_journal_list(s, flush_jl, 1);
+    return 0;
 }
 
 /*
@@ -1262,6 +1413,10 @@ void remove_journal_hash(struct super_bl
 }
 
 static void free_journal_ram(struct super_block *p_s_sb) {
+  reiserfs_kfree(SB_JOURNAL(p_s_sb)->j_current_jl,
+                 sizeof(struct reiserfs_journal_list), p_s_sb);
+  SB_JOURNAL(p_s_sb)->j_num_lists--;
+
   vfree(SB_JOURNAL(p_s_sb)->j_cnode_free_orig) ;
   free_list_bitmaps(p_s_sb, SB_JOURNAL(p_s_sb)->j_list_bitmap) ;
   free_bitmap_nodes(p_s_sb) ; /* must be after free_list_bitmaps */
@@ -1392,7 +1547,7 @@ static int journal_transaction_is_valid(
     }
     brelse(c_bh) ;
     reiserfs_debug(p_s_sb, REISERFS_DEBUG_CODE, "journal-1006: found valid "
-                   "transaction start offset %lu, len %d id %d\n", 
+                   "transaction start offset %llu, len %d id %d\n",
 		   d_bh->b_blocknr - SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb), 
 		   get_desc_trans_len(desc), get_desc_trans_id(desc)) ;
     return 1 ;
@@ -1432,7 +1587,7 @@ static int journal_read_transaction(stru
   desc = (struct reiserfs_journal_desc *)d_bh->b_data ;
   trans_offset = d_bh->b_blocknr - SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb) ;
   reiserfs_debug(p_s_sb, REISERFS_DEBUG_CODE, "journal-1037: "
-                 "journal_read_transaction, offset %lu, len %d mount_id %d\n", 
+                 "journal_read_transaction, offset %llu, len %d mount_id %d\n",
 		 d_bh->b_blocknr - SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb), 
 		 get_desc_trans_len(desc), get_desc_mount_id(desc)) ;
   if (get_desc_trans_id(desc) < oldest_trans_id) {
@@ -1460,7 +1615,7 @@ static int journal_read_transaction(stru
   commit = (struct reiserfs_journal_commit *)c_bh->b_data ;
   if (journal_compare_desc_commit(p_s_sb, desc, commit)) {
     reiserfs_debug(p_s_sb, REISERFS_DEBUG_CODE, "journal_read_transaction, "
-                   "commit offset %ld had bad time %d or length %d\n", 
+                   "commit offset %llu had bad time %d or length %d\n",
 		   c_bh->b_blocknr -  SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb), 
 		   get_commit_trans_id(commit), get_commit_trans_len(commit));
     brelse(c_bh) ;
@@ -1628,7 +1783,7 @@ static int journal_read(struct super_blo
   printk("reiserfs: checking transaction log (%s) for (%s)\n",
 	 bdevname(SB_JOURNAL(p_s_sb)->j_dev_bd, b),
 	 reiserfs_bdevname(p_s_sb));
-  start = get_seconds() ;
+  start = get_seconds();
 
   /* step 1, read in the journal header block.  Check the transaction it says 
   ** is the first unflushed, and if that transaction is not valid, 
@@ -1688,7 +1843,7 @@ static int journal_read(struct super_blo
 	oldest_start = d_bh->b_blocknr ;
 	newest_mount_id = get_desc_mount_id(desc) ;
 	reiserfs_debug(p_s_sb, REISERFS_DEBUG_CODE, "journal-1179: Setting "
-	               "oldest_start to offset %lu, trans_id %lu\n", 
+	               "oldest_start to offset %llu, trans_id %lu\n",
 		       oldest_start - SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb), 
 		       oldest_trans_id) ;
       } else if (oldest_trans_id > get_desc_trans_id(desc)) { 
@@ -1716,7 +1871,7 @@ start_log_replay:
   cur_dblock = oldest_start ;
   if (oldest_trans_id)  {
     reiserfs_debug(p_s_sb, REISERFS_DEBUG_CODE, "journal-1206: Starting replay "
-                   "from offset %lu, trans_id %lu\n", 
+                   "from offset %llu, trans_id %lu\n",
 		   cur_dblock - SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb), 
 		   oldest_trans_id) ;
 
@@ -1770,70 +1925,26 @@ start_log_replay:
   return 0 ;
 }
 
-
-struct reiserfs_journal_commit_task {
-  struct super_block *p_s_sb ;
-  int jindex ;
-  int wake_on_finish ; /* if this is one, we wake the task_done queue, if it
-                       ** is zero, we free the whole struct on finish
-		       */
-  struct reiserfs_journal_commit_task *self ;
-  struct work_struct work;
-} ;
-
-static void reiserfs_journal_commit_task_func(void *__ct) {
-  struct reiserfs_journal_commit_task *ct = __ct;
-  struct reiserfs_journal_list *jl ;
-
-  reiserfs_write_lock(ct->p_s_sb);
-
-  jl = SB_JOURNAL_LIST(ct->p_s_sb) + ct->jindex ;
-
-  flush_commit_list(ct->p_s_sb, SB_JOURNAL_LIST(ct->p_s_sb) + ct->jindex, 1) ; 
-
-  if (jl->j_len > 0 && atomic_read(&(jl->j_nonzerolen)) > 0 &&
-      atomic_read(&(jl->j_commit_left)) == 0) {
-    kupdate_one_transaction(ct->p_s_sb, jl) ;
-  }
-  reiserfs_kfree(ct->self, sizeof(struct reiserfs_journal_commit_task), ct->p_s_sb) ;
-  reiserfs_write_unlock(ct->p_s_sb);
-}
-
-static void setup_commit_task_arg(struct reiserfs_journal_commit_task *ct,
-                                  struct super_block *p_s_sb, 
-				  int jindex) {
-  if (!ct) {
-    reiserfs_panic(NULL, "journal-1360: setup_commit_task_arg called with NULL struct\n") ;
-  }
-  ct->p_s_sb = p_s_sb ;
-  ct->jindex = jindex ;
-  INIT_WORK(&ct->work, reiserfs_journal_commit_task_func, ct);
-  ct->self = ct ;
-}
-
-static void commit_flush_async(struct super_block *p_s_sb, int jindex) {
-  struct reiserfs_journal_commit_task *ct ;
-  /* using GFP_NOFS, GFP_KERNEL could try to flush inodes, which will try
-  ** to start/join a transaction, which will deadlock
-  */
-  ct = reiserfs_kmalloc(sizeof(struct reiserfs_journal_commit_task), GFP_NOFS, p_s_sb) ;
-  if (ct) {
-    setup_commit_task_arg(ct, p_s_sb, jindex) ;
-    queue_work(commit_wq, &ct->work) ;
-  } else {
-#ifdef CONFIG_REISERFS_CHECK
-    reiserfs_warning("journal-1540: kmalloc failed, doing sync commit\n") ;
-#endif
-    flush_commit_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + jindex, 1) ;
-  }
+static struct reiserfs_journal_list *alloc_journal_list(struct super_block *s)
+{
+    struct reiserfs_journal_list *jl;
+retry:
+    jl = reiserfs_kmalloc(sizeof(struct reiserfs_journal_list), GFP_NOFS, s);
+    if (!jl) {
+	yield();
+	goto retry;
+    }
+    memset(jl, 0, sizeof(*jl));
+    INIT_LIST_HEAD(&jl->j_list);
+    INIT_LIST_HEAD(&jl->j_working_list);
+    sema_init(&jl->j_commit_lock, 1);
+    SB_JOURNAL(s)->j_num_lists++;
+    get_journal_list(jl);
+    return jl;
 }
 
 static void journal_list_init(struct super_block *p_s_sb) {
-  int i ;
-  for (i = 0 ; i < JOURNAL_LIST_COUNT ; i++) {
-    init_waitqueue_head(&(SB_JOURNAL_LIST(p_s_sb)[i].j_commit_wait)) ;
-    init_waitqueue_head(&(SB_JOURNAL_LIST(p_s_sb)[i].j_flush_wait)) ;
-  }
+    SB_JOURNAL(p_s_sb)->j_current_jl = alloc_journal_list(p_s_sb);
 }
 
 static int release_journal_dev( struct super_block *super,
@@ -1924,6 +2035,7 @@ int journal_init(struct super_block *p_s
     struct reiserfs_super_block * rs;
     struct reiserfs_journal_header *jh;
     struct reiserfs_journal *journal;
+    struct reiserfs_journal_list *jl;
     char b[BDEVNAME_SIZE];
 
     journal = SB_JOURNAL(p_s_sb) = vmalloc(sizeof (struct reiserfs_journal)) ;
@@ -1934,6 +2046,8 @@ int journal_init(struct super_block *p_s
     memset(journal, 0, sizeof(struct reiserfs_journal)) ;
     INIT_LIST_HEAD(&SB_JOURNAL(p_s_sb)->j_bitmap_nodes) ;
     INIT_LIST_HEAD (&SB_JOURNAL(p_s_sb)->j_prealloc_list);
+    INIT_LIST_HEAD(&SB_JOURNAL(p_s_sb)->j_working_list);
+    INIT_LIST_HEAD(&SB_JOURNAL(p_s_sb)->j_journal_list);
     reiserfs_allocate_list_bitmaps(p_s_sb, SB_JOURNAL(p_s_sb)->j_list_bitmap, 
  				   SB_BMAP_NR(p_s_sb)) ;
     allocate_bitmap_nodes(p_s_sb) ;
@@ -2041,10 +2155,6 @@ int journal_init(struct super_block *p_s
   brelse (bhjh);
      
   SB_JOURNAL(p_s_sb)->j_list_bitmap_index = 0 ;
-  SB_JOURNAL_LIST_INDEX(p_s_sb) = -10000 ; /* make sure flush_old_commits does not try to flush a list while replay is on */
-
-  /* clear out the journal list array */
-  memset(SB_JOURNAL_LIST(p_s_sb), 0, sizeof(struct reiserfs_journal_list) * JOURNAL_LIST_COUNT) ; 
   journal_list_init(p_s_sb) ;
 
   memset(SB_JOURNAL(p_s_sb)->j_list_hash_table, 0, JOURNAL_HASH_SIZE * sizeof(struct reiserfs_journal_cnode *)) ;
@@ -2061,13 +2171,13 @@ int journal_init(struct super_block *p_s
   SB_JOURNAL(p_s_sb)->j_last = NULL ;	  
   SB_JOURNAL(p_s_sb)->j_first = NULL ;     
   init_waitqueue_head(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
-  init_waitqueue_head(&(SB_JOURNAL(p_s_sb)->j_wait)) ; 
+  sema_init(&SB_JOURNAL(p_s_sb)->j_lock, 1);
+  sema_init(&SB_JOURNAL(p_s_sb)->j_flush_sem, 1);
 
   SB_JOURNAL(p_s_sb)->j_trans_id = 10 ;  
   SB_JOURNAL(p_s_sb)->j_mount_id = 10 ; 
   SB_JOURNAL(p_s_sb)->j_state = 0 ;
   atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 0) ;
-  atomic_set(&(SB_JOURNAL(p_s_sb)->j_wlock), 0) ;
   SB_JOURNAL(p_s_sb)->j_cnode_free_list = allocate_cnodes(num_cnodes) ;
   SB_JOURNAL(p_s_sb)->j_cnode_free_orig = SB_JOURNAL(p_s_sb)->j_cnode_free_list ;
   SB_JOURNAL(p_s_sb)->j_cnode_free = SB_JOURNAL(p_s_sb)->j_cnode_free_list ? num_cnodes : 0 ;
@@ -2075,8 +2185,9 @@ int journal_init(struct super_block *p_s
   SB_JOURNAL(p_s_sb)->j_must_wait = 0 ;
 
   init_journal_hash(p_s_sb) ;
-  SB_JOURNAL_LIST(p_s_sb)[0].j_list_bitmap = get_list_bitmap(p_s_sb, SB_JOURNAL_LIST(p_s_sb)) ;
-  if (!(SB_JOURNAL_LIST(p_s_sb)[0].j_list_bitmap)) {
+  jl = SB_JOURNAL(p_s_sb)->j_current_jl;
+  jl->j_list_bitmap = get_list_bitmap(p_s_sb, jl);
+  if (!jl->j_list_bitmap) {
     reiserfs_warning("journal-2005, get_list_bitmap failed for journal list 0\n") ;
     goto free_and_return;
   }
@@ -2084,16 +2195,12 @@ int journal_init(struct super_block *p_s
     reiserfs_warning("Replay Failure, unable to mount\n") ;
     goto free_and_return;
   }
-  SB_JOURNAL_LIST_INDEX(p_s_sb) = 0 ; /* once the read is done, we can set this
-                                         where it belongs */
-
-  if (reiserfs_dont_log (p_s_sb))
-    return 0;
 
   reiserfs_mounted_fs_count++ ;
   if (reiserfs_mounted_fs_count <= 1)
     commit_wq = create_workqueue("reiserfs");
 
+  INIT_WORK(&journal->j_work, flush_async_commits, p_s_sb);
   return 0 ;
 free_and_return:
   free_journal_ram(p_s_sb);
@@ -2107,8 +2214,6 @@ free_and_return:
 */
 int journal_transaction_should_end(struct reiserfs_transaction_handle *th, int new_alloc) {
   time_t now = get_seconds() ;
-  if (reiserfs_dont_log(th->t_super)) 
-    return 0 ;
   /* cannot restart while nested */
   if (th->t_refcount > 1)
     return 0 ;
@@ -2148,6 +2253,35 @@ void reiserfs_wait_on_write_block(struct
                !test_bit(WRITERS_BLOCKED, &SB_JOURNAL(s)->j_state)) ;
 }
 
+static void queue_log_writer(struct super_block *s) {
+    set_bit(WRITERS_QUEUED, &SB_JOURNAL(s)->j_state);
+    sleep_on(&SB_JOURNAL(s)->j_join_wait);
+}
+
+static void wake_queued_writers(struct super_block *s) {
+    if (test_and_clear_bit(WRITERS_QUEUED, &SB_JOURNAL(s)->j_state))
+        wake_up(&SB_JOURNAL(s)->j_join_wait);
+}
+
+static void let_transaction_grow(struct super_block *sb,
+                                 unsigned long trans_id)
+{
+    unsigned long bcount = SB_JOURNAL(sb)->j_bcount;
+    while(1) {
+	yield();
+        while ((atomic_read(&SB_JOURNAL(sb)->j_wcount) > 0 ||
+	        atomic_read(&SB_JOURNAL(sb)->j_jlock)) &&
+	       SB_JOURNAL(sb)->j_trans_id == trans_id) {
+	    queue_log_writer(sb);
+	}
+	if (SB_JOURNAL(sb)->j_trans_id != trans_id)
+	    break;
+	if (bcount == SB_JOURNAL(sb)->j_bcount)
+	    break;
+	bcount = SB_JOURNAL(sb)->j_bcount;
+    }
+}
+
 /* join == true if you must join an existing transaction.
 ** join == false if you can deal with waiting for others to finish
 **
@@ -2157,15 +2291,14 @@ void reiserfs_wait_on_write_block(struct
 static int do_journal_begin_r(struct reiserfs_transaction_handle *th, struct super_block * p_s_sb,unsigned long nblocks,int join) {
   time_t now = get_seconds() ;
   int old_trans_id  ;
+  struct reiserfs_journal *journal = SB_JOURNAL(p_s_sb);
+  struct reiserfs_transaction_handle myth;
+  int sched_count = 0;
 
   reiserfs_check_lock_depth("journal_begin") ;
   RFALSE( p_s_sb->s_flags & MS_RDONLY, 
 	  "clm-2078: calling journal_begin on readonly FS") ;
 
-  if (reiserfs_dont_log(p_s_sb)) {
-    th->t_super = p_s_sb ; /* others will check this for the don't log flag */
-    return 0 ;
-  }
   PROC_INFO_INC( p_s_sb, journal.journal_being );
   /* set here for journal_join */
   th->t_refcount = 1;
@@ -2173,66 +2306,76 @@ static int do_journal_begin_r(struct rei
 
 relock:
   lock_journal(p_s_sb) ;
+  journal->j_bcount++;
 
-  if (test_bit(WRITERS_BLOCKED, &SB_JOURNAL(p_s_sb)->j_state)) {
+  if (test_bit(WRITERS_BLOCKED, &journal->j_state)) {
     unlock_journal(p_s_sb) ;
     reiserfs_wait_on_write_block(p_s_sb) ;
     PROC_INFO_INC( p_s_sb, journal.journal_relock_writers );
     goto relock ;
   }
+  now = get_seconds();
 
   /* if there is no room in the journal OR
   ** if this transaction is too old, and we weren't called joinable, wait for it to finish before beginning 
   ** we don't sleep if there aren't other writers
   */
 
-  if (  (!join && SB_JOURNAL(p_s_sb)->j_must_wait > 0) ||
-     ( !join && (SB_JOURNAL(p_s_sb)->j_len_alloc + nblocks + 2) >= SB_JOURNAL_MAX_BATCH(p_s_sb)) || 
-     (!join && atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) > 0 && SB_JOURNAL(p_s_sb)->j_trans_start_time > 0 && 
-      (now - SB_JOURNAL(p_s_sb)->j_trans_start_time) > SB_JOURNAL_MAX_TRANS_AGE(p_s_sb)) ||
-     (!join && atomic_read(&(SB_JOURNAL(p_s_sb)->j_jlock)) ) ||
-     (!join && SB_JOURNAL(p_s_sb)->j_cnode_free < (SB_JOURNAL_TRANS_MAX(p_s_sb) * 3))) {
+  if ( (!join && journal->j_must_wait > 0) ||
+     ( !join && (journal->j_len_alloc + nblocks + 2) >= SB_JOURNAL_MAX_BATCH(p_s_sb)) ||
+     (!join && atomic_read(&journal->j_wcount) > 0 && journal->j_trans_start_time > 0 &&
+      (now - journal->j_trans_start_time) > SB_JOURNAL_MAX_TRANS_AGE(p_s_sb)) ||
+     (!join && atomic_read(&journal->j_jlock)) ||
+     (!join && journal->j_cnode_free < (SB_JOURNAL_TRANS_MAX(p_s_sb) * 3))) {
 
+    old_trans_id = journal->j_trans_id;
     unlock_journal(p_s_sb) ; /* allow others to finish this transaction */
 
-    /* if writer count is 0, we can just force this transaction to end, and start
-    ** a new one afterwards.
-    */
-    if (atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) <= 0) {
-      struct reiserfs_transaction_handle myth ;
-      journal_join(&myth, p_s_sb, 1) ;
-      reiserfs_prepare_for_journal(p_s_sb, SB_BUFFER_WITH_SB(p_s_sb), 1) ;
-      journal_mark_dirty(&myth, p_s_sb, SB_BUFFER_WITH_SB(p_s_sb)) ;
-      do_journal_end(&myth, p_s_sb,1,COMMIT_NOW) ;
+    if (!join && (journal->j_len_alloc + nblocks + 2) >=
+        SB_JOURNAL_MAX_BATCH(p_s_sb) &&
+	((journal->j_len + nblocks + 2) * 100) < (journal->j_len_alloc * 75))
+    {
+	if (atomic_read(&journal->j_wcount) > 10) {
+	    sched_count++;
+	    queue_log_writer(p_s_sb);
+	    goto relock;
+	}
+    }
+    /* don't mess with joining the transaction if all we have to do is
+     * wait for someone else to do a commit
+     */
+    if (atomic_read(&journal->j_jlock)) {
+	while (journal->j_trans_id == old_trans_id &&
+	       atomic_read(&journal->j_jlock)) {
+	    queue_log_writer(p_s_sb);
+        }
+	goto relock;
+    }
+    journal_join(&myth, p_s_sb, 1) ;
+
+    /* someone might have ended the transaction while we joined */
+    if (old_trans_id != journal->j_trans_id) {
+        do_journal_end(&myth, p_s_sb, 1, 0) ;
     } else {
-      /* but if the writer count isn't zero, we have to wait for the current writers to finish.
-      ** They won't batch on transaction end once we set j_jlock
-      */
-      atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 1) ;
-      old_trans_id = SB_JOURNAL(p_s_sb)->j_trans_id ;
-      while(atomic_read(&(SB_JOURNAL(p_s_sb)->j_jlock)) &&
-            SB_JOURNAL(p_s_sb)->j_trans_id == old_trans_id) {
-	sleep_on(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
-      }
+        do_journal_end(&myth, p_s_sb, 1, COMMIT_NOW) ;
     }
+
     PROC_INFO_INC( p_s_sb, journal.journal_relock_wcount );
     goto relock ;
   }
-
-  if (SB_JOURNAL(p_s_sb)->j_trans_start_time == 0) { /* we are the first writer, set trans_id */
-    SB_JOURNAL(p_s_sb)->j_trans_start_time = now ;
+  /* we are the first writer, set the transaction start time */
+  if (journal->j_trans_start_time == 0) {
+    journal->j_trans_start_time = get_seconds();
   }
-  atomic_inc(&(SB_JOURNAL(p_s_sb)->j_wcount)) ;
-  SB_JOURNAL(p_s_sb)->j_len_alloc += nblocks ;
+  atomic_inc(&(journal->j_wcount)) ;
+  journal->j_len_alloc += nblocks ;
   th->t_blocks_logged = 0 ;
   th->t_blocks_allocated = nblocks ;
-  th->t_trans_id = SB_JOURNAL(p_s_sb)->j_trans_id ;
+  th->t_trans_id = journal->j_trans_id ;
   unlock_journal(p_s_sb) ;
-  p_s_sb->s_dirt = 1; 
   return 0 ;
 }
 
-
 static int journal_join(struct reiserfs_transaction_handle *th, struct super_block *p_s_sb, unsigned long nblocks) {
   struct reiserfs_transaction_handle *cur_th = current->journal_info;
 
@@ -2277,11 +2420,6 @@ int journal_begin(struct reiserfs_transa
     return ret ;
 }
 
-/* not used at all */
-int journal_prepare(struct super_block  * p_s_sb, struct buffer_head *bh) {
-  return 0 ;
-}
-
 /*
 ** puts bh into the current transaction.  If it was already there, it reorders: removes the
 ** old pointers from the hash and puts new ones in (to make sure replay happens in the right order).
@@ -2297,18 +2435,14 @@ int journal_mark_dirty(struct reiserfs_t
   int prepared = 0 ;
 
   PROC_INFO_INC( p_s_sb, journal.mark_dirty );
-  if (reiserfs_dont_log(th->t_super)) {
-    mark_buffer_dirty(bh) ;
-    return 0 ;
-  }
-
   if (th->t_trans_id != SB_JOURNAL(p_s_sb)->j_trans_id) {
     reiserfs_panic(th->t_super, "journal-1577: handle trans id %ld != current trans id %ld\n", 
                    th->t_trans_id, SB_JOURNAL(p_s_sb)->j_trans_id);
   }
-  p_s_sb->s_dirt = 1 ;
+  p_s_sb->s_dirt = 1;
 
   prepared = test_and_clear_bit(BH_JPrepared, &bh->b_state) ;
+  clear_bit(BH_JRestore_dirty, &bh->b_state);
   /* already in this transaction, we are done */
   if (buffer_journaled(bh)) {
     PROC_INFO_INC( p_s_sb, journal.mark_dirty_already );
@@ -2319,13 +2453,12 @@ int journal_mark_dirty(struct reiserfs_t
   ** a dirty or journal_dirty or locked buffer to be logged, as some changes
   ** could get to disk too early.  NOT GOOD.
   */
-  if (!prepared || buffer_locked(bh)) {
+  if (!prepared || buffer_locked(bh) || buffer_dirty(bh)) {
     printk("journal-1777: buffer %llu bad state %cPREPARED %cLOCKED %cDIRTY %cJDIRTY_WAIT\n", (unsigned long long)bh->b_blocknr, prepared ? ' ' : '!', 
                             buffer_locked(bh) ? ' ' : '!',
 			    buffer_dirty(bh) ? ' ' : '!',
 			    buffer_journal_dirty(bh) ? ' ' : '!') ;
   }
-  count_already_incd = clear_prepared_bits(bh) ;
 
   if (atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) <= 0) {
     printk("journal-1409: journal_mark_dirty returning because j_wcount was %d\n", atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount))) ;
@@ -2344,14 +2477,6 @@ int journal_mark_dirty(struct reiserfs_t
     mark_buffer_notjournal_dirty(bh) ;
   }
 
-  if (buffer_dirty(bh)) {
-    clear_buffer_dirty(bh) ;
-  }
-
-  if (buffer_journaled(bh)) { /* must double check after getting lock */
-    goto done ;
-  }
-
   if (SB_JOURNAL(p_s_sb)->j_len > SB_JOURNAL(p_s_sb)->j_len_alloc) {
     SB_JOURNAL(p_s_sb)->j_len_alloc = SB_JOURNAL(p_s_sb)->j_len + JOURNAL_PER_BALANCE_CNT ;
   }
@@ -2391,24 +2516,6 @@ int journal_mark_dirty(struct reiserfs_t
     SB_JOURNAL(p_s_sb)->j_first = cn ;
     SB_JOURNAL(p_s_sb)->j_last = cn ;
   }
-done:
-  return 0 ;
-}
-
-/*
-** if buffer already in current transaction, do a journal_mark_dirty
-** otherwise, just mark it dirty and move on.  Used for writes to meta blocks
-** that don't need journaling
-*/
-int journal_mark_dirty_nolog(struct reiserfs_transaction_handle *th, struct super_block *p_s_sb, struct buffer_head *bh) {
-  if (reiserfs_dont_log(th->t_super) || buffer_journaled(bh) || 
-      buffer_journal_dirty(bh)) {
-    return journal_mark_dirty(th, p_s_sb, bh) ;
-  }
-  if (get_journal_hash_dev(p_s_sb, SB_JOURNAL(p_s_sb)->j_list_hash_table, bh->b_blocknr)) {
-    return journal_mark_dirty(th, p_s_sb, bh) ;
-  }
-  mark_buffer_dirty(bh) ;
   return 0 ;
 }
 
@@ -2474,7 +2581,6 @@ static int remove_from_transaction(struc
     if (atomic_read(&(bh->b_count)) < 0) {
       printk("journal-1752: remove from trans, b_count < 0\n") ;
     }
-    if (!buffer_locked(bh)) reiserfs_clean_and_file_buffer(bh) ; 
     ret = 1 ;
   }
   SB_JOURNAL(p_s_sb)->j_len-- ;
@@ -2500,7 +2606,7 @@ static int can_dirty(struct reiserfs_jou
   int can_dirty = 1 ;
   
   /* first test hprev.  These are all newer than cn, so any node here
-  ** with the name block number and dev means this node can't be sent
+  ** with the same block number and dev means this node can't be sent
   ** to disk right now.
   */
   while(cur && can_dirty) {
@@ -2551,72 +2657,56 @@ int journal_end_sync(struct reiserfs_tra
 ** change flush_commit_lists to have a repeat parameter too.
 **
 */
-void flush_async_commits(struct super_block *p_s_sb) {
-  int i ;
-
-  for (i = 0 ; i < JOURNAL_LIST_COUNT ; i++) {
-    if (i != SB_JOURNAL_LIST_INDEX(p_s_sb)) {
-      flush_commit_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + i, 1) ; 
-    }
+static void flush_async_commits(void *p) {
+  struct super_block *p_s_sb = p;
+  struct reiserfs_journal_list *jl;
+  struct list_head *entry;
+
+  lock_kernel();
+  if (!list_empty(&SB_JOURNAL(p_s_sb)->j_journal_list)) {
+      /* last entry is the youngest, commit it and you get everything */
+      entry = SB_JOURNAL(p_s_sb)->j_journal_list.prev;
+      jl = JOURNAL_LIST_ENTRY(entry);
+      flush_commit_list(p_s_sb, jl, 1);
   }
+  unlock_kernel();
 }
 
 /*
 ** flushes any old transactions to disk
 ** ends the current transaction if it is too old
-**
-** also calls flush_journal_list with old_only == 1, which allows me to reclaim
-** memory and such from the journal lists whose real blocks are all on disk.
-**
-** called by sync_dev_journal from buffer.c
 */
-int flush_old_commits(struct super_block *p_s_sb, int immediate) {
-  int i ;
-  int count = 0;
-  int start ; 
-  time_t now ; 
-  struct reiserfs_transaction_handle th ; 
-
-  start =  SB_JOURNAL_LIST_INDEX(p_s_sb) ;
-  now = get_seconds() ;
-
-  /* safety check so we don't flush while we are replaying the log during mount */
-  if (SB_JOURNAL_LIST_INDEX(p_s_sb) < 0) {
-    return 0  ;
-  }
-  /* starting with oldest, loop until we get to the start */
-  i = (SB_JOURNAL_LIST_INDEX(p_s_sb) + 1) % JOURNAL_LIST_COUNT ;
-  while(i != start) {
-    if (SB_JOURNAL_LIST(p_s_sb)[i].j_len > 0 && ((now - SB_JOURNAL_LIST(p_s_sb)[i].j_timestamp) > SB_JOURNAL_MAX_COMMIT_AGE(p_s_sb) ||
-       immediate)) {
-      /* we have to check again to be sure the current transaction did not change */
-      if (i != SB_JOURNAL_LIST_INDEX(p_s_sb))  {
-	flush_commit_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + i, 1) ;
-      }
-    }
-    i = (i + 1) % JOURNAL_LIST_COUNT ;
-    count++ ;
-  }
-  /* now, check the current transaction.  If there are no writers, and it is too old, finish it, and
-  ** force the commit blocks to disk
-  */
-  if (!immediate && atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) <= 0 &&  
-     SB_JOURNAL(p_s_sb)->j_trans_start_time > 0 && 
-     SB_JOURNAL(p_s_sb)->j_len > 0 && 
-     (now - SB_JOURNAL(p_s_sb)->j_trans_start_time) > SB_JOURNAL_MAX_TRANS_AGE(p_s_sb)) {
-    journal_join(&th, p_s_sb, 1) ;
-    reiserfs_prepare_for_journal(p_s_sb, SB_BUFFER_WITH_SB(p_s_sb), 1) ;
-    journal_mark_dirty(&th, p_s_sb, SB_BUFFER_WITH_SB(p_s_sb)) ;
-    do_journal_end(&th, p_s_sb,1, COMMIT_NOW) ;
-  } else if (immediate) { /* belongs above, but I wanted this to be very explicit as a special case.  If they say to 
-                             flush, we must be sure old transactions hit the disk too. */
-    journal_join(&th, p_s_sb, 1) ;
-    reiserfs_prepare_for_journal(p_s_sb, SB_BUFFER_WITH_SB(p_s_sb), 1) ;
-    journal_mark_dirty(&th, p_s_sb, SB_BUFFER_WITH_SB(p_s_sb)) ;
-    do_journal_end(&th, p_s_sb,1, COMMIT_NOW | WAIT) ;
-  }
-   reiserfs_journal_kupdate(p_s_sb) ;
-   return 0 ;
+int reiserfs_flush_old_commits(struct super_block *p_s_sb) {
+    time_t now ;
+    struct reiserfs_transaction_handle th ;
+
+    now = get_seconds();
+    /* safety check so we don't flush while we are replaying the log during
+     * mount
+     */
+    if (list_empty(&SB_JOURNAL(p_s_sb)->j_journal_list)) {
+	return 0  ;
+    }
+
+    /* check the current transaction.  If there are no writers, and it is
+     * too old, finish it, and force the commit blocks to disk
+     */
+    if (atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) <= 0 &&
+        SB_JOURNAL(p_s_sb)->j_trans_start_time > 0 &&
+        SB_JOURNAL(p_s_sb)->j_len > 0 &&
+        (now - SB_JOURNAL(p_s_sb)->j_trans_start_time) >
+	SB_JOURNAL_MAX_TRANS_AGE(p_s_sb))
+    {
+	journal_join(&th, p_s_sb, 1) ;
+	reiserfs_prepare_for_journal(p_s_sb, SB_BUFFER_WITH_SB(p_s_sb), 1) ;
+	journal_mark_dirty(&th, p_s_sb, SB_BUFFER_WITH_SB(p_s_sb)) ;
+
+	/* we're only being called from kreiserfsd; it makes no sense to
+	** queue an async commit here just so kreiserfsd can run it later
+	*/
+	do_journal_end(&th, p_s_sb,1, COMMIT_NOW | WAIT) ;
+    }
+    return p_s_sb->s_dirt;
 }
 
 /*
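
flush_async_commits can get away with committing only the youngest list
because flush_commit_list always pushes older commits out first.  A toy
model of that invariant, with an array standing in for the kernel's
list_heads:

#include <stdio.h>

#define NLISTS 4

static int committed[NLISTS];  /* index 0 is the oldest list */

/* committing list i forces everything older out first, which is
 * the ordering flush_commit_list guarantees */
static void flush_commit(int i)
{
    for (int j = 0; j <= i; j++) {
        if (!committed[j]) {
            printf("committing list %d\n", j);
            committed[j] = 1;
        }
    }
}

int main(void)
{
    flush_commit(NLISTS - 1);  /* the youngest entry drains the backlog */
    return 0;
}
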
@@ -2637,6 +2727,7 @@ static int check_journal_end(struct reis
   int flush = flags & FLUSH_ALL ;
   int commit_now = flags & COMMIT_NOW ;
   int wait_on_commit = flags & WAIT ;
+  struct reiserfs_journal_list *jl;
 
   if (th->t_trans_id != SB_JOURNAL(p_s_sb)->j_trans_id) {
     reiserfs_panic(th->t_super, "journal-1577: handle trans id %ld != current trans id %ld\n", 
@@ -2653,13 +2744,7 @@ static int check_journal_end(struct reis
   ** care of in this trans
   */
   if (SB_JOURNAL(p_s_sb)->j_len == 0) {
-    int wcount = atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) ;
-    unlock_journal(p_s_sb) ;
-    if (atomic_read(&(SB_JOURNAL(p_s_sb)->j_jlock))  > 0 && wcount <= 0) {
-      atomic_dec(&(SB_JOURNAL(p_s_sb)->j_jlock)) ;
-      wake_up(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
-    }
-    return 0 ;
+    BUG();
   }
   /* if wcount > 0, and we are called with flush or commit_now,
   ** we wait on j_join_wait.  We will wake up when the last writer has
@@ -2669,24 +2754,37 @@ static int check_journal_end(struct reis
   */
   if (atomic_read(&(SB_JOURNAL(p_s_sb)->j_wcount)) > 0) {
     if (flush || commit_now) {
-      int orig_jindex = SB_JOURNAL_LIST_INDEX(p_s_sb) ;
+      unsigned trans_id ;
+
+      jl = SB_JOURNAL(p_s_sb)->j_current_jl;
+      trans_id = jl->j_trans_id;
+
       atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 1) ;
       if (flush) {
         SB_JOURNAL(p_s_sb)->j_next_full_flush = 1 ;
       }
       unlock_journal(p_s_sb) ;
+
       /* sleep while the current transaction is still j_jlocked */
-      while(atomic_read(&(SB_JOURNAL(p_s_sb)->j_jlock)) && 
-            SB_JOURNAL(p_s_sb)->j_trans_id == th->t_trans_id) {
-	sleep_on(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
-      }
-      if (commit_now) {
-	if (wait_on_commit) {
-	  flush_commit_list(p_s_sb,  SB_JOURNAL_LIST(p_s_sb) + orig_jindex, 1) ;
-	} else {
-	  commit_flush_async(p_s_sb, orig_jindex) ; 
+      while(SB_JOURNAL(p_s_sb)->j_trans_id == trans_id) {
+	if (atomic_read(&SB_JOURNAL(p_s_sb)->j_jlock)) {
+	    queue_log_writer(p_s_sb);
+        } else {
+	    lock_journal(p_s_sb);
+	    if (SB_JOURNAL(p_s_sb)->j_trans_id == trans_id) {
+	        atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 1) ;
+	    }
+	    unlock_journal(p_s_sb);
 	}
       }
+      if (SB_JOURNAL(p_s_sb)->j_trans_id == trans_id) {
+          BUG();
+      }
+      if (commit_now && journal_list_still_alive(p_s_sb, trans_id) &&
+          wait_on_commit)
+      {
+	  flush_commit_list(p_s_sb, jl, 1) ;
+      }
       return 0 ;
     } 
     unlock_journal(p_s_sb) ;
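
The wait loop above replaces the old unconditional sleep_on with a
recheck-under-lock step: j_jlock is only set if, with the journal locked,
the handle's transaction is still the current one.  The shape of that step
as a standalone sketch (the struct and helpers are illustrative; in the
kernel j_jlock is an atomic_t and the lock is the real journal mutex):

#include <stdio.h>

struct journal {
    unsigned long trans_id;
    int jlock;
    int locked;
};

static void lock_journal(struct journal *j)   { j->locked = 1; }
static void unlock_journal(struct journal *j) { j->locked = 0; }

/* only set j_jlock if the handle's transaction is still the
 * current one once we actually hold the lock */
static int try_set_jlock(struct journal *j, unsigned long my_id)
{
    int set = 0;
    lock_journal(j);
    if (j->trans_id == my_id) {
        j->jlock = 1;
        set = 1;
    }
    unlock_journal(j);
    return set;
}

int main(void)
{
    struct journal j = { .trans_id = 8 };
    printf("set while current: %d\n", try_set_jlock(&j, 8));
    j.trans_id = 9;  /* someone else ended the transaction meanwhile */
    printf("set after it ended: %d\n", try_set_jlock(&j, 8));
    return 0;
}
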
@@ -2694,7 +2792,7 @@ static int check_journal_end(struct reis
   }
 
   /* deal with old transactions where we are the last writers */
-  now = get_seconds() ;
+  now = get_seconds();
   if ((now - SB_JOURNAL(p_s_sb)->j_trans_start_time) > SB_JOURNAL_MAX_TRANS_AGE(p_s_sb)) {
     commit_now = 1 ;
     SB_JOURNAL(p_s_sb)->j_next_async_flush = 1 ;
@@ -2734,25 +2832,21 @@ int journal_mark_freed(struct reiserfs_t
   struct buffer_head *bh = NULL ;
   struct reiserfs_list_bitmap *jb = NULL ;
   int cleaned = 0 ;
-  
-  if (reiserfs_dont_log(th->t_super)) {
-    bh = sb_find_get_block(p_s_sb, blocknr) ;
-    if (bh && buffer_dirty (bh)) {
-      printk ("journal_mark_freed(dont_log): dirty buffer on hash list: %lx %d\n", bh->b_state, blocknr);
-      BUG ();
-    }
-    brelse (bh);
-    return 0 ;
+
+  cn = get_journal_hash_dev(p_s_sb, SB_JOURNAL(p_s_sb)->j_hash_table, blocknr);
+  if (cn && cn->bh) {
+      bh = cn->bh ;
+      get_bh(bh) ;
   }
-  bh = sb_find_get_block(p_s_sb, blocknr) ;
   /* if it is journal new, we just remove it from this transaction */
   if (bh && buffer_journal_new(bh)) {
     mark_buffer_notjournal_new(bh) ;
     clear_prepared_bits(bh) ;
+    reiserfs_clean_and_file_buffer(bh) ;
     cleaned = remove_from_transaction(p_s_sb, blocknr, cleaned) ;
   } else {
     /* set the bit for this block in the journal bitmap for this transaction */
-    jb = SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_list_bitmap ;
+    jb = SB_JOURNAL(p_s_sb)->j_current_jl->j_list_bitmap;
     if (!jb) {
       reiserfs_panic(p_s_sb, "journal-1702: journal_mark_freed, journal_list_bitmap is NULL\n") ;
     }
@@ -2762,6 +2856,7 @@ int journal_mark_freed(struct reiserfs_t
 
     if (bh) {
       clear_prepared_bits(bh) ;
+      reiserfs_clean_and_file_buffer(bh) ;
     }
     cleaned = remove_from_transaction(p_s_sb, blocknr, cleaned) ;
 
@@ -2793,7 +2888,6 @@ int journal_mark_freed(struct reiserfs_t
   }
 
   if (bh) {
-    reiserfs_clean_and_file_buffer(bh) ;
     put_bh(bh) ; /* get_hash grabs the buffer */
     if (atomic_read(&(bh->b_count)) < 0) {
       printk("journal-2165: bh->b_count < 0\n") ;
@@ -2803,50 +2897,84 @@ int journal_mark_freed(struct reiserfs_t
 }
 
 void reiserfs_update_inode_transaction(struct inode *inode) {
-  
-  REISERFS_I(inode)->i_trans_index = SB_JOURNAL_LIST_INDEX(inode->i_sb);
-
+  REISERFS_I(inode)->i_jl = SB_JOURNAL(inode->i_sb)->j_current_jl;
   REISERFS_I(inode)->i_trans_id = SB_JOURNAL(inode->i_sb)->j_trans_id ;
 }
 
-static int reiserfs_inode_in_this_transaction(struct inode *inode) {
-  if (REISERFS_I(inode)->i_trans_id == SB_JOURNAL(inode->i_sb)->j_trans_id || 
-      REISERFS_I(inode)->i_trans_id == 0) {
-    return 1; 
-  } 
-  return 0 ;
+static void __commit_trans_jl(struct inode *inode, unsigned long id,
+                                 struct reiserfs_journal_list *jl)
+{
+    struct reiserfs_transaction_handle th ;
+    struct super_block *sb = inode->i_sb ;
+
+    /* is it from the current transaction, or from an unknown transaction? */
+    if (id == SB_JOURNAL(sb)->j_trans_id) {
+	jl = SB_JOURNAL(sb)->j_current_jl;
+	/* try to let other writers come in and grow this transaction */
+	let_transaction_grow(sb, id);
+	if (SB_JOURNAL(sb)->j_trans_id != id) {
+	    goto flush_commit_only;
+	}
+
+	journal_begin(&th, sb, 1) ;
+
+	/* someone might have ended this transaction while we joined */
+	if (SB_JOURNAL(sb)->j_trans_id != id) {
+	    reiserfs_prepare_for_journal(sb, SB_BUFFER_WITH_SB(sb), 1) ;
+	    journal_mark_dirty(&th, sb, SB_BUFFER_WITH_SB(sb)) ;
+	    journal_end(&th, sb, 1) ;
+	    goto flush_commit_only;
+	}
+
+	journal_end_sync(&th, sb, 1) ;
+
+    } else {
+	/* this gets tricky: we have to make sure the journal list in
+	 * the inode still exists.  We know the list is still around
+	 * if its transaction id is at least as new as the oldest list's
+	 */
+flush_commit_only:
+	if (journal_list_still_alive(inode->i_sb, id)) {
+	    flush_commit_list(sb, jl, 1) ;
+	}
+    }
+    /* otherwise the list is gone, and long since committed */
 }
 
 void reiserfs_commit_for_inode(struct inode *inode) {
-  struct reiserfs_journal_list *jl ;
-  struct reiserfs_transaction_handle th ;
-  struct super_block *sb = inode->i_sb ;
-
-  jl = SB_JOURNAL_LIST(sb) + REISERFS_I(inode)->i_trans_index ;
-
-  /* is it from the current transaction, or from an unknown transaction? */
-  if (reiserfs_inode_in_this_transaction(inode)) {
-    journal_join(&th, sb, 1) ;
-    reiserfs_update_inode_transaction(inode) ;
-    journal_end_sync(&th, sb, 1) ;
-  } else if (jl->j_trans_id == REISERFS_I(inode)->i_trans_id) {
-    flush_commit_list(sb, jl, 1) ;
-  }
-  /* if the transaction id does not match, this list is long since flushed
-  ** and we don't have to do anything here
-  */
+    unsigned long id = REISERFS_I(inode)->i_trans_id;
+    struct reiserfs_journal_list *jl = REISERFS_I(inode)->i_jl;
+
+    /* for the whole inode, assume an unset id means it was
+     * changed in the current transaction.  That's the more
+     * conservative assumption.
+     */
+    if (!id || !jl) {
+	reiserfs_update_inode_transaction(inode) ;
+	id = REISERFS_I(inode)->i_trans_id;
+	/* jl will be updated in __commit_trans_jl */
+    }
+
+    __commit_trans_jl(inode, id, jl);
 }
 
 void reiserfs_restore_prepared_buffer(struct super_block *p_s_sb, 
                                       struct buffer_head *bh) {
-  PROC_INFO_INC( p_s_sb, journal.restore_prepared );
-  if (reiserfs_dont_log (p_s_sb))
-    return;
-
-  if (!bh) {
-    return ;
-  }
-  clear_bit(BH_JPrepared, &bh->b_state) ;
+    PROC_INFO_INC( p_s_sb, journal.restore_prepared );
+    if (!bh) {
+	return ;
+    }
+    if (test_and_clear_bit(BH_JRestore_dirty, &bh->b_state) &&
+	buffer_journal_dirty(bh)) {
+	struct reiserfs_journal_cnode *cn;
+	cn = get_journal_hash_dev(p_s_sb,
+	                          SB_JOURNAL(p_s_sb)->j_list_hash_table,
+				  bh->b_blocknr);
+	if (cn && can_dirty(cn)) {
+	    set_bit(BH_JTest, &bh->b_state);
+	    mark_buffer_dirty(bh);
+        }
+    }
+    clear_bit(BH_JPrepared, &bh->b_state) ;
 }
 
 extern struct tree_balance *cur_tb ;
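
reiserfs_commit_for_inode now keys off the (trans id, journal list) pair
cached in the inode.  The decision tree in __commit_trans_jl, modeled
standalone (journal_list_still_alive is stubbed as an oldest-id compare,
which is roughly what the kernel check amounts to):

#include <stdio.h>

struct jlist { unsigned long trans_id; };

static unsigned long current_trans_id = 42;
static unsigned long oldest_alive_id  = 40;

/* stand-in for journal_list_still_alive() */
static int still_alive(unsigned long id)
{
    return id >= oldest_alive_id;
}

/* the three outcomes of __commit_trans_jl */
static void commit_for_inode(unsigned long id, struct jlist *jl)
{
    if (id == current_trans_id)
        printf("journal_end_sync the running transaction\n");
    else if (still_alive(id))
        printf("flush_commit_list on list %lu\n", jl->trans_id);
    else
        printf("list %lu long since committed, nothing to do\n",
               jl->trans_id);
}

int main(void)
{
    struct jlist jl = { .trans_id = 41 };
    commit_for_inode(41, &jl);
    return 0;
}
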
@@ -2857,29 +2985,39 @@ extern struct tree_balance *cur_tb ;
 ** wait on it.
 ** 
 */
-void reiserfs_prepare_for_journal(struct super_block *p_s_sb, 
+int reiserfs_prepare_for_journal(struct super_block *p_s_sb,
                                   struct buffer_head *bh, int wait) {
-  int retry_count = 0 ;
-
   PROC_INFO_INC( p_s_sb, journal.prepare );
-  if (reiserfs_dont_log (p_s_sb))
-    return;
 
-  while(!test_bit(BH_JPrepared, &bh->b_state) ||
-        (wait && buffer_locked(bh))) {
-    if (buffer_journaled(bh)) {
-      set_bit(BH_JPrepared, &bh->b_state) ;
-      return ;
-    }
-    set_bit(BH_JPrepared, &bh->b_state) ;
-    if (wait) {
-      RFALSE( buffer_locked(bh) && cur_tb != NULL,
-	      "waiting while do_balance was running\n") ;
-      wait_on_buffer(bh) ;
+    if (test_set_buffer_locked(bh)) {
+	if (!wait)
+	    return 0;
+	lock_buffer(bh);
+    }
+    set_bit(BH_JPrepared, &bh->b_state);
+    if (test_clear_buffer_dirty(bh) && buffer_journal_dirty(bh))  {
+	clear_bit(BH_JTest, &bh->b_state);
+	set_bit(BH_JRestore_dirty, &bh->b_state);
+    }
+    unlock_buffer(bh);
+    return 1;
+}
+
+static void flush_old_journal_lists(struct super_block *s) {
+    struct reiserfs_journal_list *jl;
+    struct list_head *entry;
+    time_t now = get_seconds();
+
+    while(!list_empty(&SB_JOURNAL(s)->j_journal_list)) {
+        entry = SB_JOURNAL(s)->j_journal_list.next;
+	jl = JOURNAL_LIST_ENTRY(entry);
+	/* this check should always be run, to send old lists to disk */
+	if (jl->j_timestamp < (now - (JOURNAL_MAX_TRANS_AGE * 4))) {
+	    flush_used_journal_lists(s, jl);
+	} else {
+	    break;
+	}
     }
-    PROC_INFO_INC( p_s_sb, journal.prepare_retry );
-    retry_count++ ;
-  }
 }
 
 /* 
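
reiserfs_prepare_for_journal and reiserfs_restore_prepared_buffer form a
steal-the-dirty-bit pair: prepare takes the dirty bit away so writeback
can't race with a balance in progress, and records the theft in
BH_JRestore_dirty; restore hands the bit back if the block is still journal
dirty.  A sketch with plain flag words (the kernel uses buffer head state
bits, and restore additionally checks can_dirty()):

#include <stdio.h>

#define DIRTY          (1 << 0)
#define JPREPARED      (1 << 1)
#define JRESTORE_DIRTY (1 << 2)
#define JOURNAL_DIRTY  (1 << 3)

static void prepare(unsigned *state)
{
    *state |= JPREPARED;
    /* steal the dirty bit; only remember the theft when the block is
     * also journal dirty and so must go to disk through the log */
    if (*state & DIRTY) {
        *state &= ~DIRTY;
        if (*state & JOURNAL_DIRTY)
            *state |= JRESTORE_DIRTY;
    }
}

static void restore(unsigned *state)
{
    if ((*state & JRESTORE_DIRTY) && (*state & JOURNAL_DIRTY))
        *state |= DIRTY;            /* hand the dirty bit back */
    *state &= ~(JPREPARED | JRESTORE_DIRTY);
}

int main(void)
{
    unsigned bh = DIRTY | JOURNAL_DIRTY;
    prepare(&bh);
    printf("after prepare: %#x\n", bh);
    restore(&bh);
    printf("after restore: %#x\n", bh);
    return 0;
}
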
@@ -2898,23 +3036,24 @@ static int do_journal_end(struct reiserf
   struct buffer_head *c_bh ; /* commit bh */
   struct buffer_head *d_bh ; /* desc bh */
   int cur_write_start = 0 ; /* start index of current log write */
-  int cur_blocks_left = 0 ; /* number of journal blocks left to write */
   int old_start ;
   int i ;
-  int jindex ;
-  int orig_jindex ;
   int flush = flags & FLUSH_ALL ;
-  int commit_now = flags & COMMIT_NOW ;
   int wait_on_commit = flags & WAIT ;
-  struct reiserfs_super_block *rs ; 
-  int trans_half ;
+  struct reiserfs_journal_list *jl, *temp_jl;
+  struct list_head *entry, *safe;
+  unsigned long jindex;
+  unsigned long commit_trans_id;
+  int trans_half;
 
   if (th->t_refcount > 1)
     BUG() ;
 
   current->journal_info = th->t_handle_save;
-  if (reiserfs_dont_log(th->t_super)) {
-    return 0 ;
+  reiserfs_check_lock_depth("journal end");
+  if (SB_JOURNAL(p_s_sb)->j_len == 0) {
+      reiserfs_prepare_for_journal(p_s_sb, SB_BUFFER_WITH_SB(p_s_sb), 1) ;
+      journal_mark_dirty(th, p_s_sb, SB_BUFFER_WITH_SB(p_s_sb)) ;
   }
 
   lock_journal(p_s_sb) ;
@@ -2923,24 +3062,24 @@ static int do_journal_end(struct reiserf
     flush = 1 ;
   }
   if (SB_JOURNAL(p_s_sb)->j_next_async_flush) {
-    flags |= COMMIT_NOW ;
-    commit_now = 1 ;
+    flags |= COMMIT_NOW | WAIT;
+    wait_on_commit = 1;
   }
 
   /* check_journal_end locks the journal, and unlocks it if it does not return 1.
   ** it tells us if we should continue with the journal_end, or just return
   */
   if (!check_journal_end(th, p_s_sb, nblocks, flags)) {
-    return 0 ;
+    p_s_sb->s_dirt = 1;
+    wake_queued_writers(p_s_sb);
+    goto out ;
   }
 
   /* check_journal_end might set these, check again */
   if (SB_JOURNAL(p_s_sb)->j_next_full_flush) {
     flush = 1 ;
   }
-  if (SB_JOURNAL(p_s_sb)->j_next_async_flush) {
-    commit_now = 1 ;
-  }
+
   /*
   ** j must wait means we have to flush the log blocks, and the real blocks for
   ** this transaction
@@ -2957,10 +3096,9 @@ static int do_journal_end(struct reiserf
   current->journal_info = th->t_handle_save ;
 #endif
   
-  rs = SB_DISK_SUPER_BLOCK(p_s_sb) ;
   /* setup description block */
   d_bh = journal_getblk(p_s_sb, SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb) + SB_JOURNAL(p_s_sb)->j_start) ; 
-  set_buffer_uptodate(d_bh) ;
+  set_buffer_uptodate(d_bh);
   desc = (struct reiserfs_journal_desc *)(d_bh)->b_data ;
   memset(d_bh->b_data, 0, d_bh->b_size) ;
   memcpy(get_journal_desc_magic (d_bh), JOURNAL_DESC_MAGIC, 8) ;
@@ -2975,28 +3113,33 @@ static int do_journal_end(struct reiserf
   set_buffer_uptodate(c_bh) ;
 
   /* init this journal list */
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_older_commits_done), 0) ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_trans_id = SB_JOURNAL(p_s_sb)->j_trans_id ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_timestamp = SB_JOURNAL(p_s_sb)->j_trans_start_time ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_commit_bh = c_bh ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_start = SB_JOURNAL(p_s_sb)->j_start ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_len = SB_JOURNAL(p_s_sb)->j_len ;  
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_nonzerolen), SB_JOURNAL(p_s_sb)->j_len) ;
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_commit_left), SB_JOURNAL(p_s_sb)->j_len + 2);
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_realblock = NULL ;
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_commit_flushing), 1) ;
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_flushing), 1) ;
-
-  /* which is faster, locking/unlocking at the start and end of the for
-  ** or locking once per iteration around the insert_journal_hash?
-  ** eitherway, we are write locking insert_journal_hash.  The ENTIRE FOR
-  ** LOOP MUST not cause schedule to occur.
-  */
+  jl = SB_JOURNAL(p_s_sb)->j_current_jl;
+
+  /* we lock the commit before doing anything because
+   * we want to make sure nobody tries to run flush_commit_list until
+   * the new transaction is fully setup, and we've already flushed the
+   * ordered bh list
+   */
+  down(&jl->j_commit_lock);
+
+  /* save the transaction id in case we need to commit it later */
+  commit_trans_id = jl->j_trans_id;
+
+  atomic_set(&jl->j_older_commits_done, 0) ;
+  jl->j_trans_id = SB_JOURNAL(p_s_sb)->j_trans_id ;
+  jl->j_timestamp = SB_JOURNAL(p_s_sb)->j_trans_start_time ;
+  jl->j_commit_bh = c_bh ;
+  jl->j_start = SB_JOURNAL(p_s_sb)->j_start ;
+  jl->j_len = SB_JOURNAL(p_s_sb)->j_len ;
+  atomic_set(&jl->j_nonzerolen, SB_JOURNAL(p_s_sb)->j_len) ;
+  atomic_set(&jl->j_commit_left, SB_JOURNAL(p_s_sb)->j_len + 2);
+  jl->j_realblock = NULL ;
 
-  /* for each real block, add it to the journal list hash,
+  /* The ENTIRE FOR LOOP MUST not cause schedule to occur.
+  ** For each real block, add it to the journal list hash,
   ** copy into real block index array in the commit or desc block
   */
-  trans_half = journal_trans_half(p_s_sb->s_blocksize) ;
+  trans_half = journal_trans_half(p_s_sb->s_blocksize);
   for (i = 0, cn = SB_JOURNAL(p_s_sb)->j_first ; cn ; cn = cn->next, i++) {
     if (test_bit(BH_JDirty, &cn->bh->b_state) ) {
       jl_cn = get_cnode(p_s_sb) ;
@@ -3004,7 +3147,7 @@ static int do_journal_end(struct reiserf
         reiserfs_panic(p_s_sb, "journal-1676, get_cnode returned NULL\n") ;
       }
       if (i == 0) {
-        SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_realblock = jl_cn ;
+        jl->j_realblock = jl_cn ;
       }
       jl_cn->prev = last_cn ;
       jl_cn->next = NULL ;
@@ -3020,9 +3163,9 @@ static int do_journal_end(struct reiserf
       }
       jl_cn->blocknr = cn->bh->b_blocknr ; 
       jl_cn->state = 0 ;
-      jl_cn->sb = p_s_sb ;
+      jl_cn->sb = p_s_sb;
       jl_cn->bh = cn->bh ;
-      jl_cn->jlist = SB_JOURNAL_LIST(p_s_sb) + SB_JOURNAL_LIST_INDEX(p_s_sb) ;
+      jl_cn->jlist = jl;
       insert_journal_hash(SB_JOURNAL(p_s_sb)->j_list_hash_table, jl_cn) ; 
       if (i < trans_half) {
 	desc->j_realblock[i] = cpu_to_le32(cn->bh->b_blocknr) ;
@@ -3033,7 +3176,6 @@ static int do_journal_end(struct reiserf
       i-- ;
     }
   }
-  
   set_desc_trans_len(desc, SB_JOURNAL(p_s_sb)->j_len) ;
   set_desc_mount_id(desc, SB_JOURNAL(p_s_sb)->j_mount_id) ;
   set_desc_trans_id(desc, SB_JOURNAL(p_s_sb)->j_trans_id) ;
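
The realblock numbers are recorded in two on-disk arrays: the first
trans_half entries go into the description block, the overflow into the
commit block.  Sketched standalone (TRANS_HALF is an illustrative constant;
the code derives the real value from the blocksize):

#include <stdio.h>

#define TRANS_HALF 4   /* illustrative; reiserfs computes this */

static unsigned desc_realblock[TRANS_HALF];
static unsigned commit_realblock[TRANS_HALF];

/* i-th logged buffer: desc block first, then spill into commit block */
static void record(int i, unsigned blocknr)
{
    if (i < TRANS_HALF)
        desc_realblock[i] = blocknr;
    else
        commit_realblock[i - TRANS_HALF] = blocknr;
}

int main(void)
{
    unsigned blocks[] = { 100, 101, 205, 206, 207, 310 };
    for (int i = 0; i < 6; i++)
        record(i, blocks[i]);
    printf("desc[0]=%u commit[1]=%u\n",
           desc_realblock[0], commit_realblock[1]);
    return 0;
}
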
@@ -3041,53 +3183,35 @@ static int do_journal_end(struct reiserf
 
   /* special check in case all buffers in the journal were marked for not logging */
   if (SB_JOURNAL(p_s_sb)->j_len == 0) {
-    brelse(d_bh) ;
-    brelse(c_bh) ;
-    unlock_journal(p_s_sb) ;
-    printk("journal-2020: do_journal_end: BAD desc->j_len is ZERO\n") ;
-    atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 0) ;
-    wake_up(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
-    return 0 ;
+    BUG();
   }
 
+  /* we're about to dirty all the log blocks, mark the description block
+   * dirty now too.  Don't mark the commit block dirty until all the
+   * others are on disk
+   */
+  mark_buffer_dirty(d_bh);
+
   /* first data block is j_start + 1, so add one to cur_write_start wherever you use it */
   cur_write_start = SB_JOURNAL(p_s_sb)->j_start ;
-  cur_blocks_left = SB_JOURNAL(p_s_sb)->j_len  ;
   cn = SB_JOURNAL(p_s_sb)->j_first ;
   jindex = 1 ; /* start at one so we don't get the desc again */
-  while(cur_blocks_left > 0) {
+  while(cn) {
+    clear_bit(BH_JNew, &(cn->bh->b_state)) ;
     /* copy all the real blocks into log area.  dirty log blocks */
     if (test_bit(BH_JDirty, &cn->bh->b_state)) {
       struct buffer_head *tmp_bh ;
       tmp_bh =  journal_getblk(p_s_sb, SB_ONDISK_JOURNAL_1st_BLOCK(p_s_sb) + 
 		       ((cur_write_start + jindex) % SB_ONDISK_JOURNAL_SIZE(p_s_sb))) ;
-      set_buffer_uptodate(tmp_bh) ;
+      set_buffer_uptodate(tmp_bh);
       memcpy(tmp_bh->b_data, cn->bh->b_data, cn->bh->b_size) ;  
+      mark_buffer_dirty(tmp_bh);
       jindex++ ;
-    } else {
-      /* JDirty cleared sometime during transaction.  don't log this one */
-      printk("journal-2048: do_journal_end: BAD, buffer in journal hash, but not JDirty!\n") ;
-    }
-    cn = cn->next ;
-    cur_blocks_left-- ;
-  }
-
-  /* we are done  with both the c_bh and d_bh, but
-  ** c_bh must be written after all other commit blocks,
-  ** so we dirty/relse c_bh in flush_commit_list, with commit_left <= 1.
-  */
-
-  /* now loop through and mark all buffers from this transaction as JDirty_wait
-  ** clear the JDirty bit, clear BH_JNew too.  
-  ** if they weren't JDirty, they weren't logged, just relse them and move on
-  */
-  cn = SB_JOURNAL(p_s_sb)->j_first ; 
-  while(cn) {
-    clear_bit(BH_JNew, &(cn->bh->b_state)) ;
-    if (test_bit(BH_JDirty, &(cn->bh->b_state))) {
       set_bit(BH_JDirty_wait, &(cn->bh->b_state)) ; 
       clear_bit(BH_JDirty, &(cn->bh->b_state)) ;
     } else {
+      /* JDirty cleared sometime during transaction.  don't log this one */
+      reiserfs_warning("journal-2048: do_journal_end: BAD, buffer in journal hash, but not JDirty!\n") ;
       brelse(cn->bh) ;
     }
     next = cn->next ;
@@ -3095,30 +3219,17 @@ static int do_journal_end(struct reiserf
     cn = next ;
   }
 
-  /* unlock the journal list for committing and flushing */
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_commit_flushing), 0) ;
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_flushing), 0) ;
-
-  orig_jindex = SB_JOURNAL_LIST_INDEX(p_s_sb) ;
-  jindex = (SB_JOURNAL_LIST_INDEX(p_s_sb) + 1) % JOURNAL_LIST_COUNT ; 
-  SB_JOURNAL_LIST_INDEX(p_s_sb) = jindex ;
+  /* we are done  with both the c_bh and d_bh, but
+  ** c_bh must be written after all other commit blocks,
+  ** so we dirty/relse c_bh in flush_commit_list, with commit_left <= 1.
+  */
 
-  /* write any buffers that must hit disk before this commit is done */
-  fsync_buffers_list(&(SB_JOURNAL(p_s_sb)->j_dirty_buffers_lock),
-		     &(SB_JOURNAL(p_s_sb)->j_dirty_buffers)) ;
+  SB_JOURNAL(p_s_sb)->j_current_jl = alloc_journal_list(p_s_sb);
 
-  /* honor the flush and async wishes from the caller */
-  if (flush) {
-  
-    flush_commit_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + orig_jindex, 1) ;
-    flush_journal_list(p_s_sb,  SB_JOURNAL_LIST(p_s_sb) + orig_jindex , 1) ;  
-  } else if (commit_now) {
-    if (wait_on_commit) {
-      flush_commit_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + orig_jindex, 1) ;
-    } else {
-      commit_flush_async(p_s_sb, orig_jindex) ; 
-    }
-  }
+  /* now it is safe to insert this transaction on the main list */
+  list_add_tail(&jl->j_list, &SB_JOURNAL(p_s_sb)->j_journal_list);
+  list_add_tail(&jl->j_working_list, &SB_JOURNAL(p_s_sb)->j_working_list);
+  SB_JOURNAL(p_s_sb)->j_num_work_lists++;
 
   /* reset journal values for the next transaction */
   old_start = SB_JOURNAL(p_s_sb)->j_start ;
@@ -3130,57 +3241,96 @@ static int do_journal_end(struct reiserf
   SB_JOURNAL(p_s_sb)->j_len = 0 ;
   SB_JOURNAL(p_s_sb)->j_trans_start_time = 0 ;
   SB_JOURNAL(p_s_sb)->j_trans_id++ ;
+  SB_JOURNAL(p_s_sb)->j_current_jl->j_trans_id = SB_JOURNAL(p_s_sb)->j_trans_id;
   SB_JOURNAL(p_s_sb)->j_must_wait = 0 ;
   SB_JOURNAL(p_s_sb)->j_len_alloc = 0 ;
   SB_JOURNAL(p_s_sb)->j_next_full_flush = 0 ;
   SB_JOURNAL(p_s_sb)->j_next_async_flush = 0 ;
   init_journal_hash(p_s_sb) ; 
 
+  /* tail conversion targets have to hit the disk before we end the
+   * transaction.  Otherwise a later transaction might repack the tail
+   * before this transaction commits, leaving the data block unflushed and
+   * clean; if we crash before the later transaction commits, the data block
+   * is lost.
+   */
+  fsync_buffers_list(&(SB_JOURNAL(p_s_sb)->j_dirty_buffers_lock),
+		     &(SB_JOURNAL(p_s_sb)->j_dirty_buffers)) ;
+  up(&jl->j_commit_lock);
+
+  /* honor the flush wishes from the caller; simple commits can
+  ** be done outside the journal lock, and they are done below
+  */
+  if (flush) {
+    flush_commit_list(p_s_sb, jl, 1) ;
+    flush_journal_list(p_s_sb, jl, 1) ;
+  }
+
+
   /* if the next transaction has any chance of wrapping, flush 
   ** transactions that might get overwritten.  If any journal lists are very
   ** old, flush them as well.
   */
-  for (i = 0 ; i < JOURNAL_LIST_COUNT ; i++) {
-    jindex = i ;
-    if (SB_JOURNAL_LIST(p_s_sb)[jindex].j_len > 0 && SB_JOURNAL(p_s_sb)->j_start <= SB_JOURNAL_LIST(p_s_sb)[jindex].j_start) {
-      if ((SB_JOURNAL(p_s_sb)->j_start + SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) >= SB_JOURNAL_LIST(p_s_sb)[jindex].j_start) {
-	flush_journal_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + jindex, 1) ; 
-      }
-    } else if (SB_JOURNAL_LIST(p_s_sb)[jindex].j_len > 0 && 
-              (SB_JOURNAL(p_s_sb)->j_start + SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) > SB_ONDISK_JOURNAL_SIZE(p_s_sb)) {
-      if (((SB_JOURNAL(p_s_sb)->j_start + SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) % SB_ONDISK_JOURNAL_SIZE(p_s_sb)) >= 
-            SB_JOURNAL_LIST(p_s_sb)[jindex].j_start) {
-	flush_journal_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + jindex, 1 ) ; 
+first_jl:
+  list_for_each_safe(entry, safe, &SB_JOURNAL(p_s_sb)->j_journal_list) {
+    temp_jl = JOURNAL_LIST_ENTRY(entry);
+    if (SB_JOURNAL(p_s_sb)->j_start <= temp_jl->j_start) {
+      if ((SB_JOURNAL(p_s_sb)->j_start + SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) >=
+          temp_jl->j_start)
+      {
+	flush_used_journal_lists(p_s_sb, temp_jl);
+	goto first_jl;
+      } else if ((SB_JOURNAL(p_s_sb)->j_start +
+                  SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) <
+		  SB_ONDISK_JOURNAL_SIZE(p_s_sb))
+      {
+          /* if we don't cross into the next transaction and we don't
+	   * wrap, there is no way we can overlap any later transactions.
+	   * break now
+	   */
+	  break;
+      }
+    } else if ((SB_JOURNAL(p_s_sb)->j_start +
+                SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) >
+		SB_ONDISK_JOURNAL_SIZE(p_s_sb))
+    {
+      if (((SB_JOURNAL(p_s_sb)->j_start + SB_JOURNAL_TRANS_MAX(p_s_sb) + 1) %
+            SB_ONDISK_JOURNAL_SIZE(p_s_sb)) >= temp_jl->j_start)
+      {
+	flush_used_journal_lists(p_s_sb, temp_jl);
+	goto first_jl;
+      } else {
+	  /* we don't overlap anything from our start to the end of the
+	   * log, and our wrapped portion doesn't overlap anything at
+	   * the start of the log.  We can break
+	   */
+	  break;
       }
-    } 
-    /* this check should always be run, to send old lists to disk */
-    if (SB_JOURNAL_LIST(p_s_sb)[jindex].j_len > 0 && 
-              SB_JOURNAL_LIST(p_s_sb)[jindex].j_timestamp < 
-	      (get_seconds() - (SB_JOURNAL_MAX_TRANS_AGE(p_s_sb) * 4))) {
-	flush_journal_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + jindex, 1 ) ; 
     }
   }
+  flush_old_journal_lists(p_s_sb);
 
-  /* if the next journal_list is still in use, flush it */
-  if (SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_len != 0) {
-    flush_journal_list(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + SB_JOURNAL_LIST_INDEX(p_s_sb), 1) ; 
-  }
-
-  /* we don't want anyone flushing the new transaction's list */
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_commit_flushing), 1) ;
-  atomic_set(&(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_flushing), 1) ;
-  SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_list_bitmap = get_list_bitmap(p_s_sb, SB_JOURNAL_LIST(p_s_sb) + 
-											 SB_JOURNAL_LIST_INDEX(p_s_sb)) ;
+  SB_JOURNAL(p_s_sb)->j_current_jl->j_list_bitmap = get_list_bitmap(p_s_sb, SB_JOURNAL(p_s_sb)->j_current_jl) ;
 
-  if (!(SB_JOURNAL_LIST(p_s_sb)[SB_JOURNAL_LIST_INDEX(p_s_sb)].j_list_bitmap)) {
+  if (!(SB_JOURNAL(p_s_sb)->j_current_jl->j_list_bitmap)) {
     reiserfs_panic(p_s_sb, "journal-1996: do_journal_end, could not get a list bitmap\n") ;
   }
-  unlock_journal(p_s_sb) ;
+
   atomic_set(&(SB_JOURNAL(p_s_sb)->j_jlock), 0) ;
+  unlock_journal(p_s_sb) ;
   /* wake up anybody waiting to join. */
+  clear_bit(WRITERS_QUEUED, &SB_JOURNAL(p_s_sb)->j_state);
   wake_up(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
+
+  if (!flush) {
+      if (wait_on_commit) {
+	  if (journal_list_still_alive(p_s_sb, commit_trans_id))
+	      flush_commit_list(p_s_sb, jl, 1) ;
+      } else {
+          queue_work(commit_wq, &SB_JOURNAL(p_s_sb)->j_work);
+      }
+  }
+out:
+  reiserfs_check_lock_depth("journal end2");
   return 0 ;
 }
-
-
-
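
The wrap checks above reduce to one modular-interval question: does the
window the next transaction may write, [j_start, j_start + trans_max + 1]
taken modulo the on-disk journal size, cover an older list's start?  A
standalone version of the test (all the sizes in main() are made up):

#include <stdio.h>

/* does [start, start+len] (endpoint inclusive, like the kernel test,
 * with positions living on a ring of the given size) cover point? */
static int window_covers(unsigned start, unsigned len,
                         unsigned point, unsigned size)
{
    unsigned off = (point + size - start % size) % size;
    return off <= len;
}

int main(void)
{
    unsigned size      = 8192;  /* SB_ONDISK_JOURNAL_SIZE            */
    unsigned j_start   = 8000;  /* where the next transaction begins */
    unsigned trans_max = 1024;  /* SB_JOURNAL_TRANS_MAX              */
    unsigned old_start = 200;   /* an old list near the log's front  */

    if (window_covers(j_start, trans_max + 1, old_start, size))
        printf("old list at %u must be flushed first\n", old_start);
    return 0;
}
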
diff -puN fs/reiserfs/objectid.c~reiserfs-logging fs/reiserfs/objectid.c
--- 25/fs/reiserfs/objectid.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/objectid.c	Wed Mar 24 15:14:39 2004
@@ -86,7 +86,6 @@ __u32 reiserfs_get_unused_objectid (stru
     }
 
     journal_mark_dirty(th, s, SB_BUFFER_WITH_SB (s));
-    s->s_dirt = 1;
     return unused_objectid;
 }
 
@@ -105,8 +104,6 @@ void reiserfs_release_objectid (struct r
 
     reiserfs_prepare_for_journal(s, SB_BUFFER_WITH_SB(s), 1) ;
     journal_mark_dirty(th, s, SB_BUFFER_WITH_SB (s)); 
-    s->s_dirt = 1;
-
 
     /* start at the beginning of the objectid map (i = 0) and go to
        the end of it (i = disk_sb->s_oid_cursize).  Linear search is
diff -puN fs/reiserfs/procfs.c~reiserfs-logging fs/reiserfs/procfs.c
--- 25/fs/reiserfs/procfs.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/procfs.c	Wed Mar 24 15:14:39 2004
@@ -87,7 +87,7 @@ static int show_super(struct seq_file *m
 	struct reiserfs_sb_info *r = REISERFS_SB(sb);
     
 	seq_printf(m,	"state: \t%s\n"
-			"mount options: \t%s%s%s%s%s%s%s%s%s%s%s%s\n"
+			"mount options: \t%s%s%s%s%s%s%s%s%s%s%s\n"
 			"gen. counter: \t%i\n"
 			"s_kmallocs: \t%i\n"
 			"s_disk_reads: \t%i\n"
@@ -131,7 +131,6 @@ static int show_super(struct seq_file *m
 			reiserfs_test4( sb ) ? "TEST4 " : "",
 			have_large_tails( sb ) ? "TAILS " : have_small_tails(sb)?"SMALL_TAILS ":"NO_TAILS ",
 			replay_only( sb ) ? "REPLAY_ONLY " : "",
-			reiserfs_dont_log( sb ) ? "DONT_LOG " : "LOG ",
 			convert_reiserfs( sb ) ? "CONV " : "",
 
 			atomic_read( &r -> s_generation_counter ),
@@ -370,7 +369,6 @@ static int show_journal(struct seq_file 
 			"j_first_unflushed_offset: \t%lu\n"
 			"j_last_flush_trans_id: \t%lu\n"
 			"j_trans_start_time: \t%li\n"
-			"j_journal_list_index: \t%i\n"
 			"j_list_bitmap_index: \t%i\n"
 			"j_must_wait: \t%i\n"
 			"j_next_full_flush: \t%i\n"
@@ -416,7 +414,6 @@ static int show_journal(struct seq_file 
 			JF( j_first_unflushed_offset ),
 			JF( j_last_flush_trans_id ),
 			JF( j_trans_start_time ),
-			JF( j_journal_list_index ),
 			JF( j_list_bitmap_index ),
 			JF( j_must_wait ),
 			JF( j_next_full_flush ),
diff -puN fs/reiserfs/super.c~reiserfs-logging fs/reiserfs/super.c
--- 25/fs/reiserfs/super.c~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/fs/reiserfs/super.c	Wed Mar 24 15:14:39 2004
@@ -59,22 +59,26 @@ static int is_any_reiserfs_magic_string 
 static int reiserfs_remount (struct super_block * s, int * flags, char * data);
 static int reiserfs_statfs (struct super_block * s, struct kstatfs * buf);
 
-static void reiserfs_write_super (struct super_block * s)
+static void reiserfs_sync_fs (struct super_block * s)
 {
+    if (!(s->s_flags & MS_RDONLY)) {
+        struct reiserfs_transaction_handle th;
+	reiserfs_write_lock(s);
+	journal_begin(&th, s, 1);
+	journal_end_sync(&th, s, 1);
+	reiserfs_flush_old_commits(s);
+	s->s_dirt = 0;
+	reiserfs_write_unlock(s);
+    }
+}
 
-  int dirty = 0 ;
-  reiserfs_write_lock(s);
-  if (!(s->s_flags & MS_RDONLY)) {
-    dirty = flush_old_commits(s, 1) ;
-  }
-  s->s_dirt = dirty;
-  reiserfs_write_unlock(s);
+static void reiserfs_write_super(struct super_block *s)
+{
+    reiserfs_sync_fs(s);
 }
 
 static void reiserfs_write_super_lockfs (struct super_block * s)
 {
-
-  int dirty = 0 ;
   struct reiserfs_transaction_handle th ;
   reiserfs_write_lock(s);
   if (!(s->s_flags & MS_RDONLY)) {
@@ -84,7 +88,7 @@ static void reiserfs_write_super_lockfs 
     reiserfs_block_writes(&th) ;
     journal_end(&th, s, 1) ;
   }
-  s->s_dirt = dirty;
+  s->s_dirt = 0;
   reiserfs_write_unlock(s);
 }
 
@@ -805,7 +809,6 @@ static int reiserfs_remount (struct supe
     reiserfs_prepare_for_journal(s, SB_BUFFER_WITH_SB(s), 1) ;
     set_sb_umount_state( rs, REISERFS_SB(s)->s_mount_state );
     journal_mark_dirty(&th, s, SB_BUFFER_WITH_SB (s));
-    s->s_dirt = 0;
   } else {
     /* remount read-write */
     if (!(s->s_flags & MS_RDONLY))
@@ -822,12 +825,12 @@ static int reiserfs_remount (struct supe
     set_sb_umount_state( rs, REISERFS_ERROR_FS );
     /* mark_buffer_dirty (SB_BUFFER_WITH_SB (s), 1); */
     journal_mark_dirty(&th, s, SB_BUFFER_WITH_SB (s));
-    s->s_dirt = 0;
     REISERFS_SB(s)->s_mount_state = REISERFS_VALID_FS ;
   }
   /* this will force a full flush of all journal lists */
   SB_JOURNAL(s)->j_must_wait = 1 ;
   journal_end(&th, s, 10) ;
+  s->s_dirt = 0;
 
   if (!( *mount_flags & MS_RDONLY ) )
     finish_unfinished( s );
@@ -1392,8 +1395,6 @@ static int reiserfs_fill_super (struct s
 	
 	/* look for files which were to be removed in previous session */
 	finish_unfinished (s);
-
-	s->s_dirt = 0;
     } else {
 	if ( old_format_only(s) && !silent) {
 	    reiserfs_warning("reiserfs: using 3.5.x disk format\n") ;
diff -puN include/linux/reiserfs_fs.h~reiserfs-logging include/linux/reiserfs_fs.h
--- 25/include/linux/reiserfs_fs.h~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/include/linux/reiserfs_fs.h	Wed Mar 24 15:14:39 2004
@@ -1702,23 +1702,39 @@ struct reiserfs_journal_header {
 	 (((block)<<(JBH_HASH_SHIFT - 6)) ^ ((block) >> 13) ^ ((block) << (JBH_HASH_SHIFT - 12))))
 #define journal_hash(t,sb,block) ((t)[_jhashfn((sb),(block)) & JBH_HASH_MASK])
 
-/* finds n'th buffer with 0 being the start of this commit.  Needs to go away, j_ap_blocks has changed
-** since I created this.  One chunk of code in journal.c needs changing before deleting it
-*/
-#define JOURNAL_BUFFER(j,n) ((j)->j_ap_blocks[((j)->j_start + (n)) % JOURNAL_BLOCK_COUNT])
-
 // We need these to make journal.c code more readable
 #define journal_find_get_block(s, block) __find_get_block(SB_JOURNAL(s)->j_dev_bd, block, s->s_blocksize)
 #define journal_getblk(s, block) __getblk(SB_JOURNAL(s)->j_dev_bd, block, s->s_blocksize)
 #define journal_bread(s, block) __bread(SB_JOURNAL(s)->j_dev_bd, block, s->s_blocksize)
 
+/*
+** transaction handle which is passed around for all journal calls
+*/
+struct reiserfs_transaction_handle {
+  struct super_block *t_super ; /* super for this FS when journal_begin was
+				   called.  saves calls to reiserfs_get_super;
+				   also used by nested transactions to make
+				   sure they are nesting on the right FS.
+				   _must_ be first in the handle
+				*/
+  int t_refcount;
+  int t_blocks_logged ;         /* number of blocks this writer has logged */
+  int t_blocks_allocated ;      /* number of blocks this writer allocated */
+  unsigned long t_trans_id ;    /* sanity check, equals the current trans id */
+  void *t_handle_save ;		/* save existing current->journal_info */
+  int displace_new_blocks:1;	/* if new block allocation occurs, that block
+				   should be displaced from others */
+} ;
+
+int journal_mark_dirty(struct reiserfs_transaction_handle *, struct super_block *, struct buffer_head *bh) ;
+int reiserfs_flush_old_commits(struct super_block *);
 void reiserfs_commit_for_inode(struct inode *) ;
 void reiserfs_update_inode_transaction(struct inode *) ;
 void reiserfs_wait_on_write_block(struct super_block *s) ;
 void reiserfs_block_writes(struct reiserfs_transaction_handle *th) ;
 void reiserfs_allow_writes(struct super_block *s) ;
 void reiserfs_check_lock_depth(char *caller) ;
-void reiserfs_prepare_for_journal(struct super_block *, struct buffer_head *bh, int wait) ;
+int reiserfs_prepare_for_journal(struct super_block *, struct buffer_head *bh, int wait) ;
 void reiserfs_restore_prepared_buffer(struct super_block *, struct buffer_head *bh) ;
 int journal_init(struct super_block *, const char * j_dev_name, int old_format, unsigned int) ;
 int journal_release(struct reiserfs_transaction_handle*, struct super_block *) ;
@@ -1730,7 +1746,6 @@ int journal_mark_freed(struct reiserfs_t
 int journal_transaction_should_end(struct reiserfs_transaction_handle *, int) ;
 int reiserfs_in_journal(struct super_block *p_s_sb, int bmap_nr, int bit_nr, int searchall, b_blocknr_t *next) ;
 int journal_begin(struct reiserfs_transaction_handle *, struct super_block *p_s_sb, unsigned long) ;
-void flush_async_commits(struct super_block *p_s_sb) ;
 
 int buffer_journaled(const struct buffer_head *bh) ;
 int mark_buffer_journal_new(struct buffer_head *bh) ;
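
Both journal hash tables (j_hash_table for the running transaction,
j_list_hash_table for everything still in flight) are chained tables keyed
on block number through the _jhashfn mixer above.  A minimal standalone
equivalent with a simplified hash (the real mixer also folds in the
superblock pointer):

#include <stdlib.h>
#include <stdio.h>

#define HASH_SIZE 8192

struct cnode {
    unsigned long blocknr;
    struct cnode *hnext;   /* chain within one bucket */
};

static struct cnode *table[HASH_SIZE];

static unsigned hash(unsigned long block)
{
    /* stand-in for _jhashfn's shift-and-xor mixing */
    return (unsigned)((block ^ (block >> 13)) & (HASH_SIZE - 1));
}

static void insert(struct cnode *cn)
{
    unsigned h = hash(cn->blocknr);
    cn->hnext = table[h];
    table[h] = cn;
}

static struct cnode *lookup(unsigned long blocknr)
{
    for (struct cnode *cn = table[hash(blocknr)]; cn; cn = cn->hnext)
        if (cn->blocknr == blocknr)
            return cn;
    return NULL;
}

int main(void)
{
    struct cnode a = { .blocknr = 4711 };
    insert(&a);
    printf("found: %d\n", lookup(4711) != NULL);
    return 0;
}
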
diff -puN include/linux/reiserfs_fs_i.h~reiserfs-logging include/linux/reiserfs_fs_i.h
--- 25/include/linux/reiserfs_fs_i.h~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/include/linux/reiserfs_fs_i.h	Wed Mar 24 15:14:39 2004
@@ -3,6 +3,8 @@
 
 #include <linux/list.h>
 
+struct reiserfs_journal_list;
+
 /** bitmasks for i_flags field in reiserfs-specific part of inode */
 typedef enum {
     /** this says what format of key do all items (but stat data) of
@@ -48,7 +50,7 @@ struct reiserfs_inode_info {
     ** needs to be committed in order for this inode to be properly
     ** flushed */
     unsigned long i_trans_id ;
-    unsigned long i_trans_index ;
+    struct reiserfs_journal_list *i_jl;
     struct inode vfs_inode;
 };
 
diff -puN include/linux/reiserfs_fs_sb.h~reiserfs-logging include/linux/reiserfs_fs_sb.h
--- 25/include/linux/reiserfs_fs_sb.h~reiserfs-logging	Wed Mar 24 15:14:39 2004
+++ 25-akpm/include/linux/reiserfs_fs_sb.h	Wed Mar 24 15:14:39 2004
@@ -106,7 +106,6 @@ typedef enum {
 #define JOURNAL_MAX_CNODE   1500 /* max cnodes to allocate. */
 #define JOURNAL_HASH_SIZE 8192   
 #define JOURNAL_NUM_BITMAPS 5 /* number of copies of the bitmaps to have floating.  Must be >= 2 */
-#define JOURNAL_LIST_COUNT 64
 
 /* these are bh_state bit flag offset numbers, for use in the buffer head */
 
@@ -121,6 +120,7 @@ typedef enum {
 */
 #define BH_JPrepared 20		/* block has been prepared for the log */
 #define BH_JRestore_dirty 22    /* restore the dirty bit later */
+#define BH_JTest 23             /* debugging use only */
 
 /* One of these for every block in every transaction
 ** Each one is in two hash tables.  First, a hash of the current transaction, and after journal_end, a
@@ -154,26 +154,6 @@ struct reiserfs_list_bitmap {
 } ;
 
 /*
-** transaction handle which is passed around for all journal calls
-*/
-struct reiserfs_transaction_handle {
-  struct super_block *t_super ; /* super for this FS when journal_begin was
-				   called. saves calls to reiserfs_get_super
-				   also used by nested transactions to make
-				   sure they are nesting on the right FS
-				   _must_ be first in the handle
-				*/
-  int t_refcount;
-  int t_blocks_logged ;         /* number of blocks this writer has logged */
-  int t_blocks_allocated ;      /* number of blocks this writer allocated */
-  unsigned long t_trans_id ;    /* sanity check, equals the current trans id */
-  void *t_handle_save ;		/* save existing current->journal_info */
-  int displace_new_blocks:1;	/* if new block allocation occurres, that block
-				   should be displaced from others */
-
-} ;
-
-/*
 ** one of these for each transaction.  The most important part here is the j_realblock.
 ** this list of cnodes is used to hash all the blocks in all the commits, to mark all the
 ** real buffer heads dirty once all the commits hit the disk,
@@ -181,23 +161,25 @@ struct reiserfs_transaction_handle {
 ** to be overwritten */
 struct reiserfs_journal_list {
   unsigned long j_start ;
+  unsigned long j_state;
   unsigned long j_len ;
   atomic_t j_nonzerolen ;
   atomic_t j_commit_left ;
-  atomic_t j_flushing ;
-  atomic_t j_commit_flushing ;
   atomic_t j_older_commits_done ;      /* all commits older than this on disk*/
+  struct semaphore j_commit_lock;
   unsigned long j_trans_id ;
   time_t j_timestamp ;
   struct reiserfs_list_bitmap *j_list_bitmap ;
   struct buffer_head *j_commit_bh ; /* commit buffer head */
   struct reiserfs_journal_cnode *j_realblock  ;
   struct reiserfs_journal_cnode *j_freedlist ; /* list of buffers that were freed during this trans.  free each of these on flush */
-  wait_queue_head_t j_commit_wait ; /* wait for all the commit blocks to be flushed */
-  wait_queue_head_t j_flush_wait ; /* wait for all the real blocks to be flushed */
-} ;
+  /* time ordered list of all active transactions */
+  struct list_head j_list;
 
-struct reiserfs_page_list  ; /* defined in reiserfs_fs.h */
+  /* time ordered list of all transactions we haven't tried to flush yet */
+  struct list_head j_working_list;
+  int j_refcount;
+} ;
 
 struct reiserfs_journal {
   struct buffer_head ** j_ap_blocks ; /* journal blocks on disk */
@@ -220,16 +202,11 @@ struct reiserfs_journal {
   unsigned long j_last_flush_trans_id ;    /* last fully flushed journal timestamp */
   struct buffer_head *j_header_bh ;   
 
-  /* j_flush_pages must be flushed before the current transaction can
-  ** commit
-  */
-  struct reiserfs_page_list *j_flush_pages ;
   time_t j_trans_start_time ;         /* time this transaction started */
-  wait_queue_head_t j_wait ;         /* wait  journal_end to finish I/O */
-  atomic_t j_wlock ;                       /* lock for j_wait */
+  struct semaphore j_lock;
+  struct semaphore j_flush_sem;
   wait_queue_head_t j_join_wait ;    /* wait for current transaction to finish before starting new one */
   atomic_t j_jlock ;                       /* lock for j_join_wait */
-  int j_journal_list_index ;	      /* journal list number of the current trans */
   int j_list_bitmap_index ;	      /* number of next list bitmap to use */
   int j_must_wait ;		       /* no more journal begins allowed. MUST sleep on j_join_wait */
   int j_next_full_flush ;             /* next journal_end will flush all journal lists */
@@ -246,19 +223,37 @@ struct reiserfs_journal {
   struct reiserfs_journal_cnode *j_cnode_free_list ;
   struct reiserfs_journal_cnode *j_cnode_free_orig ; /* orig pointer returned from vmalloc */
 
+  struct reiserfs_journal_list *j_current_jl;
   int j_free_bitmap_nodes ;
   int j_used_bitmap_nodes ;
+
+  int j_num_lists;      /* total number of active transactions */
+  int j_num_work_lists; /* number that need attention from kreiserfsd */
+
+  /* debugging to make sure things are flushed in order */
+  int j_last_flush_id;
+
+  /* debugging to make sure things are committed in order */
+  int j_last_commit_id;
+
   struct list_head j_bitmap_nodes ;
   struct list_head j_dirty_buffers ;
   spinlock_t j_dirty_buffers_lock ; /* protects j_dirty_buffers */
+
+  /* list of all active transactions */
+  struct list_head j_journal_list;
+  /* lists that haven't been touched by writeback attempts */
+  struct list_head j_working_list;
+
   struct reiserfs_list_bitmap j_list_bitmap[JOURNAL_NUM_BITMAPS] ;	/* array of bitmaps to record the deleted blocks */
-  struct reiserfs_journal_list j_journal_list[JOURNAL_LIST_COUNT] ;	    /* array of all the journal lists */
   struct reiserfs_journal_cnode *j_hash_table[JOURNAL_HASH_SIZE] ; 	    /* hash table for real buffer heads in current trans */ 
   struct reiserfs_journal_cnode *j_list_hash_table[JOURNAL_HASH_SIZE] ; /* hash table for all the real buffer heads in all 
   										the transactions */
   struct list_head j_prealloc_list;     /* list of inodes which have preallocated blocks */
   unsigned long j_max_trans_size ;
   unsigned long j_max_batch_size ;
+
+  struct work_struct j_work;
 };
 
 #define JOURNAL_DESC_MAGIC "ReIsErLB" /* ick.  magic string to find desc blocks in the journal */
@@ -417,7 +412,6 @@ struct reiserfs_sb_info
 #define REISERFS_LARGETAIL 0  /* large tails will be created in a session */
 #define REISERFS_SMALLTAIL 17  /* small (for files less than block size) tails will be created in a session */
 #define REPLAYONLY 3 /* replay journal and return 0. Use by fsck */
-#define REISERFS_NOLOG 4      /* -o nolog: turn journalling off */
 #define REISERFS_CONVERT 5    /* -o conv: causes conversion of old
                                  format super block to the new
                                  format. If not specified - old
@@ -473,8 +467,6 @@ struct reiserfs_sb_info
 
 void reiserfs_file_buffer (struct buffer_head * bh, int list);
 extern struct file_system_type reiserfs_fs_type;
-int journal_mark_dirty(struct reiserfs_transaction_handle *, struct super_block *, struct buffer_head *bh) ;
-int flush_old_commits(struct super_block *s, int) ;
 int reiserfs_resize(struct super_block *, unsigned long) ;
 
 #define CARRY_ON                0
@@ -484,8 +476,6 @@ int reiserfs_resize(struct super_block *
 #define SB_BUFFER_WITH_SB(s) (REISERFS_SB(s)->s_sbh)
 #define SB_JOURNAL(s) (REISERFS_SB(s)->s_journal)
 #define SB_JOURNAL_1st_RESERVED_BLOCK(s) (SB_JOURNAL(s)->j_1st_reserved_block)
-#define SB_JOURNAL_LIST(s) (SB_JOURNAL(s)->j_journal_list)
-#define SB_JOURNAL_LIST_INDEX(s) (SB_JOURNAL(s)->j_journal_list_index) 
 #define SB_JOURNAL_LEN_FREE(s) (SB_JOURNAL(s)->j_journal_len_free) 
 #define SB_AP_BITMAP(s) (REISERFS_SB(s)->s_ap_bitmap)
 

_