From: Chris Mason <mason@suse.com>

reiserfs data=ordered support.


---

 25-akpm/fs/buffer.c                    |    1 
 25-akpm/fs/reiserfs/file.c             |  167 ++++++++++---
 25-akpm/fs/reiserfs/inode.c            |  285 +++++++++++++++++-----
 25-akpm/fs/reiserfs/ioctl.c            |    7 
 25-akpm/fs/reiserfs/journal.c          |  411 +++++++++++++++++++++++++++------
 25-akpm/fs/reiserfs/super.c            |   52 ++++
 25-akpm/include/linux/reiserfs_fs.h    |   44 +++
 25-akpm/include/linux/reiserfs_fs_sb.h |   69 ++---
 8 files changed, 805 insertions(+), 231 deletions(-)

diff -puN fs/buffer.c~reiserfs-jh-2 fs/buffer.c
--- 25/fs/buffer.c~reiserfs-jh-2	2004-04-06 20:57:15.716002872 -0700
+++ 25-akpm/fs/buffer.c	2004-04-06 20:57:15.732000440 -0700
@@ -1583,6 +1583,7 @@ int try_to_release_page(struct page *pag
 		return mapping->a_ops->releasepage(page, gfp_mask);
 	return try_to_free_buffers(page);
 }
+EXPORT_SYMBOL(try_to_release_page);
 
 /**
  * block_invalidatepage - invalidate part of all of a buffer-backed page
diff -puN fs/reiserfs/file.c~reiserfs-jh-2 fs/reiserfs/file.c
--- 25/fs/reiserfs/file.c~reiserfs-jh-2	2004-04-06 20:57:15.718002568 -0700
+++ 25-akpm/fs/reiserfs/file.c	2004-04-06 20:57:15.735999832 -0700
@@ -9,6 +9,8 @@
 #include <asm/uaccess.h>
 #include <linux/pagemap.h>
 #include <linux/writeback.h>
+#include <linux/blkdev.h>
+#include <linux/buffer_head.h>
 
 /*
 ** We pack the tails of files on file close, not at the time they are written.
@@ -150,6 +152,7 @@ out:
    Maps all unmapped but prepared pages from the list.
    Updates metadata with newly allocated blocknumbers as needed */
 int reiserfs_allocate_blocks_for_region(
+				struct reiserfs_transaction_handle *th,
 				struct inode *inode, /* Inode we work with */
 				loff_t pos, /* Writing position */
 				int num_pages, /* number of pages write going
@@ -167,7 +170,6 @@ int reiserfs_allocate_blocks_for_region(
     struct cpu_key key; // cpu key of item that we are going to deal with
     struct item_head *ih; // pointer to item head that we are going to deal with
     struct buffer_head *bh; // Buffer head that contains items that we are going to deal with
-    struct reiserfs_transaction_handle th; // transaction handle for transaction we are going to create.
     __u32 * item; // pointer to item we are going to deal with
     INITIALIZE_PATH(path); // path to item, that we are going to deal with.
     b_blocknr_t allocated_blocks[blocks_to_allocate]; // Pointer to a place where allocated blocknumbers would be stored. Right now statically allocated, later that will change.
@@ -194,7 +196,7 @@ int reiserfs_allocate_blocks_for_region(
     /* If we came here, it means we absolutely need to open a transaction,
        since we need to allocate some blocks */
     reiserfs_write_lock(inode->i_sb); // Journaling stuff and we need that.
-    journal_begin(&th, inode->i_sb, JOURNAL_PER_BALANCE_CNT * 3 + 1); // Wish I know if this number enough
+    journal_begin(th, inode->i_sb, JOURNAL_PER_BALANCE_CNT * 3 + 1); // Wish I know if this number enough
     reiserfs_update_inode_transaction(inode) ;
 
     /* Look for the in-tree position of our write, need path for block allocator */
@@ -206,7 +208,7 @@ int reiserfs_allocate_blocks_for_region(
    
     /* Allocate blocks */
     /* First fill in "hint" structure for block allocator */
-    hint.th = &th; // transaction handle.
+    hint.th = th; // transaction handle.
     hint.path = &path; // Path, so that block allocator can determine packing locality or whatever it needs to determine.
     hint.inode = inode; // Inode is needed by block allocator too.
     hint.search_start = 0; // We have no hint on where to search free blocks for block allocator.
@@ -222,7 +224,7 @@ int reiserfs_allocate_blocks_for_region(
 	    /* We flush the transaction in case of no space. This way some
 	       blocks might become free */
 	    SB_JOURNAL(inode->i_sb)->j_must_wait = 1;
-	    restart_transaction(&th, inode, &path);
+	    restart_transaction(th, inode, &path);
 
 	    /* We might have scheduled, so search again */
 	    res = search_for_position_by_key(inode->i_sb, &key, &path);
@@ -296,7 +298,7 @@ int reiserfs_allocate_blocks_for_region(
 		    /* Ok, there is existing indirect item already. Need to append it */
 		    /* Calculate position past inserted item */
 		    make_cpu_key( &key, inode, le_key_k_offset( get_inode_item_key_version(inode), &(ih->ih_key)) + op_bytes_number(ih, inode->i_sb->s_blocksize), TYPE_INDIRECT, 3);
-		    res = reiserfs_paste_into_item( &th, &path, &key, (char *)zeros, UNFM_P_SIZE*to_paste);
+		    res = reiserfs_paste_into_item( th, &path, &key, (char *)zeros, UNFM_P_SIZE*to_paste);
 		    if ( res ) {
 			kfree(zeros);
 			goto error_exit_free_blocks;
@@ -326,7 +328,7 @@ int reiserfs_allocate_blocks_for_region(
 		        kfree(zeros);
 			goto error_exit_free_blocks;
 		    }
-		    res = reiserfs_insert_item( &th, &path, &key, &ins_ih, (char *)zeros);
+		    res = reiserfs_insert_item( th, &path, &key, &ins_ih, (char *)zeros);
 		} else {
 		    reiserfs_panic(inode->i_sb, "green-9011: Unexpected key type %K\n", &key);
 		}
@@ -336,8 +338,8 @@ int reiserfs_allocate_blocks_for_region(
 		}
 		/* Now we want to check if transaction is too full, and if it is
 		   we restart it. This will also free the path. */
-		if (journal_transaction_should_end(&th, th.t_blocks_allocated))
-		    restart_transaction(&th, inode, &path);
+		if (journal_transaction_should_end(th, th->t_blocks_allocated))
+		    restart_transaction(th, inode, &path);
 
 		/* Well, need to recalculate path and stuff */
 		set_cpu_key_k_offset( &key, cpu_key_k_offset(&key) + (to_paste << inode->i_blkbits));
@@ -368,7 +370,7 @@ retry:
 	       one. */
 	    /* First if we are already modifying current item, log it */
 	    if ( modifying_this_item ) {
-		journal_mark_dirty (&th, inode->i_sb, bh);
+		journal_mark_dirty (th, inode->i_sb, bh);
 		modifying_this_item = 0;
 	    }
 	    /* Then set the key to look for a new indirect item (offset of old
@@ -432,7 +434,7 @@ retry:
 
     if ( modifying_this_item ) { // We need to log last-accessed block, if it
 				 // was modified, but not logged yet.
-	journal_mark_dirty (&th, inode->i_sb, bh);
+	journal_mark_dirty (th, inode->i_sb, bh);
     }
 
     if ( curr_block < blocks_to_allocate ) {
@@ -443,7 +445,7 @@ retry:
 	    // position. We do not need to recalculate path as it should
 	    // already point to correct place.
 	    make_cpu_key( &key, inode, le_key_k_offset( get_inode_item_key_version(inode), &(ih->ih_key)) + op_bytes_number(ih, inode->i_sb->s_blocksize), TYPE_INDIRECT, 3);
-	    res = reiserfs_paste_into_item( &th, &path, &key, (char *)(allocated_blocks+curr_block), UNFM_P_SIZE*(blocks_to_allocate-curr_block));
+	    res = reiserfs_paste_into_item( th, &path, &key, (char *)(allocated_blocks+curr_block), UNFM_P_SIZE*(blocks_to_allocate-curr_block));
 	    if ( res ) {
 		goto error_exit_free_blocks;
 	    }
@@ -474,29 +476,18 @@ retry:
 		goto error_exit_free_blocks;
 	    }
 	    /* Insert item into the tree with the data as its body */
-	    res = reiserfs_insert_item( &th, &path, &key, &ins_ih, (char *)(allocated_blocks+curr_block));
+	    res = reiserfs_insert_item( th, &path, &key, &ins_ih, (char *)(allocated_blocks+curr_block));
 	} else {
 	    reiserfs_panic(inode->i_sb, "green-9010: unexpected item type for key %K\n",&key);
 	}
     }
 
-    /* Now the final thing, if we have grew the file, we must update it's size*/
-    if ( pos + write_bytes > inode->i_size) {
-	inode->i_size = pos + write_bytes; // Set new size
-	/* If the file have grown so much that tail packing is no longer possible, reset
-	   "need to pack" flag */
-	if ( (have_large_tails (inode->i_sb) && inode->i_size > i_block_size (inode)*4) ||
-	     (have_small_tails (inode->i_sb) && inode->i_size > i_block_size(inode)) )
-	    REISERFS_I(inode)->i_flags &= ~i_pack_on_close_mask ;
-    }
-
-    /* Amount of on-disk blocks used by file have changed, update it */
+    // the caller is responsible for closing the transaction
+    // unless we return an error, they are also responsible for logging
+    // the inode.
+    //
     inode->i_blocks += blocks_to_allocate << (inode->i_blkbits - 9);
-    reiserfs_update_sd(&th, inode); // And update on-disk metadata
-    // finish all journal stuff now, We are not going to play with metadata
-    // anymore.
     pathrelse(&path);
-    journal_end(&th, inode->i_sb, JOURNAL_PER_BALANCE_CNT * 3 + 1);
     reiserfs_write_unlock(inode->i_sb);
 
     // go through all the pages/buffers and map the buffers to newly allocated
@@ -527,6 +518,7 @@ retry:
 	    if ( !buffer_mapped(bh) ) { // Ok, unmapped buffer, need to map it
 		map_bh( bh, inode->i_sb, le32_to_cpu(allocated_blocks[curr_block]));
 		curr_block++;
+		set_buffer_new(bh);
 	    }
 	}
     }
@@ -540,10 +532,11 @@ error_exit_free_blocks:
     pathrelse(&path);
     // free blocks
     for( i = 0; i < blocks_to_allocate; i++ )
-	reiserfs_free_block( &th, le32_to_cpu(allocated_blocks[i]));
+	reiserfs_free_block(th, le32_to_cpu(allocated_blocks[i]));
 
 error_exit:
-    journal_end(&th, inode->i_sb, JOURNAL_PER_BALANCE_CNT * 3 + 1);
+    reiserfs_update_sd(th, inode); // update any changes we made to blk count
+    journal_end(th, inode->i_sb, JOURNAL_PER_BALANCE_CNT * 3 + 1);
     reiserfs_write_unlock(inode->i_sb);
 
     return res;
@@ -603,12 +596,63 @@ int reiserfs_copy_from_user_to_file_regi
     return page_fault?-EFAULT:0;
 }
 
+/* taken fs/buffer.c:__block_commit_write */
+int reiserfs_commit_page(struct inode *inode, struct page *page,
+		unsigned from, unsigned to)
+{
+    unsigned block_start, block_end;
+    int partial = 0;
+    unsigned blocksize;
+    struct buffer_head *bh, *head;
+    unsigned long i_size_index = inode->i_size >> PAGE_CACHE_SHIFT;
+    int new;
+
+    blocksize = 1 << inode->i_blkbits;
+
+    for(bh = head = page_buffers(page), block_start = 0;
+        bh != head || !block_start;
+	block_start=block_end, bh = bh->b_this_page)
+    {
+
+	new = buffer_new(bh);
+	clear_buffer_new(bh);
+	block_end = block_start + blocksize;
+	if (block_end <= from || block_start >= to) {
+	    if (!buffer_uptodate(bh))
+		    partial = 1;
+	} else {
+	    set_buffer_uptodate(bh);
+	    if (!buffer_dirty(bh)) {
+		mark_buffer_dirty(bh);
+		/* do data=ordered on any page past the end
+		 * of file and any buffer marked BH_New.
+		 */
+		if (reiserfs_data_ordered(inode->i_sb) &&
+		    (new || page->index >= i_size_index)) {
+		    reiserfs_add_ordered_list(inode, bh);
+	        }
+	    }
+	}
+    }
+
+    /*
+     * If this is a partial write which happened to make all buffers
+     * uptodate then we can optimize away a bogus readpage() for
+     * the next read(). Here we 'discover' whether the page went
+     * uptodate as a result of this (potentially partial) write.
+     */
+    if (!partial)
+	SetPageUptodate(page);
+    return 0;
+}
 
 
 /* Submit pages for write. This was separated from actual file copying
    because we might want to allocate block numbers in-between.
    This function assumes that caller will adjust file size to correct value. */
 int reiserfs_submit_file_region_for_write(
+				struct reiserfs_transaction_handle *th,
+				struct inode *inode,
 				loff_t pos, /* Writing position offset */
 				int num_pages, /* Number of pages to write */
 				int write_bytes, /* number of bytes to write */
@@ -619,12 +663,14 @@ int reiserfs_submit_file_region_for_writ
     int retval = 0; // Return value we are going to return.
     int i; // loop counter
     int offset; // Writing offset in page.
+    int orig_write_bytes = write_bytes;
+    int sd_update = 0;
 
     for ( i = 0, offset = (pos & (PAGE_CACHE_SIZE-1)); i < num_pages ; i++,offset=0) {
 	int count = min_t(int,PAGE_CACHE_SIZE-offset,write_bytes); // How much of bytes to write to this page
 	struct page *page=prepared_pages[i]; // Current page we process.
 
-	status = block_commit_write(page, offset, offset+count);
+	status = reiserfs_commit_page(inode, page, offset, offset+count);
 	if ( status )
 	    retval = status; // To not overcomplicate matters We are going to
 			     // submit all the pages even if there was error.
@@ -636,6 +682,41 @@ int reiserfs_submit_file_region_for_writ
 			  // to grab_cache_page
 	page_cache_release(page);
     }
+    /* now that we've gotten all the ordered buffers marked dirty,
+     * we can safely update i_size and close any running transaction
+     */
+    if ( pos + orig_write_bytes > inode->i_size) {
+	inode->i_size = pos + orig_write_bytes; // Set new size
+	/* If the file have grown so much that tail packing is no
+	 * longer possible, reset "need to pack" flag */
+	if ( (have_large_tails (inode->i_sb) &&
+	      inode->i_size > i_block_size (inode)*4) ||
+	     (have_small_tails (inode->i_sb) &&
+	     inode->i_size > i_block_size(inode)) )
+	    REISERFS_I(inode)->i_flags &= ~i_pack_on_close_mask ;
+        else if ( (have_large_tails (inode->i_sb) &&
+	          inode->i_size < i_block_size (inode)*4) ||
+	          (have_small_tails (inode->i_sb) &&
+		  inode->i_size < i_block_size(inode)) )
+	    REISERFS_I(inode)->i_flags |= i_pack_on_close_mask ;
+
+	if (th->t_trans_id) {
+	    reiserfs_write_lock(inode->i_sb);
+	    reiserfs_update_sd(th, inode); // And update on-disk metadata
+	    reiserfs_write_unlock(inode->i_sb);
+	} else
+	    inode->i_sb->s_op->dirty_inode(inode);
+
+        sd_update = 1;
+    }
+    if (th->t_trans_id) {
+	reiserfs_write_lock(inode->i_sb);
+	if (!sd_update)
+	    reiserfs_update_sd(th, inode);
+	journal_end(th, th->t_super, th->t_blocks_allocated);
+	reiserfs_write_unlock(inode->i_sb);
+    }
+    th->t_trans_id = 0;
     return retval;
 }
 
@@ -1003,19 +1084,18 @@ ssize_t reiserfs_file_write( struct file
     loff_t pos; // Current position in the file.
     size_t res; // return value of various functions that we call.
     struct inode *inode = file->f_dentry->d_inode; // Inode of the file that we are writing to.
-    struct page * prepared_pages[REISERFS_WRITE_PAGES_AT_A_TIME];
 				/* To simplify coding at this time, we store
 				   locked pages in array for now */
-    if ( count <= PAGE_CACHE_SIZE )
-        return generic_file_write(file, buf, count, ppos);
+    struct page * prepared_pages[REISERFS_WRITE_PAGES_AT_A_TIME];
+    struct reiserfs_transaction_handle th;
+    th.t_trans_id = 0;
 
-    if ( file->f_flags & O_DIRECT) { // Direct IO needs some special threating.
+    if ( file->f_flags & O_DIRECT) { // Direct IO needs treatment
 	int result, after_file_end = 0;
 	if ( (*ppos + count >= inode->i_size) || (file->f_flags & O_APPEND) ) {
 	    /* If we are appending a file, we need to put this savelink in here.
 	       If we will crash while doing direct io, finish_unfinished will
 	       cut the garbage from the file end. */
-	    struct reiserfs_transaction_handle th;
 	    reiserfs_write_lock(inode->i_sb);
 	    journal_begin(&th, inode->i_sb,  JOURNAL_PER_BALANCE_CNT );
 	    reiserfs_update_inode_transaction(inode);
@@ -1040,7 +1120,6 @@ ssize_t reiserfs_file_write( struct file
 	return result;
     }
 
-
     if ( unlikely((ssize_t) count < 0 ))
         return -EINVAL;
 
@@ -1146,11 +1225,7 @@ ssize_t reiserfs_file_write( struct file
 
 	if ( blocks_to_allocate > 0) {/*We only allocate blocks if we need to*/
 	    /* Fill in all the possible holes and append the file if needed */
-	    res = reiserfs_allocate_blocks_for_region(inode, pos, num_pages, write_bytes, prepared_pages, blocks_to_allocate);
-	} else if ( pos + write_bytes > inode->i_size ) {
-	    /* File might have grown even though no new blocks were added */
-	    inode->i_size = pos + write_bytes;
-	    inode->i_sb->s_op->dirty_inode(inode);
+	    res = reiserfs_allocate_blocks_for_region(&th, inode, pos, num_pages, write_bytes, prepared_pages, blocks_to_allocate);
 	}
 
 	/* well, we have allocated the blocks, so it is time to free
@@ -1173,7 +1248,8 @@ ssize_t reiserfs_file_write( struct file
 	}
 
 	/* Send the pages to disk and unlock them. */
-	res = reiserfs_submit_file_region_for_write(pos, num_pages, write_bytes, prepared_pages);
+	res = reiserfs_submit_file_region_for_write(&th, inode, pos, num_pages,
+	                                            write_bytes,prepared_pages);
 	if ( res )
 	    break;
 
@@ -1184,10 +1260,17 @@ ssize_t reiserfs_file_write( struct file
 	balance_dirty_pages_ratelimited(inode->i_mapping);
     }
 
+    /* this is only true on error */
+    if (th.t_trans_id) {
+        reiserfs_write_lock(inode->i_sb);
+	journal_end(&th, th.t_super, th.t_blocks_allocated);
+        reiserfs_write_unlock(inode->i_sb);
+    }
     if ((file->f_flags & O_SYNC) || IS_SYNC(inode))
 	res = generic_osync_inode(inode, file->f_mapping, OSYNC_METADATA|OSYNC_DATA);
 
     up(&inode->i_sem);
+    reiserfs_async_progress_wait(inode->i_sb);
     return (already_written != 0)?already_written:res;
 
 out:
diff -puN fs/reiserfs/inode.c~reiserfs-jh-2 fs/reiserfs/inode.c
--- 25/fs/reiserfs/inode.c~reiserfs-jh-2	2004-04-06 20:57:15.719002416 -0700
+++ 25-akpm/fs/reiserfs/inode.c	2004-04-06 20:57:15.740999072 -0700
@@ -22,9 +22,12 @@ extern int reiserfs_default_io_size; /* 
 #define GET_BLOCK_NO_HOLE 2   /* return -ENOENT for file holes */
 #define GET_BLOCK_READ_DIRECT 4  /* read the tail if indirect item not found */
 #define GET_BLOCK_NO_ISEM     8 /* i_sem is not held, don't preallocate */
+#define GET_BLOCK_NO_DANGLE   16 /* don't leave any transactions running */
 
 static int reiserfs_get_block (struct inode * inode, sector_t block,
 			       struct buffer_head * bh_result, int create);
+static int reiserfs_commit_write(struct file *f, struct page *page,
+                                 unsigned from, unsigned to);
 
 void reiserfs_delete_inode (struct inode * inode)
 {
@@ -103,12 +106,6 @@ inline void make_le_item_head (struct it
     put_ih_entry_count( ih, entry_count );
 }
 
-static void add_to_flushlist(struct inode *inode, struct buffer_head *bh) {
-    struct reiserfs_journal *j = SB_JOURNAL(inode->i_sb) ;
-
-    buffer_insert_list(&j->j_dirty_buffers_lock, bh, &j->j_dirty_buffers) ;
-}
-
 //
 // FIXME: we might cache recently accessed indirect item
 
@@ -437,7 +434,8 @@ static int reiserfs_get_blocks_direct_io
        reiserfs_get_block() */
     bh_result->b_size = (1 << inode->i_blkbits);
 
-    ret = reiserfs_get_block(inode, iblock, bh_result, create) ;
+    ret = reiserfs_get_block(inode, iblock, bh_result,
+                             create | GET_BLOCK_NO_DANGLE) ;
 
     /* don't allow direct io onto tail pages */
     if (ret == 0 && buffer_mapped(bh_result) && bh_result->b_blocknr == 0) {
@@ -510,15 +508,14 @@ static int convert_tail_for_hole(struct 
     ** won't trigger a get_block in this case.
     */
     fix_tail_page_for_writing(tail_page) ;
-    retval = block_prepare_write(tail_page, tail_start, tail_end, 
-                                 reiserfs_get_block) ; 
+    retval = reiserfs_prepare_write(NULL, tail_page, tail_start, tail_end);
     if (retval)
         goto unlock ;
 
     /* tail conversion might change the data in the page */
     flush_dcache_page(tail_page) ;
 
-    retval = generic_commit_write(NULL, tail_page, tail_start, tail_end) ;
+    retval = reiserfs_commit_write(NULL, tail_page, tail_start, tail_end) ;
 
 unlock:
     if (tail_page != hole_page) {
@@ -557,7 +554,7 @@ int reiserfs_get_block (struct inode * i
     __u32 * item;
     int done;
     int fs_gen;
-    struct reiserfs_transaction_handle th ;
+    struct reiserfs_transaction_handle *th = NULL;
     /* space reserved in transaction batch: 
         . 3 balancings in direct->indirect conversion
         . 1 block involved into reiserfs_update_sd()
@@ -565,12 +562,11 @@ int reiserfs_get_block (struct inode * i
        can incur (much) more that 3 balancings. */
     int jbegin_count = JOURNAL_PER_BALANCE_CNT * 3 + 1;
     int version;
-    int transaction_started = 0 ;
+    int dangle = 1;
     loff_t new_offset = (((loff_t)block) << inode->i_sb->s_blocksize_bits) + 1 ;
 
 				/* bad.... */
     reiserfs_write_lock(inode->i_sb);
-    th.t_trans_id = 0 ;
     version = get_inode_item_key_version (inode);
 
     if (block < 0) {
@@ -594,6 +590,13 @@ int reiserfs_get_block (struct inode * i
 	reiserfs_write_unlock(inode->i_sb);
 	return ret;
     }
+    /*
+     * if we're already in a transaction, make sure to close
+     * any new transactions we start in this func
+     */
+    if ((create & GET_BLOCK_NO_DANGLE) ||
+        reiserfs_transaction_running(inode->i_sb))
+        dangle = 0;
 
     /* If file is of such a size, that it might have a tail and tails are enabled
     ** we should mark it as possibly needing tail packing on close
@@ -606,9 +609,13 @@ int reiserfs_get_block (struct inode * i
     make_cpu_key (&key, inode, new_offset,
 		  TYPE_ANY, 3/*key length*/);
     if ((new_offset + inode->i_sb->s_blocksize - 1) > inode->i_size) {
-	journal_begin(&th, inode->i_sb, jbegin_count) ;
+start_trans:
+	th = reiserfs_persistent_transaction(inode->i_sb, jbegin_count);
+	if (!th) {
+	    retval = -ENOMEM;
+	    goto failure;
+	}
 	reiserfs_update_inode_transaction(inode) ;
-	transaction_started = 1 ;
     }
  research:
 
@@ -628,23 +635,21 @@ int reiserfs_get_block (struct inode * i
 
     if (allocation_needed (retval, allocated_block_nr, ih, item, pos_in_item)) {
 	/* we have to allocate block for the unformatted node */
-	if (!transaction_started) {
+	if (!th) {
 	    pathrelse(&path) ;
-	    journal_begin(&th, inode->i_sb, jbegin_count) ;
-	    reiserfs_update_inode_transaction(inode) ;
-	    transaction_started = 1 ;
-	    goto research ;
+	    goto start_trans;
 	}
 
-	repeat = _allocate_block(&th, block, inode, &allocated_block_nr, &path, create);
+	repeat = _allocate_block(th, block, inode, &allocated_block_nr, &path, create);
 
 	if (repeat == NO_DISK_SPACE) {
 	    /* restart the transaction to give the journal a chance to free
 	    ** some blocks.  releases the path, so we have to go back to
 	    ** research if we succeed on the second try
 	    */
-	    restart_transaction(&th, inode, &path) ; 
-	    repeat = _allocate_block(&th, block, inode, &allocated_block_nr, NULL, create);
+	    SB_JOURNAL(inode->i_sb)->j_next_async_flush = 1;
+	    restart_transaction(th, inode, &path) ;
+	    repeat = _allocate_block(th, block, inode, &allocated_block_nr, NULL, create);
 
 	    if (repeat != NO_DISK_SPACE) {
 		goto research ;
@@ -672,16 +677,18 @@ int reiserfs_get_block (struct inode * i
 		goto research;
 	    }
 	    set_buffer_new(bh_result);
+	    if (buffer_dirty(bh_result) && reiserfs_data_ordered(inode->i_sb))
+	    	reiserfs_add_ordered_list(inode, bh_result);
 	    put_block_num(item, pos_in_item, allocated_block_nr) ;
             unfm_ptr = allocated_block_nr;
-	    journal_mark_dirty (&th, inode->i_sb, bh);
+	    journal_mark_dirty (th, inode->i_sb, bh);
 	    inode->i_blocks += (inode->i_sb->s_blocksize / 512) ;
-	    reiserfs_update_sd(&th, inode) ;
+	    reiserfs_update_sd(th, inode) ;
 	}
 	set_block_dev_mapped(bh_result, unfm_ptr, inode);
 	pathrelse (&path);
-	if (transaction_started)
-	    journal_end(&th, inode->i_sb, jbegin_count) ;
+	if (!dangle && th)
+	    reiserfs_end_persistent_transaction(th);
 
 	reiserfs_write_unlock(inode->i_sb);
 	 
@@ -692,16 +699,9 @@ int reiserfs_get_block (struct inode * i
 	return 0;
     }
 
-    if (!transaction_started) {
-	/* if we don't pathrelse, we could vs-3050 on the buffer if
-	** someone is waiting for it (they can't finish until the buffer
-	** is released, we can start a new transaction until they finish)
-	*/
+    if (!th) {
 	pathrelse(&path) ;
-	journal_begin(&th, inode->i_sb, jbegin_count) ;
-	reiserfs_update_inode_transaction(inode) ;
-	transaction_started = 1 ;
-	goto research;
+	goto start_trans;
     }
 
     /* desired position is not found or is in the direct item. We have
@@ -729,9 +729,9 @@ int reiserfs_get_block (struct inode * i
 	    set_cpu_key_k_offset (&tmp_key, 1);
 	    PATH_LAST_POSITION(&path) ++;
 
-	    retval = reiserfs_insert_item (&th, &path, &tmp_key, &tmp_ih, (char *)&unp);
+	    retval = reiserfs_insert_item (th, &path, &tmp_key, &tmp_ih, (char *)&unp);
 	    if (retval) {
-		reiserfs_free_block (&th, allocated_block_nr);
+		reiserfs_free_block (th, allocated_block_nr);
 		goto failure; // retval == -ENOSPC or -EIO or -EEXIST
 	    }
 	    if (unp)
@@ -755,8 +755,14 @@ int reiserfs_get_block (struct inode * i
 		   node. FIXME: this should also get into page cache */
 
 		pathrelse(&path) ;
-		journal_end(&th, inode->i_sb, jbegin_count) ;
-		transaction_started = 0 ;
+		/*
+		 * ugly, but we can only end the transaction if
+		 * we aren't nested
+		 */
+		if (th->t_refcount == 1) {
+		    reiserfs_end_persistent_transaction(th);
+		    th = NULL;
+		}
 
 		retval = convert_tail_for_hole(inode, bh_result, tail_offset) ;
 		if (retval) {
@@ -764,18 +770,19 @@ int reiserfs_get_block (struct inode * i
 			printk("clm-6004: convert tail failed inode %lu, error %d\n", inode->i_ino, retval) ;
 		    if (allocated_block_nr) {
 			/* the bitmap, the super, and the stat data == 3 */
-			journal_begin(&th, inode->i_sb, 3) ;
-			reiserfs_free_block (&th, allocated_block_nr);
-			transaction_started = 1 ;
+			if (!th)
+			    th = reiserfs_persistent_transaction(inode->i_sb,3);
+			if (th)
+			    reiserfs_free_block (th, allocated_block_nr);
 		    }
 		    goto failure ;
 		}
 		goto research ;
 	    }
-	    retval = direct2indirect (&th, inode, &path, unbh, tail_offset);
+	    retval = direct2indirect (th, inode, &path, unbh, tail_offset);
 	    if (retval) {
 		reiserfs_unmap_buffer(unbh);
-		reiserfs_free_block (&th, allocated_block_nr);
+		reiserfs_free_block (th, allocated_block_nr);
 		goto failure;
 	    }
 	    /* it is important the set_buffer_uptodate is done after
@@ -795,7 +802,7 @@ int reiserfs_get_block (struct inode * i
 		/* we've converted the tail, so we must
 		** flush unbh before the transaction commits
 		*/
-		add_to_flushlist(inode, unbh) ;
+		reiserfs_add_tail_list(inode, unbh) ;
 
 		/* mark it dirty now to prevent commit_write from adding
 		** this buffer to the inode's dirty buffer list
@@ -858,13 +865,13 @@ int reiserfs_get_block (struct inode * i
 		   only have space for one block */
 		blocks_needed=max_to_insert?max_to_insert:1;
 	    }
-	    retval = reiserfs_paste_into_item (&th, &path, &tmp_key, (char *)un, UNFM_P_SIZE * blocks_needed);
+	    retval = reiserfs_paste_into_item (th, &path, &tmp_key, (char *)un, UNFM_P_SIZE * blocks_needed);
 
 	    if (blocks_needed != 1)
 		kfree(un);
 
 	    if (retval) {
-		reiserfs_free_block (&th, allocated_block_nr);
+		reiserfs_free_block (th, allocated_block_nr);
 		goto failure;
 	    }
 	    if (done) {
@@ -889,8 +896,8 @@ int reiserfs_get_block (struct inode * i
 	** release the path so that anybody waiting on the path before
 	** ending their transaction will be able to continue.
 	*/
-	if (journal_transaction_should_end(&th, th.t_blocks_allocated)) {
-	  restart_transaction(&th, inode, &path) ; 
+	if (journal_transaction_should_end(th, th->t_blocks_allocated)) {
+	  restart_transaction(th, inode, &path) ;
 	}
 	/* inserting indirect pointers for a hole can take a 
 	** long time.  reschedule if needed
@@ -907,7 +914,7 @@ int reiserfs_get_block (struct inode * i
 			      "%K should not be found\n", &key);
 	    retval = -EEXIST;
 	    if (allocated_block_nr)
-	        reiserfs_free_block (&th, allocated_block_nr);
+	        reiserfs_free_block (th, allocated_block_nr);
 	    pathrelse(&path) ;
 	    goto failure;
 	}
@@ -921,9 +928,9 @@ int reiserfs_get_block (struct inode * i
     retval = 0;
 
  failure:
-    if (transaction_started) {
-      reiserfs_update_sd(&th, inode) ;
-      journal_end(&th, inode->i_sb, jbegin_count) ;
+    if (th && !dangle) {
+      reiserfs_update_sd(th, inode) ;
+      reiserfs_end_persistent_transaction(th);
     }
     reiserfs_write_unlock(inode->i_sb);
     reiserfs_check_path(&path) ;
@@ -2007,7 +2014,8 @@ out:
     /* this is where we fill in holes in the file. */
     if (use_get_block) {
 	retval = reiserfs_get_block(inode, block, bh_result, 
-	                            GET_BLOCK_CREATE | GET_BLOCK_NO_ISEM) ;
+	                            GET_BLOCK_CREATE | GET_BLOCK_NO_ISEM |
+				    GET_BLOCK_NO_DANGLE);
 	if (!retval) {
 	    if (!buffer_mapped(bh_result) || bh_result->b_blocknr == 0) {
 	        /* get_block failed to find a mapped unformatted node. */
@@ -2219,13 +2227,43 @@ static int reiserfs_writepage (struct pa
     return reiserfs_write_full_page(page, wbc) ;
 }
 
-
 int reiserfs_prepare_write(struct file *f, struct page *page, 
 			   unsigned from, unsigned to) {
     struct inode *inode = page->mapping->host ;
+    int ret;
+    int old_ref = 0;
+
     reiserfs_wait_on_write_block(inode->i_sb) ;
     fix_tail_page_for_writing(page) ;
-    return block_prepare_write(page, from, to, reiserfs_get_block) ;
+    if (reiserfs_transaction_running(inode->i_sb)) {
+	struct reiserfs_transaction_handle *th;
+        th = (struct reiserfs_transaction_handle *)current->journal_info;
+	old_ref = th->t_refcount;
+	th->t_refcount++;
+    }
+
+    ret = block_prepare_write(page, from, to, reiserfs_get_block) ;
+    if (ret && reiserfs_transaction_running(inode->i_sb)) {
+    	struct reiserfs_transaction_handle *th = current->journal_info;
+	/* this gets a little ugly.  If reiserfs_get_block returned an
+	 * error and left a transacstion running, we've got to close it,
+	 * and we've got to free handle if it was a persistent transaction.
+	 *
+	 * But, if we had nested into an existing transaction, we need
+	 * to just drop the ref count on the handle.
+	 *
+	 * If old_ref == 0, the transaction is from reiserfs_get_block,
+	 * and it was a persistent trans.  Otherwise, it was nested above.
+	 */
+	if (th->t_refcount > old_ref) {
+	    if (old_ref)
+	    	th->t_refcount--;
+	    else
+		reiserfs_end_persistent_transaction(th);
+	}
+    }
+    return ret;
+
 }
 
 
@@ -2237,16 +2275,21 @@ static int reiserfs_commit_write(struct 
                                  unsigned from, unsigned to) {
     struct inode *inode = page->mapping->host ;
     loff_t pos = ((loff_t)page->index << PAGE_CACHE_SHIFT) + to;
-    int ret ; 
+    int ret = 0;
+    struct reiserfs_transaction_handle *th = NULL;
     
     reiserfs_wait_on_write_block(inode->i_sb) ;
+    if (reiserfs_transaction_running(inode->i_sb)) {
+        th = current->journal_info;
+    }
+    reiserfs_commit_page(inode, page, from, to);
  
     /* generic_commit_write does this for us, but does not update the
     ** transaction tracking stuff when the size changes.  So, we have
     ** to do the i_size updates here.
     */
     if (pos > inode->i_size) {
-	struct reiserfs_transaction_handle th ;
+	struct reiserfs_transaction_handle myth ;
 	reiserfs_write_lock(inode->i_sb);
 	/* If the file have grown beyond the border where it
 	   can have a tail, unmark it as needing a tail
@@ -2255,16 +2298,19 @@ static int reiserfs_commit_write(struct 
 	     (have_small_tails (inode->i_sb) && inode->i_size > i_block_size(inode)) )
 	    REISERFS_I(inode)->i_flags &= ~i_pack_on_close_mask ;
 
-	journal_begin(&th, inode->i_sb, 1) ;
+	journal_begin(&myth, inode->i_sb, 1) ;
 	reiserfs_update_inode_transaction(inode) ;
 	inode->i_size = pos ;
-	reiserfs_update_sd(&th, inode) ;
-	journal_end(&th, inode->i_sb, 1) ;
+	reiserfs_update_sd(&myth, inode) ;
+	journal_end(&myth, inode->i_sb, 1) ;
+	reiserfs_write_unlock(inode->i_sb);
+    }
+    if (th) {
+	reiserfs_write_lock(inode->i_sb);
+        reiserfs_end_persistent_transaction(th);
 	reiserfs_write_unlock(inode->i_sb);
     }
  
-    ret = generic_commit_write(f, page, from, to) ;
-
     /* we test for O_SYNC here so we can commit the transaction
     ** for any packed tails the file might have had
     */
@@ -2324,16 +2370,110 @@ void i_attrs_to_sd_attrs( struct inode *
 	}
 }
 
+/* decide if this buffer needs to stay around for data logging or ordered
+** write purposes
+*/
+static int invalidatepage_can_drop(struct inode *inode, struct buffer_head *bh)
+{
+    int ret = 1 ;
+    struct reiserfs_journal *j = SB_JOURNAL(inode->i_sb) ;
+
+    spin_lock(&j->j_dirty_buffers_lock) ;
+    if (!buffer_mapped(bh)) {
+        goto free_jh;
+    }
+    /* the page is locked, and the only places that log a data buffer
+     * also lock the page.
+     */
+#if 0
+    if (reiserfs_file_data_log(inode)) {
+	/* very conservative, leave the buffer pinned if anyone might need it.
+	** this should be changed to drop the buffer if it is only in the
+	** current transaction
+	*/
+        if (buffer_journaled(bh) || buffer_journal_dirty(bh)) {
+	    ret = 0 ;
+	}
+    } else
+#endif
+    if (buffer_dirty(bh) || buffer_locked(bh)) {
+	struct reiserfs_journal_list *jl;
+	struct reiserfs_jh *jh = bh->b_private;
+
+	/* why is this safe?
+	 * reiserfs_setattr updates i_size in the on disk
+	 * stat data before allowing vmtruncate to be called.
+	 *
+	 * If buffer was put onto the ordered list for this
+	 * transaction, we know for sure either this transaction
+	 * or an older one already has updated i_size on disk,
+	 * and this ordered data won't be referenced in the file
+	 * if we crash.
+	 *
+	 * if the buffer was put onto the ordered list for an older
+	 * transaction, we need to leave it around
+	 */
+	if (jh && (jl = jh->jl) && jl != SB_JOURNAL(inode->i_sb)->j_current_jl)
+	    ret = 0;
+    }
+free_jh:
+    if (ret && bh->b_private) {
+        reiserfs_free_jh(bh);
+    }
+    spin_unlock(&j->j_dirty_buffers_lock) ;
+    return ret ;
+}
+
+/* clm -- taken from fs/buffer.c:block_invalidate_page */
+static int reiserfs_invalidatepage(struct page *page, unsigned long offset)
+{
+    struct buffer_head *head, *bh, *next;
+    struct inode *inode = page->mapping->host;
+    unsigned int curr_off = 0;
+    int ret = 1;
+
+    BUG_ON(!PageLocked(page));
+    if (!page_has_buffers(page))
+	goto out;
+
+    head = page_buffers(page);
+    bh = head;
+    do {
+	unsigned int next_off = curr_off + bh->b_size;
+	next = bh->b_this_page;
+
+	/*
+	 * is this block fully invalidated?
+	 */
+	if (offset <= curr_off) {
+	    if (invalidatepage_can_drop(inode, bh))
+		reiserfs_unmap_buffer(bh);
+	    else
+	        ret = 0;
+	}
+	curr_off = next_off;
+	bh = next;
+    } while (bh != head);
+
+    /*
+     * We release buffers only if the entire page is being invalidated.
+     * The get_block cached value has been unconditionally invalidated,
+     * so real IO is not possible anymore.
+     */
+    if (!offset && ret)
+	ret = try_to_release_page(page, 0);
+out:
+    return ret;
+}
+
 /*
  * Returns 1 if the page's buffers were dropped.  The page is locked.
  *
  * Takes j_dirty_buffers_lock to protect the b_assoc_buffers list_heads
  * in the buffers at page_buffers(page).
  *
- * FIXME: Chris says the buffer list is not used with `mount -o notail',
- * so in that case the fs can avoid the extra locking.  Create a second
- * address_space_operations with a NULL ->releasepage and install that
- * into new address_spaces.
+ * even in -o notail mode, we can't be sure an old mount without -o notail
+ * didn't create files with tails.
  */
 static int reiserfs_releasepage(struct page *page, int unused_gfp_flags)
 {
@@ -2347,11 +2487,13 @@ static int reiserfs_releasepage(struct p
     head = page_buffers(page) ;
     bh = head ;
     do {
-	if (!buffer_dirty(bh) && !buffer_locked(bh)) {
-		list_del_init(&bh->b_assoc_buffers) ;
-	} else {
+	if (bh->b_private) {
+	    if (!buffer_dirty(bh) && !buffer_locked(bh)) {
+		reiserfs_free_jh(bh);
+	    } else {
 		ret = 0 ;
 		break ;
+	    }
 	}
 	bh = bh->b_this_page ;
     } while (bh != head) ;
@@ -2379,6 +2521,7 @@ struct address_space_operations reiserfs
     .readpage = reiserfs_readpage, 
     .readpages = reiserfs_readpages, 
     .releasepage = reiserfs_releasepage,
+    .invalidatepage = reiserfs_invalidatepage,
     .sync_page = block_sync_page,
     .prepare_write = reiserfs_prepare_write,
     .commit_write = reiserfs_commit_write,
diff -puN fs/reiserfs/ioctl.c~reiserfs-jh-2 fs/reiserfs/ioctl.c
--- 25/fs/reiserfs/ioctl.c~reiserfs-jh-2	2004-04-06 20:57:15.721002112 -0700
+++ 25-akpm/fs/reiserfs/ioctl.c	2004-04-06 20:57:15.741998920 -0700
@@ -92,6 +92,7 @@ int reiserfs_unpack (struct inode * inod
     int retval = 0;
     int index ;
     struct page *page ;
+    struct address_space *mapping ;
     unsigned long write_from ;
     unsigned long blocksize = inode->i_sb->s_blocksize ;
     	
@@ -122,17 +123,19 @@ int reiserfs_unpack (struct inode * inod
     ** reiserfs_get_block to unpack the tail for us.
     */
     index = inode->i_size >> PAGE_CACHE_SHIFT ;
-    page = grab_cache_page(inode->i_mapping, index) ;
+    mapping = inode->i_mapping ;
+    page = grab_cache_page(mapping, index) ;
     retval = -ENOMEM;
     if (!page) {
         goto out ;
     }
-    retval = reiserfs_prepare_write(NULL, page, write_from, blocksize) ;
+    retval = mapping->a_ops->prepare_write(NULL, page, write_from, write_from) ;
     if (retval)
         goto out_unlock ;
 
     /* conversion can change page contents, must flush */
     flush_dcache_page(page) ;
+    retval = mapping->a_ops->commit_write(NULL, page, write_from, write_from) ;
     REISERFS_I(inode)->i_flags |= i_nopack_mask;
 
 out_unlock:
diff -puN fs/reiserfs/journal.c~reiserfs-jh-2 fs/reiserfs/journal.c
--- 25/fs/reiserfs/journal.c~reiserfs-jh-2	2004-04-06 20:57:15.723001808 -0700
+++ 25-akpm/fs/reiserfs/journal.c	2004-04-06 20:57:15.746998160 -0700
@@ -54,6 +54,7 @@
 #include <linux/buffer_head.h>
 #include <linux/workqueue.h>
 #include <linux/writeback.h>
+#include <linux/blkdev.h>
 
 
 /* gets a struct reiserfs_journal_list * from a list head */
@@ -595,6 +596,248 @@ static int journal_list_still_alive(stru
     return 0;
 }
 
+static void reiserfs_end_buffer_io_sync(struct buffer_head *bh, int uptodate) {
+    char b[BDEVNAME_SIZE];
+
+    if (buffer_journaled(bh)) {
+        reiserfs_warning("clm-2084: pinned buffer %lu:%s sent to disk\n",
+	                 bh->b_blocknr, bdevname(bh->b_bdev, b)) ;
+    }
+    if (uptodate)
+    	set_buffer_uptodate(bh) ;
+    else
+    	clear_buffer_uptodate(bh) ;
+    unlock_buffer(bh) ;
+    put_bh(bh) ;
+}
+
+static void reiserfs_end_ordered_io(struct buffer_head *bh, int uptodate) {
+    if (uptodate)
+    	set_buffer_uptodate(bh) ;
+    else
+    	clear_buffer_uptodate(bh) ;
+    unlock_buffer(bh) ;
+    put_bh(bh) ;
+}
+
+static void submit_logged_buffer(struct buffer_head *bh) {
+    get_bh(bh) ;
+    bh->b_end_io = reiserfs_end_buffer_io_sync ;
+    mark_buffer_notjournal_new(bh) ;
+    clear_buffer_dirty(bh) ;
+    if (!test_and_clear_bit(BH_JTest, &bh->b_state))
+        BUG();
+    if (!buffer_uptodate(bh))
+        BUG();
+    submit_bh(WRITE, bh) ;
+}
+
+static void submit_ordered_buffer(struct buffer_head *bh) {
+    get_bh(bh) ;
+    bh->b_end_io = reiserfs_end_ordered_io;
+    clear_buffer_dirty(bh) ;
+    if (!buffer_uptodate(bh))
+        BUG();
+    submit_bh(WRITE, bh) ;
+}
+
+#define CHUNK_SIZE 32
+struct buffer_chunk {
+    struct buffer_head *bh[CHUNK_SIZE];
+    int nr;
+};
+
+static void write_chunk(struct buffer_chunk *chunk) {
+    int i;
+    for (i = 0; i < chunk->nr ; i++) {
+	submit_logged_buffer(chunk->bh[i]) ;
+    }
+    chunk->nr = 0;
+}
+
+static void write_ordered_chunk(struct buffer_chunk *chunk) {
+    int i;
+    for (i = 0; i < chunk->nr ; i++) {
+	submit_ordered_buffer(chunk->bh[i]) ;
+    }
+    chunk->nr = 0;
+}
+
+static int add_to_chunk(struct buffer_chunk *chunk, struct buffer_head *bh,
+			 spinlock_t *lock,
+			 void (fn)(struct buffer_chunk *))
+{
+    int ret = 0;
+    if (chunk->nr >= CHUNK_SIZE)
+        BUG();
+    chunk->bh[chunk->nr++] = bh;
+    if (chunk->nr >= CHUNK_SIZE) {
+	ret = 1;
+        if (lock)
+	    spin_unlock(lock);
+        fn(chunk);
+        if (lock)
+	    spin_lock(lock);
+    }
+    return ret;
+}
+
+
+atomic_t nr_reiserfs_jh = ATOMIC_INIT(0);
+static struct reiserfs_jh *alloc_jh(void) {
+    struct reiserfs_jh *jh;
+    while(1) {
+	jh = kmalloc(sizeof(*jh), GFP_NOFS);
+	if (jh) {
+	    atomic_inc(&nr_reiserfs_jh);
+	    return jh;
+	}
+        yield();
+    }
+}
+
+/*
+ * we want to free the jh when the buffer has been written
+ * and waited on
+ */
+void reiserfs_free_jh(struct buffer_head *bh) {
+    struct reiserfs_jh *jh;
+
+    jh = bh->b_private;
+    if (jh) {
+	bh->b_private = NULL;
+	jh->bh = NULL;
+	list_del_init(&jh->list);
+	kfree(jh);
+	if (atomic_read(&nr_reiserfs_jh) <= 0)
+	    BUG();
+	atomic_dec(&nr_reiserfs_jh);
+	put_bh(bh);
+    }
+}
+
+static inline int __add_jh(struct reiserfs_journal *j, struct buffer_head *bh,
+                           int tail)
+{
+    struct reiserfs_jh *jh;
+
+    if (bh->b_private) {
+	spin_lock(&j->j_dirty_buffers_lock);
+	if (!bh->b_private) {
+	    spin_unlock(&j->j_dirty_buffers_lock);
+	    goto no_jh;
+	}
+        jh = bh->b_private;
+	list_del_init(&jh->list);
+    } else {
+no_jh:
+	get_bh(bh);
+	jh = alloc_jh();
+	spin_lock(&j->j_dirty_buffers_lock);
+	/* buffer must be locked for __add_jh, should be able to have
+	 * two adds at the same time
+	 */
+	if (bh->b_private)
+	    BUG();
+	jh->bh = bh;
+	bh->b_private = jh;
+    }
+    jh->jl = j->j_current_jl;
+    if (tail)
+	list_add_tail(&jh->list, &jh->jl->j_tail_bh_list);
+    else {
+	list_add_tail(&jh->list, &jh->jl->j_bh_list);
+    }
+    spin_unlock(&j->j_dirty_buffers_lock);
+    return 0;
+}
+
+int reiserfs_add_tail_list(struct inode *inode, struct buffer_head *bh) {
+    return __add_jh(SB_JOURNAL(inode->i_sb), bh, 1);
+}
+int reiserfs_add_ordered_list(struct inode *inode, struct buffer_head *bh) {
+    return __add_jh(SB_JOURNAL(inode->i_sb), bh, 0);
+}
+
+#define JH_ENTRY(l) list_entry((l), struct reiserfs_jh, list)
+static int write_ordered_buffers(spinlock_t *lock,
+				 struct reiserfs_journal *j,
+                                 struct reiserfs_journal_list *jl,
+				 struct list_head *list)
+{
+    struct buffer_head *bh;
+    struct reiserfs_jh *jh;
+    int ret = 0;
+    struct buffer_chunk chunk;
+    struct list_head tmp;
+    INIT_LIST_HEAD(&tmp);
+
+    chunk.nr = 0;
+    spin_lock(lock);
+    while(!list_empty(list)) {
+        jh = JH_ENTRY(list->next);
+	bh = jh->bh;
+	get_bh(bh);
+	if (test_set_buffer_locked(bh)) {
+	    if (!buffer_dirty(bh)) {
+		list_del_init(&jh->list);
+		list_add(&jh->list, &tmp);
+		goto loop_next;
+	    }
+	    spin_unlock(lock);
+	    if (chunk.nr)
+		write_ordered_chunk(&chunk);
+	    wait_on_buffer(bh);
+	    if (need_resched)
+	        schedule();
+	    spin_lock(lock);
+	    goto loop_next;
+	}
+	if (buffer_dirty(bh)) {
+	    list_del_init(&jh->list);
+	    list_add(&jh->list, &tmp);
+	    add_to_chunk(&chunk, bh, lock, write_ordered_chunk);
+	} else {
+	    reiserfs_free_jh(bh);
+	    unlock_buffer(bh);
+	}
+loop_next:
+	put_bh(bh);
+	if (chunk.nr == 0 && need_resched) {
+	    spin_unlock(lock);
+	    schedule();
+	    spin_lock(lock);
+	}
+    }
+    if (chunk.nr) {
+	spin_unlock(lock);
+        write_ordered_chunk(&chunk);
+	spin_lock(lock);
+    }
+    while(!list_empty(&tmp)) {
+        jh = JH_ENTRY(tmp.prev);
+	bh = jh->bh;
+	get_bh(bh);
+	reiserfs_free_jh(bh);
+
+	if (buffer_locked(bh)) {
+	    spin_unlock(lock);
+	    wait_on_buffer(bh);
+	    spin_lock(lock);
+	}
+	if (!buffer_uptodate(bh))
+	    ret = -EIO;
+	put_bh(bh);
+	if (need_resched()) {
+	    spin_unlock(lock);
+	    schedule();
+	    spin_lock(lock);
+	}
+    }
+    spin_unlock(lock);
+    return ret;
+}
+
 static int flush_older_commits(struct super_block *s, struct reiserfs_journal_list *jl) {
     struct reiserfs_journal_list *other_jl;
     struct reiserfs_journal_list *first_jl;
@@ -656,6 +899,13 @@ find_first:
     }
     return 0;
 }
+int reiserfs_async_progress_wait(struct super_block *s) {
+    DEFINE_WAIT(wait);
+    struct reiserfs_journal *j = SB_JOURNAL(s);
+    if (atomic_read(&j->j_async_throttle))
+    	blk_congestion_wait(WRITE, HZ/10);
+    return 0;
+}
 
 /*
 ** if this journal list still has commit blocks unflushed, send them to disk.
@@ -710,28 +960,40 @@ static int flush_commit_list(struct supe
     goto put_jl;
   }
 
+  if (!list_empty(&jl->j_bh_list)) {
+      unlock_kernel();
+      write_ordered_buffers(&SB_JOURNAL(s)->j_dirty_buffers_lock,
+                            SB_JOURNAL(s), jl, &jl->j_bh_list);
+      lock_kernel();
+  }
+  if (!list_empty(&jl->j_bh_list))
+      BUG();
   /*
    * for the description block and all the log blocks, submit any buffers
    * that haven't already reached the disk
    */
+  atomic_inc(&SB_JOURNAL(s)->j_async_throttle);
   for (i = 0 ; i < (jl->j_len + 1) ; i++) {
     bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) + (jl->j_start+i) %
          SB_ONDISK_JOURNAL_SIZE(s);
     tbh = journal_find_get_block(s, bn) ;
-    wait_on_buffer(tbh) ;
-    ll_rw_block(WRITE, 1, &tbh) ;
+    if (buffer_dirty(tbh))
+	ll_rw_block(WRITE, 1, &tbh) ;
     put_bh(tbh) ;
   }
+  atomic_dec(&SB_JOURNAL(s)->j_async_throttle);
 
   /* wait on everything written so far before writing the commit */
   for (i = 0 ;  i < (jl->j_len + 1) ; i++) {
     bn = SB_ONDISK_JOURNAL_1st_BLOCK(s) +
 	 (jl->j_start + i) % SB_ONDISK_JOURNAL_SIZE(s) ;
     tbh = journal_find_get_block(s, bn) ;
-
     wait_on_buffer(tbh) ;
+    // since we're using ll_rw_blk above, it might have skipped over
+    // a locked buffer.  Double check here
+    //
     if (buffer_dirty(tbh))
-      BUG();
+      sync_dirty_buffer(tbh);
     if (!buffer_uptodate(tbh)) {
       reiserfs_panic(s, "journal-601, buffer write failed\n") ;
     }
@@ -892,33 +1154,6 @@ restart:
     return 0 ;
 }
 
-static void reiserfs_end_buffer_io_sync(struct buffer_head *bh, int uptodate) {
-    char b[BDEVNAME_SIZE];
-
-    if (buffer_journaled(bh)) {
-        reiserfs_warning("clm-2084: pinned buffer %lu:%s sent to disk\n",
-	                 bh->b_blocknr, bdevname(bh->b_bdev, b)) ;
-    }
-    if (uptodate)
-    	set_buffer_uptodate(bh) ;
-    else
-    	clear_buffer_uptodate(bh) ;
-    unlock_buffer(bh) ;
-    put_bh(bh) ;
-}
-
-static void submit_logged_buffer(struct buffer_head *bh) {
-    get_bh(bh) ;
-    bh->b_end_io = reiserfs_end_buffer_io_sync ;
-    mark_buffer_notjournal_new(bh) ;
-    clear_buffer_dirty(bh) ;
-    if (!test_and_clear_bit(BH_JTest, &bh->b_state))
-        BUG();
-    if (!buffer_uptodate(bh))
-        BUG();
-    submit_bh(WRITE, bh) ;
-}
-
 static void del_from_work_list(struct super_block *s,
                                struct reiserfs_journal_list *jl) {
     if (!list_empty(&jl->j_working_list)) {
@@ -1158,28 +1393,6 @@ flush_older_and_return:
   return 0 ;
 } 
 
-#define CHUNK_SIZE 32
-struct buffer_chunk {
-    struct buffer_head *bh[CHUNK_SIZE];
-    int nr;
-};
-
-static void write_chunk(struct buffer_chunk *chunk) {
-    int i;
-    for (i = 0; i < chunk->nr ; i++) {
-	submit_logged_buffer(chunk->bh[i]) ;
-    }
-    chunk->nr = 0;
-}
-
-static void add_to_chunk(struct buffer_chunk *chunk, struct buffer_head *bh) {
-    if (chunk->nr >= CHUNK_SIZE)
-        BUG();
-    chunk->bh[chunk->nr++] = bh;
-    if (chunk->nr >= CHUNK_SIZE)
-        write_chunk(chunk);
-}
-
 static int write_one_transaction(struct super_block *s,
                                  struct reiserfs_journal_list *jl,
 				 struct buffer_chunk *chunk)
@@ -1214,7 +1427,7 @@ static int write_one_transaction(struct 
 		if (!buffer_journal_dirty(tmp_bh) ||
 		    reiserfs_buffer_prepared(tmp_bh))
 		    BUG();
-		add_to_chunk(chunk, tmp_bh);
+		add_to_chunk(chunk, tmp_bh, NULL, write_chunk);
 		ret++;
 	    } else {
 		/* note, cn->bh might be null now */
@@ -1937,6 +2150,8 @@ retry:
     memset(jl, 0, sizeof(*jl));
     INIT_LIST_HEAD(&jl->j_list);
     INIT_LIST_HEAD(&jl->j_working_list);
+    INIT_LIST_HEAD(&jl->j_tail_bh_list);
+    INIT_LIST_HEAD(&jl->j_bh_list);
     sema_init(&jl->j_commit_lock, 1);
     SB_JOURNAL(s)->j_num_lists++;
     get_journal_list(jl);
@@ -2166,6 +2381,7 @@ int journal_init(struct super_block *p_s
   SB_JOURNAL(p_s_sb)->j_len = 0 ;
   SB_JOURNAL(p_s_sb)->j_len_alloc = 0 ;
   atomic_set(&(SB_JOURNAL(p_s_sb)->j_wcount), 0) ;
+  atomic_set(&(SB_JOURNAL(p_s_sb)->j_async_throttle), 0) ;
   SB_JOURNAL(p_s_sb)->j_bcount = 0 ;	  
   SB_JOURNAL(p_s_sb)->j_trans_start_time = 0 ;	  
   SB_JOURNAL(p_s_sb)->j_last = NULL ;	  
@@ -2376,6 +2592,43 @@ relock:
   return 0 ;
 }
 
+struct reiserfs_transaction_handle *
+reiserfs_persistent_transaction(struct super_block *s, int nblocks) {
+    int ret ;
+    struct reiserfs_transaction_handle *th ;
+
+    /* if we're nesting into an existing transaction.  It will be
+    ** persistent on its own
+    */
+    if (reiserfs_transaction_running(s)) {
+        th = current->journal_info ;
+	th->t_refcount++ ;
+	if (th->t_refcount < 2) {
+	    BUG() ;
+	}
+	return th ;
+    }
+    th = reiserfs_kmalloc(sizeof(struct reiserfs_transaction_handle), GFP_NOFS, s) ;
+    if (!th)
+       return NULL;
+    ret = journal_begin(th, s, nblocks) ;
+    if (ret) {
+	reiserfs_kfree(th, sizeof(struct reiserfs_transaction_handle), s) ;
+        return NULL;
+    }
+    return th ;
+}
+
+int
+reiserfs_end_persistent_transaction(struct reiserfs_transaction_handle *th) {
+    struct super_block *s = th->t_super;
+    int ret;
+    ret = journal_end(th, th->t_super, th->t_blocks_allocated);
+    if (th->t_refcount == 0)
+	reiserfs_kfree(th, sizeof(struct reiserfs_transaction_handle), s) ;
+    return ret;
+}
+
 static int journal_join(struct reiserfs_transaction_handle *th, struct super_block *p_s_sb, unsigned long nblocks) {
   struct reiserfs_transaction_handle *cur_th = current->journal_info;
 
@@ -2522,7 +2775,9 @@ int journal_mark_dirty(struct reiserfs_t
 int journal_end(struct reiserfs_transaction_handle *th, struct super_block *p_s_sb, unsigned long nblocks) {
   if (!current->journal_info && th->t_refcount > 1)
     printk("REISER-NESTING: th NULL, refcount %d\n", th->t_refcount);
-  if (th->t_refcount > 1) {
+
+  th->t_refcount--;
+  if (th->t_refcount > 0) {
     struct reiserfs_transaction_handle *cur_th = current->journal_info ;
 
     /* we aren't allowed to close a nested transaction on a different
@@ -2531,7 +2786,6 @@ int journal_end(struct reiserfs_transact
     if (cur_th->t_super != th->t_super)
       BUG() ;
 
-    th->t_refcount--;
     if (th != cur_th) {
       memcpy(current->journal_info, th, sizeof(*th));
       th->t_trans_id = 0;
@@ -2648,14 +2902,7 @@ int journal_end_sync(struct reiserfs_tra
 }
 
 /*
-** used to get memory back from async commits that are floating around
-** and to reclaim any blocks deleted but unusable because their commits
-** haven't hit disk yet.  called from bitmap.c
-**
-** if it starts flushing things, it ors SCHEDULE_OCCURRED into repeat.
-** note, this is just if schedule has a chance of occurring.  I need to 
-** change flush_commit_lists to have a repeat parameter too.
-**
+** writeback the pending async commits to disk
 */
 static void flush_async_commits(void *p) {
   struct super_block *p_s_sb = p;
@@ -2670,6 +2917,9 @@ static void flush_async_commits(void *p)
       flush_commit_list(p_s_sb, jl, 1);
   }
   unlock_kernel();
+  atomic_inc(&SB_JOURNAL(p_s_sb)->j_async_throttle);
+  filemap_fdatawrite(p_s_sb->s_bdev->bd_inode->i_mapping);
+  atomic_dec(&SB_JOURNAL(p_s_sb)->j_async_throttle);
 }
 
 /*
@@ -3072,6 +3322,7 @@ static int do_journal_end(struct reiserf
   if (!check_journal_end(th, p_s_sb, nblocks, flags)) {
     p_s_sb->s_dirt = 1;
     wake_queued_writers(p_s_sb);
+    reiserfs_async_progress_wait(p_s_sb);
     goto out ;
   }
 
@@ -3248,23 +3499,38 @@ static int do_journal_end(struct reiserf
   SB_JOURNAL(p_s_sb)->j_next_async_flush = 0 ;
   init_journal_hash(p_s_sb) ; 
 
+  // make sure reiserfs_add_jh sees the new current_jl before we
+  // write out the tails
+  smp_mb();
+
   /* tail conversion targets have to hit the disk before we end the
    * transaction.  Otherwise a later transaction might repack the tail
    * before this transaction commits, leaving the data block unflushed and
    * clean, if we crash before the later transaction commits, the data block
    * is lost.
    */
-  fsync_buffers_list(&(SB_JOURNAL(p_s_sb)->j_dirty_buffers_lock),
-		     &(SB_JOURNAL(p_s_sb)->j_dirty_buffers)) ;
+  if (!list_empty(&jl->j_tail_bh_list)) {
+      unlock_kernel();
+      write_ordered_buffers(&SB_JOURNAL(p_s_sb)->j_dirty_buffers_lock,
+			    SB_JOURNAL(p_s_sb), jl, &jl->j_tail_bh_list);
+      lock_kernel();
+  }
+  if (!list_empty(&jl->j_tail_bh_list))
+      BUG();
   up(&jl->j_commit_lock);
 
   /* honor the flush wishes from the caller, simple commits can
   ** be done outside the journal lock, they are done below
+  **
+  ** if we don't flush the commit list right now, we put it into
+  ** the work queue so the people waiting on the async progress work
+  ** queue don't wait for this proc to flush journal lists and such.
   */
   if (flush) {
     flush_commit_list(p_s_sb, jl, 1) ;
     flush_journal_list(p_s_sb, jl, 1) ;
-  }
+  } else
+    queue_work(commit_wq, &SB_JOURNAL(p_s_sb)->j_work);
 
 
   /* if the next transaction has any chance of wrapping, flush 
@@ -3322,15 +3588,12 @@ first_jl:
   clear_bit(WRITERS_QUEUED, &SB_JOURNAL(p_s_sb)->j_state);
   wake_up(&(SB_JOURNAL(p_s_sb)->j_join_wait)) ;
 
-  if (!flush) {
-      if (wait_on_commit) {
-	  if (journal_list_still_alive(p_s_sb, commit_trans_id))
-	      flush_commit_list(p_s_sb, jl, 1) ;
-      } else {
-          queue_work(commit_wq, &SB_JOURNAL(p_s_sb)->j_work);
-      }
+  if (!flush && wait_on_commit &&
+      journal_list_still_alive(p_s_sb, commit_trans_id)) {
+	  flush_commit_list(p_s_sb, jl, 1) ;
   }
 out:
   reiserfs_check_lock_depth("journal end2");
+  th->t_trans_id = 0;
   return 0 ;
 }
diff -puN fs/reiserfs/super.c~reiserfs-jh-2 fs/reiserfs/super.c
--- 25/fs/reiserfs/super.c~reiserfs-jh-2	2004-04-06 20:57:15.724001656 -0700
+++ 25-akpm/fs/reiserfs/super.c	2004-04-06 20:57:15.748997856 -0700
@@ -510,6 +510,14 @@ typedef struct {
 		    applied BEFORE setmask */
 } opt_desc_t;
 
+/* possible values for -o data= */
+static const arg_desc_t logging_mode[] = {
+    {"ordered", 1<<REISERFS_DATA_ORDERED, (1<<REISERFS_DATA_LOG|1<<REISERFS_DATA_WRITEBACK)},
+    {"journal", 1<<REISERFS_DATA_LOG, (1<<REISERFS_DATA_ORDERED|1<<REISERFS_DATA_WRITEBACK)},
+    {"writeback", 1<<REISERFS_DATA_WRITEBACK, (1<<REISERFS_DATA_ORDERED|1<<REISERFS_DATA_LOG)},
+    {NULL, 0}
+};
+
 /* possible values for "-o block-allocator=" and bits which are to be set in
    s_mount_opt of reiserfs specific part of in-core super block */
 static const arg_desc_t balloc[] = {
@@ -664,6 +672,7 @@ static int reiserfs_parse_options (struc
 	{"nolog", 0, 0, 0, 0}, /* This is unsupported */
 	{"replayonly", 0, 0, 1<<REPLAYONLY, 0},
 	{"block-allocator", 'a', balloc, 0, 0},
+	{"data", 'd', logging_mode, 0, 0},
 	{"resize", 'r', 0, 0, 0},
 	{"jdev", 'j', 0, 0, 0},
 	{"nolargeio", 'w', 0, 0, 0},
@@ -737,6 +746,33 @@ static int reiserfs_parse_options (struc
     return 1;
 }
 
+static void switch_data_mode(struct super_block *s, unsigned long mode) {
+    REISERFS_SB(s)->s_mount_opt &= ~((1 << REISERFS_DATA_LOG) |
+                                       (1 << REISERFS_DATA_ORDERED) |
+				       (1 << REISERFS_DATA_WRITEBACK));
+    REISERFS_SB(s)->s_mount_opt |= (1 << mode);
+}
+
+static void handle_data_mode(struct super_block *s, unsigned long mount_options)
+{
+    if (mount_options & (1 << REISERFS_DATA_LOG)) {
+        if (!reiserfs_data_log(s)) {
+	    switch_data_mode(s, REISERFS_DATA_LOG);
+	    printk("reiserfs: switching to journaled data mode\n");
+	}
+    } else if (mount_options & (1 << REISERFS_DATA_ORDERED)) {
+        if (!reiserfs_data_ordered(s)) {
+	    switch_data_mode(s, REISERFS_DATA_ORDERED);
+	    printk("reiserfs: switching to ordered data mode\n");
+	}
+    } else if (mount_options & (1 << REISERFS_DATA_WRITEBACK)) {
+        if (!reiserfs_data_writeback(s)) {
+	    switch_data_mode(s, REISERFS_DATA_WRITEBACK);
+	    printk("reiserfs: switching to writeback data mode\n");
+	}
+    }
+}
+
 static void handle_attrs( struct super_block *s )
 {
 	struct reiserfs_super_block * rs;
@@ -814,6 +850,7 @@ static int reiserfs_remount (struct supe
     if (!(s->s_flags & MS_RDONLY))
 	return 0; /* We are read-write already */
 
+    handle_data_mode(s, mount_options);
     REISERFS_SB(s)->s_mount_state = sb_umount_state(rs) ;
     s->s_flags &= ~MS_RDONLY ; /* now it is safe to call journal_begin */
     journal_begin(&th, s, 10) ;
@@ -1306,6 +1343,21 @@ static int reiserfs_fill_super (struct s
     SPRINTK(silent, "reiserfs:warning: - it is slow mode for debugging.\n");
 #endif
 
+    /* make data=ordered the default */
+    if (!reiserfs_data_log(s) && !reiserfs_data_ordered(s) &&
+        !reiserfs_data_writeback(s))
+    {
+         REISERFS_SB(s)->s_mount_opt |= (1 << REISERFS_DATA_ORDERED);
+    }
+
+    if (reiserfs_data_log(s)) {
+        printk("reiserfs: using journaled data mode\n");
+    } else if (reiserfs_data_ordered(s)) {
+        printk("reiserfs: using ordered data mode\n");
+    } else {
+        printk("reiserfs: using writeback data mode\n");
+    }
+
     // set_device_ro(s->s_dev, 1) ;
     if( journal_init(s, jdev_name, old_format, commit_max_age) ) {
 	SPRINTK(silent, "sh-2022: reiserfs_fill_super: unable to initialize journal space\n") ;
diff -puN include/linux/reiserfs_fs.h~reiserfs-jh-2 include/linux/reiserfs_fs.h
--- 25/include/linux/reiserfs_fs.h~reiserfs-jh-2	2004-04-06 20:57:15.726001352 -0700
+++ 25-akpm/include/linux/reiserfs_fs.h	2004-04-06 20:57:15.751997400 -0700
@@ -1707,6 +1707,15 @@ struct reiserfs_journal_header {
 #define journal_getblk(s, block) __getblk(SB_JOURNAL(s)->j_dev_bd, block, s->s_blocksize)
 #define journal_bread(s, block) __bread(SB_JOURNAL(s)->j_dev_bd, block, s->s_blocksize)
 
+enum reiserfs_bh_state_bits {
+    BH_JDirty = BH_PrivateStart,
+    BH_JDirty_wait,
+    BH_JNew,
+    BH_JPrepared,
+    BH_JRestore_dirty,
+    BH_JTest, // debugging only will go away
+};
+
 /*
 ** transaction handle which is passed around for all journal calls
 */
@@ -1726,7 +1735,36 @@ struct reiserfs_transaction_handle {
 				   should be displaced from others */
 } ;
 
+/* used to keep track of ordered and tail writes, attached to the buffer
+ * head through b_journal_head.
+ */
+struct reiserfs_jh {
+    struct reiserfs_journal_list *jl;
+    struct buffer_head *bh;
+    struct list_head list;
+};
+
+void reiserfs_free_jh(struct buffer_head *bh);
+int reiserfs_add_tail_list(struct inode *inode, struct buffer_head *bh);
+int reiserfs_add_ordered_list(struct inode *inode, struct buffer_head *bh);
 int journal_mark_dirty(struct reiserfs_transaction_handle *, struct super_block *, struct buffer_head *bh) ;
+
+static inline int reiserfs_transaction_running(struct super_block *s) {
+    struct reiserfs_transaction_handle *th = current->journal_info ;
+    if (th && th->t_super == s)
+        return 1 ;
+    if (th && th->t_super == NULL)
+        BUG();
+    return 0 ;
+}
+
+int reiserfs_async_progress_wait(struct super_block *s);
+
+struct reiserfs_transaction_handle *
+reiserfs_persistent_transaction(struct super_block *, int count);
+int reiserfs_end_persistent_transaction(struct reiserfs_transaction_handle *);
+int reiserfs_commit_page(struct inode *inode, struct page *page,
+		unsigned from, unsigned to);
 int reiserfs_flush_old_commits(struct super_block *);
 void reiserfs_commit_for_inode(struct inode *) ;
 void reiserfs_update_inode_transaction(struct inode *) ;
@@ -1741,7 +1779,6 @@ int journal_release(struct reiserfs_tran
 int journal_release_error(struct reiserfs_transaction_handle*, struct super_block *) ;
 int journal_end(struct reiserfs_transaction_handle *, struct super_block *, unsigned long) ;
 int journal_end_sync(struct reiserfs_transaction_handle *, struct super_block *, unsigned long) ;
-int journal_mark_dirty_nolog(struct reiserfs_transaction_handle *, struct super_block *, struct buffer_head *bh) ;
 int journal_mark_freed(struct reiserfs_transaction_handle *, struct super_block *, b_blocknr_t blocknr) ;
 int journal_transaction_should_end(struct reiserfs_transaction_handle *, int) ;
 int reiserfs_in_journal(struct super_block *p_s_sb, int bmap_nr, int bit_nr, int searchall, b_blocknr_t *next) ;
@@ -1749,11 +1786,6 @@ int journal_begin(struct reiserfs_transa
 
 int buffer_journaled(const struct buffer_head *bh) ;
 int mark_buffer_journal_new(struct buffer_head *bh) ;
-int reiserfs_add_page_to_flush_list(struct reiserfs_transaction_handle *,
-                                    struct inode *, struct buffer_head *) ;
-int reiserfs_remove_page_from_flush_list(struct reiserfs_transaction_handle *,
-                                         struct inode *) ;
-
 int reiserfs_allocate_list_bitmaps(struct super_block *s, struct reiserfs_list_bitmap *, int) ;
 
 				/* why is this kerplunked right here? */
diff -puN include/linux/reiserfs_fs_sb.h~reiserfs-jh-2 include/linux/reiserfs_fs_sb.h
--- 25/include/linux/reiserfs_fs_sb.h~reiserfs-jh-2	2004-04-06 20:57:15.727001200 -0700
+++ 25-akpm/include/linux/reiserfs_fs_sb.h	2004-04-06 20:57:15.752997248 -0700
@@ -107,21 +107,6 @@ typedef enum {
 #define JOURNAL_HASH_SIZE 8192   
 #define JOURNAL_NUM_BITMAPS 5 /* number of copies of the bitmaps to have floating.  Must be >= 2 */
 
-/* these are bh_state bit flag offset numbers, for use in the buffer head */
-
-#define BH_JDirty       16      /* journal data needs to be written before buffer can be marked dirty */
-#define BH_JDirty_wait 18	/* commit is done, buffer marked dirty */
-#define BH_JNew 19		/* buffer allocated during this transaction, no need to write if freed during this trans too */
-
-/* ugly.  metadata blocks must be prepared before they can be logged.  
-** prepared means unlocked and cleaned.  If the block is prepared, but not
-** logged for some reason, any bits cleared while preparing it must be 
-** set again.
-*/
-#define BH_JPrepared 20		/* block has been prepared for the log */
-#define BH_JRestore_dirty 22    /* restore the dirty bit later */
-#define BH_JTest 23             /* debugging use only */
-
 /* One of these for every block in every transaction
 ** Each one is in two hash tables.  First, a hash of the current transaction, and after journal_end, a
 ** hash of all the in memory transactions.
@@ -178,6 +163,11 @@ struct reiserfs_journal_list {
 
   /* time ordered list of all transactions we haven't tried to flush yet */
   struct list_head j_working_list;
+
+  /* list of tail conversion targets in need of flush before commit */
+  struct list_head j_tail_bh_list;
+  /* list of data=ordered buffers in need of flush before commit */
+  struct list_head j_bh_list;
   int j_refcount;
 } ;
 
@@ -253,7 +243,9 @@ struct reiserfs_journal {
   unsigned long j_max_trans_size ;
   unsigned long j_max_batch_size ;
 
+  /* when flushing ordered buffers, throttle new ordered writers */
   struct work_struct j_work;
+  atomic_t j_async_throttle;
 };
 
 #define JOURNAL_DESC_MAGIC "ReIsErLB" /* ick.  magic string to find desc blocks in the journal */
@@ -408,11 +400,12 @@ struct reiserfs_sb_info
 #define REISERFS_3_5 0
 #define REISERFS_3_6 1
 
+enum reiserfs_mount_options {
 /* Mount options */
-#define REISERFS_LARGETAIL 0  /* large tails will be created in a session */
-#define REISERFS_SMALLTAIL 17  /* small (for files less than block size) tails will be created in a session */
-#define REPLAYONLY 3 /* replay journal and return 0. Use by fsck */
-#define REISERFS_CONVERT 5    /* -o conv: causes conversion of old
+    REISERFS_LARGETAIL,  /* large tails will be created in a session */
+    REISERFS_SMALLTAIL,  /* small (for files less than block size) tails will be created in a session */
+    REPLAYONLY, /* replay journal and return 0. Use by fsck */
+    REISERFS_CONVERT,    /* -o conv: causes conversion of old
                                  format super block to the new
                                  format. If not specified - old
                                  partition will be dealt with in a
@@ -426,26 +419,29 @@ struct reiserfs_sb_info
 ** the existing hash on the FS, so if you have a tea hash disk, and mount
 ** with -o hash=rupasov, the mount will fail.
 */
-#define FORCE_TEA_HASH 6      /* try to force tea hash on mount */
-#define FORCE_RUPASOV_HASH 7  /* try to force rupasov hash on mount */
-#define FORCE_R5_HASH 8       /* try to force rupasov hash on mount */
-#define FORCE_HASH_DETECT 9   /* try to detect hash function on mount */
-
+    FORCE_TEA_HASH,      /* try to force tea hash on mount */
+    FORCE_RUPASOV_HASH,  /* try to force rupasov hash on mount */
+    FORCE_R5_HASH,       /* try to force rupasov hash on mount */
+    FORCE_HASH_DETECT,   /* try to detect hash function on mount */
+
+    REISERFS_DATA_LOG,
+    REISERFS_DATA_ORDERED,
+    REISERFS_DATA_WRITEBACK,
 
 /* used for testing experimental features, makes benchmarking new
    features with and without more convenient, should never be used by
    users in any code shipped to users (ideally) */
 
-#define REISERFS_NO_BORDER 11
-#define REISERFS_NO_UNHASHED_RELOCATION 12
-#define REISERFS_HASHED_RELOCATION 13
-
-#define REISERFS_ATTRS 15
-
-#define REISERFS_TEST1 11
-#define REISERFS_TEST2 12
-#define REISERFS_TEST3 13
-#define REISERFS_TEST4 14 
+    REISERFS_NO_BORDER,
+    REISERFS_NO_UNHASHED_RELOCATION,
+    REISERFS_HASHED_RELOCATION,
+    REISERFS_ATTRS,
+
+    REISERFS_TEST1,
+    REISERFS_TEST2,
+    REISERFS_TEST3,
+    REISERFS_TEST4,
+};
 
 #define reiserfs_r5_hash(s) (REISERFS_SB(s)->s_mount_opt & (1 << FORCE_R5_HASH))
 #define reiserfs_rupasov_hash(s) (REISERFS_SB(s)->s_mount_opt & (1 << FORCE_RUPASOV_HASH))
@@ -459,11 +455,12 @@ struct reiserfs_sb_info
 #define have_large_tails(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_LARGETAIL))
 #define have_small_tails(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_SMALLTAIL))
 #define replay_only(s) (REISERFS_SB(s)->s_mount_opt & (1 << REPLAYONLY))
-#define reiserfs_dont_log(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_NOLOG))
 #define reiserfs_attrs(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_ATTRS))
 #define old_format_only(s) (REISERFS_SB(s)->s_properties & (1 << REISERFS_3_5))
 #define convert_reiserfs(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_CONVERT))
-
+#define reiserfs_data_log(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_DATA_LOG))
+#define reiserfs_data_ordered(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_DATA_ORDERED))
+#define reiserfs_data_writeback(s) (REISERFS_SB(s)->s_mount_opt & (1 << REISERFS_DATA_WRITEBACK))
 
 void reiserfs_file_buffer (struct buffer_head * bh, int list);
 extern struct file_system_type reiserfs_fs_type;

_