[pnfs] sorting out pnfs cvs

Dean Hildebrand dhildebz at eecs.umich.edu
Fri Feb 23 09:23:41 EST 2007


> 	- The patch includes some non-pnfs-related changes (increase of
> 	  io size on server, for example), and I don't know how to find
> 	  the version of the patch that was used for that, and can't
> 	  figure out how to get that information out of cvs.
>
>   
I thought Andy create several cuts, one for the base kernel, one with 
non-pnfs-related changes, and one with pnfs changes.  Is that true?

Dean

> So as a quick hack I did a diff between 2.6.18.3 and the latest cvs,
> threw out everything in the diff that didn't touch:
>
> 	Makefile
> 	fs/inode.c
> 	fs/nfs/
> 	fs/nfsd
> 	net/sunrpc/
> 	include/linux/nfs*
> 	include/linux/nfsd*
> 	include/linux/sunrpc
> 	include/linux/fs.h
>
> and applied the result to the top of a 2.6.18.3 git tree:
>
> 	git://linux-nfs.org/~bfields/exports/linux-pnfs.git
>
> Diff against 2.6.18.3 appended.
>
> Does that look right?  Corrections or better ideas?
>
> I'll set up a shared repository and make some brief instructions on how
> to use it after we figure this out.
>
> This patch is way too big, of course, so it'll need to be split up into
> logical steps at some point.
>
> --b.
>
> diff --git a/Makefile b/Makefile
> index 9eda185..d87f684 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -1,7 +1,7 @@
>  VERSION = 2
>  PATCHLEVEL = 6
>  SUBLEVEL = 18
> -EXTRAVERSION = .3
> +EXTRAVERSION = .3-largeio-pnfs
>  NAME=Avast! A bilge rat!
>  
>  # *DOCUMENTATION*
> diff --git a/fs/inode.c b/fs/inode.c
> index 0bf9f04..35a057b 100644
> --- a/fs/inode.c
> +++ b/fs/inode.c
> @@ -81,6 +81,7 @@ static struct hlist_head *inode_hashtable __read_mostly;
>   * the i_state of an inode while it is in use..
>   */
>  DEFINE_SPINLOCK(inode_lock);
> +EXPORT_SYMBOL(inode_lock);
>  
>  /*
>   * iprune_mutex provides exclusion between the kswapd or try_to_free_pages
> diff --git a/fs/nfs/Makefile b/fs/nfs/Makefile
> index 0b572a0..7b60e19 100644
> --- a/fs/nfs/Makefile
> +++ b/fs/nfs/Makefile
> @@ -13,7 +13,10 @@ nfs-$(CONFIG_NFS_V3_ACL)	+= nfs3acl.o
>  nfs-$(CONFIG_NFS_V4)	+= nfs4proc.o nfs4xdr.o nfs4state.o nfs4renewd.o \
>  			   delegation.o idmap.o \
>  			   callback.o callback_xdr.o callback_proc.o \
> -			   nfs4namespace.o
> +			   nfs4namespace.o pnfs.o
>  nfs-$(CONFIG_NFS_DIRECTIO) += direct.o
>  nfs-$(CONFIG_SYSCTL) += sysctl.o
>  nfs-objs		:= $(nfs-y)
> +
> +obj-m += nfslayoutdriver.o
> +nfslayoutdriver-objs := nfs4filelayout.o nfs4filelayoutdev.o
> diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
> index b252e7f..fdb1dd8 100644
> --- a/fs/nfs/callback.h
> +++ b/fs/nfs/callback.h
> @@ -20,6 +20,7 @@ enum nfs4_callback_procnum {
>  enum nfs4_callback_opnum {
>  	OP_CB_GETATTR = 3,
>  	OP_CB_RECALL  = 4,
> +	OP_CB_LAYOUTRECALL  = 5,
>  	OP_CB_ILLEGAL = 10044,
>  };
>  
> @@ -59,8 +60,27 @@ struct cb_recallargs {
>  	uint32_t truncate;
>  };
>  
> +enum layout_recall_type {
> +	RECALL_FILE = 1,
> +	RECALL_FSID = 2,
> +	RECALL_ALL  = 3
> +};
> +
> +struct cb_pnfs_layoutrecallargs {
> +	struct sockaddr_in	*cbl_addr;
> +	struct nfs_fh		cbl_fh;
> +	uint64_t		cbl_offset;
> +	uint64_t		cbl_length;
> +	struct nfs_fsid		cbl_fsid;
> +	uint32_t		cbl_recall_type;
> +	uint32_t		cbl_layout_type;
> +	uint32_t		cbl_iomode;
> +	uint32_t		cbl_layoutchanged;
> +};
> +
>  extern unsigned nfs4_callback_getattr(struct cb_getattrargs *args, struct cb_getattrres *res);
>  extern unsigned nfs4_callback_recall(struct cb_recallargs *args, void *dummy);
> +extern unsigned nfs4_callback_pnfs_layoutrecall(struct cb_pnfs_layoutrecallargs *args, void *dummy);
>  
>  extern int nfs_callback_up(void);
>  extern int nfs_callback_down(void);
> diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
> index 7719483..b526bc3 100644
> --- a/fs/nfs/callback_proc.c
> +++ b/fs/nfs/callback_proc.c
> @@ -85,3 +85,49 @@ out:
>  	dprintk("%s: exit with status = %d\n", __FUNCTION__, ntohl(res));
>  	return res;
>  }
> +
> +/*
> + * Layout is not actually returned until the client executes the
> + * LAYOUTRETURN operation.
> + * The general semantics are that once a layout has been recalled,
> + * all in flight I/O ops are completed and then LAYOUTRETURN is called.
> + *
> + * XXX The layout driver needs to choose to write all buffered I/O using:
> + * 	1) the layoutdriver if still available or
> + * 	2) the NFSv4 READ/WRITE ops after the layout is returned
> + */
> +unsigned nfs4_callback_pnfs_layoutrecall(struct cb_pnfs_layoutrecallargs *args, void *dummy)
> +{
> +	struct nfs4_client *clp;
> +	struct inode *inode = NULL;
> +	unsigned res = -ENOENT;
> +
> +	res = htonl(NFS4ERR_BADHANDLE);
> +	clp = nfs4_find_client(&args->cbl_addr->sin_addr);
> +	if (clp == NULL)
> +		goto out;
> +
> +	if (args->cbl_recall_type == RECALL_FILE) {
> +		inode = nfs_layout_find_inode(clp, &args->cbl_fh);
> +		if (inode == NULL)
> +			goto out_putclient;
> +	}
> +	/* Set up a helper thread to actually return the delegation */
> +	switch(nfs_async_return_layout(clp, inode, &args->cbl_fsid)) {
> +		case 0:
> +			res = 0;
> +			break;
> +		case -ENOENT:
> +			res = htonl(NFS4ERR_NOENT);
> +			break;
> +		default:
> +			res = htonl(NFS4ERR_RESOURCE);
> +	}
> +	if (inode)
> +		iput(inode);
> +out_putclient:
> +	nfs4_put_client(clp);
> +out:
> +	dprintk("%s: exit with status = %d\n", __FUNCTION__, ntohl(res));
> +	return res;
> +}
> diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
> index 29f9321..82d9cf3 100644
> --- a/fs/nfs/callback_xdr.c
> +++ b/fs/nfs/callback_xdr.c
> @@ -19,9 +19,15 @@
>  				CB_OP_GETATTR_BITMAP_MAXSZ + \
>  				2 + 2 + 3 + 3)
>  #define CB_OP_RECALL_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
> +#define CB_OP_LAYOUTRECALL_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
>  
>  #define NFSDBG_FACILITY NFSDBG_CALLBACK
>  
> +#define READ64(x)         do {			\
> +	(x) = (u64)ntohl(*p++) << 32;		\
> +	(x) |= ntohl(*p++);			\
> +} while (0)
> +
>  typedef unsigned (*callback_process_op_t)(void *, void *);
>  typedef unsigned (*callback_decode_arg_t)(struct svc_rqst *, struct xdr_stream *, void *);
>  typedef unsigned (*callback_encode_res_t)(struct svc_rqst *, struct xdr_stream *, void *);
> @@ -204,6 +210,42 @@ out:
>  	return status;
>  }
>  
> +static unsigned decode_pnfs_layoutrecall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr, struct cb_pnfs_layoutrecallargs *args)
> +{
> +	uint32_t *p;
> +	unsigned status = 0;
> +
> +	args->cbl_addr = &rqstp->rq_addr;
> +	p = read_buf(xdr, 4 * sizeof(uint32_t));
> +
> +	args->cbl_layout_type = ntohl(*p++);
> +	args->cbl_iomode = ntohl(*p++);
> +	args->cbl_layoutchanged = ntohl(*p++);
> +	args->cbl_recall_type = ntohl(*p++);
> +	
> +        if (args->cbl_recall_type == RECALL_FSID) {
> +		p = read_buf(xdr, 2 * sizeof(uint64_t));
> +		READ64(args->cbl_fsid.major);
> +		READ64(args->cbl_fsid.minor);
> +        }
> +        else if (args->cbl_recall_type == RECALL_FILE) {
> +		status = decode_fh(xdr, &args->cbl_fh);
> +		if (unlikely(status != 0))
> +			goto out;
> +		
> +		p = read_buf(xdr, 2 * sizeof(uint64_t));
> +		READ64(args->cbl_offset);
> +		READ64(args->cbl_length);
> +	}
> +	dprintk("%s: ltype %d iomode %d changed %d recall_type %d fsid %llx-%llx\n",
> +		__FUNCTION__, args->cbl_layout_type, args->cbl_iomode,
> +		args->cbl_layoutchanged, args->cbl_recall_type,
> +		args->cbl_fsid.major, args->cbl_fsid.minor);
> +out:
> +	dprintk("%s: exit with status = %d\n", __FUNCTION__, status);
> +	return 0;
> +}
> +
>  static unsigned encode_string(struct xdr_stream *xdr, unsigned int len, const char *str)
>  {
>  	uint32_t *p;
> @@ -369,6 +411,7 @@ static unsigned process_op(struct svc_rqst *rqstp,
>  		switch (op_nr) {
>  			case OP_CB_GETATTR:
>  			case OP_CB_RECALL:
> +			case OP_CB_LAYOUTRECALL:
>  				op = &callback_ops[op_nr];
>  				break;
>  			default:
> @@ -452,6 +495,11 @@ static struct callback_op callback_ops[] = {
>  		.process_op = (callback_process_op_t)nfs4_callback_recall,
>  		.decode_args = (callback_decode_arg_t)decode_recall_args,
>  		.res_maxsize = CB_OP_RECALL_RES_MAXSZ,
> +	},
> +	[OP_CB_LAYOUTRECALL] = {
> +		.process_op = (callback_process_op_t)nfs4_callback_pnfs_layoutrecall,
> +		.decode_args = (callback_decode_arg_t)decode_pnfs_layoutrecall_args,
> +		.res_maxsize = CB_OP_LAYOUTRECALL_RES_MAXSZ,
>  	}
>  };
>  
> diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
> index 9540a31..4e6c8bb 100644
> --- a/fs/nfs/delegation.c
> +++ b/fs/nfs/delegation.c
> @@ -15,9 +15,13 @@
>  #include <linux/nfs4.h>
>  #include <linux/nfs_fs.h>
>  #include <linux/nfs_xdr.h>
> +#include <linux/writeback.h>
>  
>  #include "nfs4_fs.h"
>  #include "delegation.h"
> +#include "pnfs.h"
> +
> +#define NFSDBG_FACILITY NFSDBG_CALLBACK
>  
>  static struct nfs_delegation *nfs_alloc_delegation(void)
>  {
> @@ -306,6 +310,15 @@ struct recall_threadargs {
>  	int result;
>  };
>  
> +struct recall_layout_threadargs {
> +	struct inode *inode;
> +	struct nfs4_client *clp;
> +	const nfs4_stateid *stateid;
> +        struct nfs_fsid fsid;
> +	struct completion started;
> +	int result;
> +};
> +
>  static int recall_thread(void *data)
>  {
>  	struct recall_threadargs *args = (struct recall_threadargs *)data;
> @@ -345,6 +358,80 @@ static int recall_thread(void *data)
>  	module_put_and_exit(0);
>  }
>  
> +static int recall_layout_thread(void *data)
> +{
> +	struct inode *inode;
> +	struct nfs4_client *clp;
> +	struct nfs_server *server= NULL;
> +	struct super_block *sb = NULL;
> +	struct recall_layout_threadargs *args = (struct recall_layout_threadargs *)data;
> +	int found = 0;
> +	
> +	daemonize("nfsv4-layoutreturn");
> +
> +	dprintk("%s: fsid 0x%llx-0x%llx start\n",
> +		__FUNCTION__, args->fsid.major, args->fsid.minor);
> +
> +	clp = args->clp;
> +	args->result = 0;
> +	complete(&args->started);
> +
> +//??? commit the files first ???
> +
> +	if (args->inode != NULL) {
> +		pnfs_return_layout(args->inode);
> +		goto out;
> +	}
> +
> +	down_read(&clp->cl_sem);
> +	list_for_each_entry(server, &clp->cl_superblocks, nfs4_siblings) {
> +		dprintk("%s: fsid 0x%llx-0x%llx 0x%llx-0x%llx\n",
> +			__FUNCTION__, args->fsid.major, args->fsid.minor,
> +			server->fsid.major, server->fsid.minor);
> +			
> +		if (server->fsid.major == args->fsid.major &&
> +			server->fsid.minor == args->fsid.minor) {
> +			found = 1;
> +			break;
> +		}
> +	}
> +	
> +	up_read(&clp->cl_sem);
> +	
> +        if (found) {
> +        	sb = server->sb;
> +		if (!sb)
> +			goto out;
> +	}
> +	else
> +		goto out;
> +	
> +	/* XXX UGLY UGLY hack alert! */
> +	do {
> +		found = 0;
> +		spin_lock(&inode_lock);
> +		list_for_each_entry(inode, &sb->s_inodes, i_sb_list) {
> +			if (NFS_I(inode)->current_layout) {
> +				found = 1;
> +				break;
> +			}
> +		}
> +		spin_unlock(&inode_lock);
> +
> +		if (found) {
> +			igrab(inode);
> +			pnfs_return_layout(inode);
> +			iput(inode);
> +		}
> +		
> +	} while(found);
> +
> +out:
> +	module_put_and_exit(0);
> +	printk("%s: exit status %d\n", __FUNCTION__, args->result);
> +	return 0;
> +}
> +
>  /*
>   * Asynchronous delegation recall!
>   */
> @@ -369,6 +456,30 @@ out_module_put:
>  }
>  
>  /*
> + * Asynchronous layout recall!
> + */
> +int nfs_async_return_layout(struct nfs4_client *clp, struct inode *inode, struct nfs_fsid *fsid)
> +{
> +	struct recall_layout_threadargs data = {
> +		.clp = clp,
> +		.inode = inode,
> +		.fsid = *fsid,
> +	};
> +	int status;
> +
> +	init_completion(&data.started);
> +	__module_get(THIS_MODULE);
> +	status = kernel_thread(recall_layout_thread, &data, CLONE_KERNEL);
> +	if (status < 0)
> +		goto out_module_put;
> +	wait_for_completion(&data.started);
> +	return data.result;
> +out_module_put:
> +	module_put(THIS_MODULE);
> +	return status;
> +}
> +
> +/*
>   * Retrieve the inode associated with a delegation
>   */
>  struct inode *nfs_delegation_find_inode(struct nfs4_client *clp, const struct nfs_fh *fhandle)
> @@ -387,6 +498,31 @@ struct inode *nfs_delegation_find_inode(struct nfs4_client *clp, const struct nf
>  }
>  
>  /*
> + * Retrieve the inode associated with a layout
> + */
> +struct inode *nfs_layout_find_inode(struct nfs4_client *clp, const struct nfs_fh *fhandle)
> +{
> +	struct nfs4_state_owner *sp;
> +	struct nfs4_state *state;
> +	struct inode *res = NULL;
> +
> +	/* Reset all sequence ids to zero */
> +	list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
> +		spin_lock(&sp->so_lock);
> +		list_for_each_entry(state, &sp->so_states, open_states) {
> +			if (nfs_compare_fh(fhandle, &NFS_I(state->inode)->fh) == 0) {
> +				res = igrab(state->inode);
> +				break;
> +			}
> +		}
> +		spin_unlock(&sp->so_lock);
> +		if (res)
> +			break;
> +	}
> +	return res;
> +}
> +
> +/*
>   * Mark all delegations as needing to be reclaimed
>   */
>  void nfs_delegation_mark_reclaim(struct nfs4_client *clp)
> diff --git a/fs/nfs/delegation.h b/fs/nfs/delegation.h
> index 3858694..a594131 100644
> --- a/fs/nfs/delegation.h
> +++ b/fs/nfs/delegation.h
> @@ -28,8 +28,10 @@ int nfs_inode_set_delegation(struct inode *inode, struct rpc_cred *cred, struct
>  void nfs_inode_reclaim_delegation(struct inode *inode, struct rpc_cred *cred, struct nfs_openres *res);
>  int __nfs_inode_return_delegation(struct inode *inode);
>  int nfs_async_inode_return_delegation(struct inode *inode, const nfs4_stateid *stateid);
> +int nfs_async_return_layout(struct nfs4_client *clp, struct inode *inode, struct nfs_fsid *fsid);
>  
>  struct inode *nfs_delegation_find_inode(struct nfs4_client *clp, const struct nfs_fh *fhandle);
> +struct inode *nfs_layout_find_inode(struct nfs4_client *clp, const struct nfs_fh *fhandle);
>  void nfs_return_all_delegations(struct super_block *sb);
>  void nfs_expire_all_delegations(struct nfs4_client *clp);
>  void nfs_handle_cb_pathdown(struct nfs4_client *clp);
> diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
> index 76ca1cb..e4ddfe4 100644
> --- a/fs/nfs/direct.c
> +++ b/fs/nfs/direct.c
> @@ -55,6 +55,7 @@
>  #include <asm/atomic.h>
>  
>  #include "iostat.h"
> +#include "pnfs.h"
>  
>  #define NFSDBG_FACILITY		NFSDBG_VFS
>  
> @@ -74,6 +75,8 @@ struct nfs_direct_req {
>  	/* completion state */
>  	atomic_t		io_count;	/* i/os we're waiting for */
>  	spinlock_t		lock;		/* protect completion state */
> +	size_t			user_count;	/* total bytes to move */
> +	loff_t			pos;		/* starting offset in file */
>  	ssize_t			count,		/* bytes actually processed */
>  				error;		/* any reported error */
>  	struct completion	completion;	/* wait for i/o completion */
> @@ -242,7 +245,12 @@ static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
>  
>  static const struct rpc_call_ops nfs_read_direct_ops = {
>  	.rpc_call_done = nfs_direct_read_result,
> -	.rpc_release = nfs_readdata_release,
> +#ifdef CONFIG_NFS_V4
> +	.rpc_release = nfs4_readdata_release,
> +#else
> +        .rpc_release = nfs_readdata_release,
> +#endif
> +
>  };
>  
>  /*
> @@ -256,13 +264,19 @@ static ssize_t nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned lo
>  {
>  	struct nfs_open_context *ctx = dreq->ctx;
>  	struct inode *inode = ctx->dentry->d_inode;
> +#if defined(CONFIG_NFS_V4)
> +	size_t rsize = NFS_SERVER(inode)->ds_rsize;
> +#else
>  	size_t rsize = NFS_SERVER(inode)->rsize;
> +#endif
>  	unsigned int pgbase;
>  	int result;
>  	ssize_t started = 0;
>  
>  	get_dreq(dreq);
>  
> +	dreq->user_count = count;
> +	dreq->pos = pos;
>  	do {
>  		struct nfs_read_data *data;
>  		size_t bytes;
> @@ -271,7 +285,16 @@ static ssize_t nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned lo
>  		bytes = min(rsize,count);
>  
>  		result = -ENOMEM;
> +
> +#ifdef CONFIG_NFS_V4
> +               if (NFS_PROTO(inode)->setup_sequence)
> +                       data = nfs4_readdata_alloc(pgbase + bytes);
> +               else
> +                       data = nfs_readdata_alloc(pgbase + bytes);
> +#else
>  		data = nfs_readdata_alloc(pgbase + bytes);
> +#endif
> +
>  		if (unlikely(!data))
>  			break;
>  
> @@ -282,7 +305,15 @@ static ssize_t nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned lo
>  		if (unlikely(result < data->npages)) {
>  			if (result > 0)
>  				nfs_direct_release_pages(data->pagevec, result);
> -			nfs_readdata_release(data);
> +
> +#ifdef CONFIG_NFS_V4
> +                       if (NFS_PROTO(inode)->setup_sequence)
> +                               nfs4_readdata_release(data);
> +                       else
> +                               nfs_readdata_release(data);
> +#else
> +                        nfs_readdata_release(data);
> +#endif
>  			break;
>  		}
>  
> @@ -301,22 +332,29 @@ static ssize_t nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned lo
>  		data->res.eof = 0;
>  		data->res.count = bytes;
>  
> -		rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
> +		/* Only create an rpc request if utilizing NFSv4 I/O */
> +		if (!pnfs_use_read(inode, dreq->user_count)) {
> +			rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
>  				&nfs_read_direct_ops, data);
> -		NFS_PROTO(inode)->read_setup(data);
> +			NFS_PROTO(inode)->read_setup(data);
>  
> -		data->task.tk_cookie = (unsigned long) inode;
> +			data->task.tk_cookie = (unsigned long) inode;
>  
> -		lock_kernel();
> -		rpc_execute(&data->task);
> -		unlock_kernel();
> +			lock_kernel();
> +			rpc_execute(&data->task);
> +			unlock_kernel();
>  
> -		dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
> +			dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
>  				data->task.tk_pid,
>  				inode->i_sb->s_id,
>  				(long long)NFS_FILEID(inode),
>  				bytes,
>  				(unsigned long long)data->args.offset);
> +		} else {
> +			dprintk("%s Using pNFS direct read\n",__FUNCTION__);
> +			data->call_ops = &nfs_read_direct_ops;
> +			pnfs_readpages(data);
> +		}
>  
>  		started += bytes;
>  		user_addr += bytes;
> @@ -357,8 +395,21 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size
>  	nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
>  	rpc_clnt_sigmask(clnt, &oldset);
>  	result = nfs_direct_read_schedule(dreq, user_addr, count, pos);
> -	if (!result)
> +	if (result !=0)
> +		goto out;
> +	if (pnfs_use_nfsv4_rproto(inode, count))
>  		result = nfs_direct_wait(dreq);
> +	else {
> +		/* TODO: Do I need a new pNFS callback to wait
> +		* on outstanding requests?  How do I identify
> +		* to the layout driver that they are all part
> +		* of the same overall o_direct request.  For now
> +		* assume I/O is sync.
> +		*/
> +		result = dreq->count;
> +		kref_put(&dreq->kref, nfs_direct_req_release);
> +	}
> +out:
>  	rpc_clnt_sigunmask(clnt, &oldset);
>  
>  	return result;
> @@ -429,9 +480,19 @@ static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
>  	struct nfs_write_data *data = calldata;
>  	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
>  
> +	dprintk( "%s Begin\n", __FUNCTION__);
> +
>  	/* Call the NFS version-specific code */
>  	if (NFS_PROTO(data->inode)->commit_done(task, data) != 0)
>  		return;
> +
> +	/* TODO: Non-nfsv4 LD's don't handle re-execution well yet since
> +	* pnfs callback functions don't know the reexecution is
> +	* happening.
> +	*/
> +	if (!pnfs_use_nfsv4_wproto(data->inode, dreq->user_count))
> +		goto complete;
> +
>  	if (unlikely(task->tk_status < 0)) {
>  		dreq->error = task->tk_status;
>  		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
> @@ -441,8 +502,16 @@ static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
>  		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
>  	}
>  
> +complete:
>  	dprintk("NFS: %5u commit returned %d\n", task->tk_pid, task->tk_status);
> +#if defined(CONFIG_NFS_V4)
> +	/* Set flag indicating we need a layout commit */
> +	if (task->tk_status >= 0 && pnfs_use_write(data->inode, data->args.count)) {
> +		pnfs_need_layoutcommit(NFS_I(data->inode), data->args.context);
> +	}
> +#endif
>  	nfs_direct_write_complete(dreq, data->inode);
> +	dprintk( "%s End\n", __FUNCTION__);
>  }
>  
>  static const struct rpc_call_ops nfs_commit_direct_ops = {
> @@ -454,6 +523,8 @@ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
>  {
>  	struct nfs_write_data *data = dreq->commit_data;
>  
> +	dprintk( "%s Begin\n", __FUNCTION__);
> +
>  	data->inode = dreq->inode;
>  	data->cred = dreq->ctx->cred;
>  
> @@ -464,26 +535,34 @@ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
>  	data->res.fattr = &data->fattr;
>  	data->res.verf = &data->verf;
>  
> -	rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
> +	/* Do pNFS specific commit if needed */
> +	if (!pnfs_use_write(data->inode, dreq->user_count)) {
> +		rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
>  				&nfs_commit_direct_ops, data);
> -	NFS_PROTO(data->inode)->commit_setup(data, 0);
> +		NFS_PROTO(data->inode)->commit_setup(data, 0);
>  
> -	data->task.tk_priority = RPC_PRIORITY_NORMAL;
> -	data->task.tk_cookie = (unsigned long)data->inode;
> -	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
> -	dreq->commit_data = NULL;
> +		data->task.tk_priority = RPC_PRIORITY_NORMAL;
> +		data->task.tk_cookie = (unsigned long)data->inode;
> +		/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
> +		dreq->commit_data = NULL;
>  
> -	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
> +		dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
>  
> -	lock_kernel();
> -	rpc_execute(&data->task);
> -	unlock_kernel();
> +		lock_kernel();
> +		rpc_execute(&data->task);
> +		unlock_kernel();
> +	} else {
> +		data->call_ops = &nfs_commit_direct_ops;
> +		pnfs_commit(data->inode, NULL, RPC_TASK_ASYNC, data);
> +	}
>  }
>  
>  static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
>  {
>  	int flags = dreq->flags;
>  
> +	dprintk( "%s Begin (flags %d)\n", __FUNCTION__, flags);
> +
>  	dreq->flags = 0;
>  	switch (flags) {
>  		case NFS_ODIRECT_DO_COMMIT:
> @@ -493,9 +572,26 @@ static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode
>  			nfs_direct_write_reschedule(dreq);
>  			break;
>  		default:
> +                       dprintk( "%s complete commit\n", __FUNCTION__);
> +#if defined(CONFIG_NFS_V4)
> +			/* pNFS: Update last byte written field for
> +			* layout commit.  User user_count in pnfs_user_write
> +			* since it was used originally.  Use count to
> +			* update last byte since that is the amount written.
> +			*/
> +			if (dreq->count > 0 &&
> +			    pnfs_use_write(dreq->inode, dreq->user_count))
> +				pnfs_update_last_write(NFS_I(dreq->inode),
> +								dreq->pos,
> +								dreq->count);
> +#endif
>  			nfs_end_data_update(inode);
>  			if (dreq->commit_data != NULL)
> -				nfs_commit_free(dreq->commit_data);
> +#ifdef CONFIG_NFS_V4
> +                               nfs4_commit_free(dreq->commit_data);
> +#else
> +                                nfs_commit_free(dreq->commit_data);
> +#endif
>  			nfs_direct_free_writedata(dreq);
>  			nfs_direct_complete(dreq);
>  	}
> @@ -503,7 +599,11 @@ static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode
>  
>  static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
>  {
> -	dreq->commit_data = nfs_commit_alloc();
> +#ifdef CONFIG_NFS_V4
> +       dreq->commit_data = nfs4_commit_alloc();
> +#else
> +        dreq->commit_data = nfs_commit_alloc();
> +#endif
>  	if (dreq->commit_data != NULL)
>  		dreq->commit_data->req = (struct nfs_page *) dreq;
>  }
> @@ -537,8 +637,9 @@ static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
>  	else
>  		dreq->error = task->tk_status;
>  
> -	if (data->res.verf->committed != NFS_FILE_SYNC) {
> -		switch (dreq->flags) {
> +	if (pnfs_use_nfsv4_wproto(data->inode, dreq->user_count)) {
> +		if (data->res.verf->committed != NFS_FILE_SYNC) {
> +			switch (dreq->flags) {
>  			case 0:
>  				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
>  				dreq->flags = NFS_ODIRECT_DO_COMMIT;
> @@ -548,7 +649,11 @@ static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
>  					dprintk("NFS: %5u write verify failed\n", task->tk_pid);
>  					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
>  				}
> +			}
>  		}
> +	} else if (data->args.stable != NFS_FILE_SYNC) {
> +		/* Set commit flag if the write wasn't stable. */
> +		dreq->flags = NFS_ODIRECT_DO_COMMIT;
>  	}
>  
>  	spin_unlock(&dreq->lock);
> @@ -587,9 +692,11 @@ static ssize_t nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned l
>  	unsigned int pgbase;
>  	int result;
>  	ssize_t started = 0;
> -
> +	
>  	get_dreq(dreq);
>  
> +	dreq->user_count = count;
> +	dreq->pos = pos;
>  	do {
>  		struct nfs_write_data *data;
>  		size_t bytes;
> @@ -598,7 +705,12 @@ static ssize_t nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned l
>  		bytes = min(wsize,count);
>  
>  		result = -ENOMEM;
> -		data = nfs_writedata_alloc(pgbase + bytes);
> +#ifdef CONFIG_NFS_V4
> +                data = nfs4_writedata_alloc(pgbase + bytes);
> +#else
> +                data = nfs_writedata_alloc(pgbase + bytes);
> +#endif
> +
>  		if (unlikely(!data))
>  			break;
>  
> @@ -630,24 +742,39 @@ static ssize_t nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned l
>  		data->res.count = bytes;
>  		data->res.verf = &data->verf;
>  
> -		rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
> +		/* Only create an rpc request if utilizing NFSv4 I/O */
> +		if (!pnfs_use_write(inode, dreq->user_count)) {
> +			rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
>  				&nfs_write_direct_ops, data);
> -		NFS_PROTO(inode)->write_setup(data, sync);
> +			NFS_PROTO(inode)->write_setup(data, sync);
>  
> -		data->task.tk_priority = RPC_PRIORITY_NORMAL;
> -		data->task.tk_cookie = (unsigned long) inode;
> +			data->task.tk_priority = RPC_PRIORITY_NORMAL;
> +			data->task.tk_cookie = (unsigned long) inode;
>  
> -		lock_kernel();
> -		rpc_execute(&data->task);
> -		unlock_kernel();
> +			lock_kernel();
> +			rpc_execute(&data->task);
> +			unlock_kernel();
>  
> -		dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
> +			dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
>  				data->task.tk_pid,
>  				inode->i_sb->s_id,
>  				(long long)NFS_FILEID(inode),
>  				bytes,
>  				(unsigned long long)data->args.offset);
> -
> +              } else {
> +		/* Set stable arg. (from nfs4_proc_write_setup) */
> +			int stable;
> +			if (sync & FLUSH_STABLE) {
> +				if (!NFS_I(inode)->ncommit)
> +					stable = NFS_FILE_SYNC;
> +				else
> +					stable = NFS_DATA_SYNC;
> +			} else
> +				stable = NFS_UNSTABLE;
> +				data->args.stable = stable;
> +				data->call_ops = &nfs_write_direct_ops;
> +				pnfs_writepages(data, sync);
> +		}
>  		started += bytes;
>  		user_addr += bytes;
>  		pos += bytes;
> @@ -675,7 +802,11 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, siz
>  	struct inode *inode = iocb->ki_filp->f_mapping->host;
>  	struct rpc_clnt *clnt = NFS_CLIENT(inode);
>  	struct nfs_direct_req *dreq;
> +#if defined(CONFIG_NFS_V4)
> +	size_t wsize = NFS_SERVER(inode)->ds_wsize;
> +#else
>  	size_t wsize = NFS_SERVER(inode)->wsize;
> +#endif
>  	int sync = 0;
>  
>  	dreq = nfs_direct_req_alloc();
> @@ -697,8 +828,21 @@ static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, siz
>  
>  	rpc_clnt_sigmask(clnt, &oldset);
>  	result = nfs_direct_write_schedule(dreq, user_addr, count, pos, sync);
> -	if (!result)
> +	if (result != 0)
> +		goto out;
> +	if (pnfs_use_nfsv4_wproto(inode, count))
>  		result = nfs_direct_wait(dreq);
> +	else {
> +		/* TODO: Do I need a new pNFS callback to wait
> +		* on outstanding requests?  How do I identify
> +		* to the layout driver that they are all part
> +		* of the same overall o_direct request.  For now
> +		* assume I/O is sync.
> +		*/
> +		result = dreq->count;
> +		kref_put(&dreq->kref, nfs_direct_req_release);
> +	}
> +out:
>  	rpc_clnt_sigunmask(clnt, &oldset);
>  
>  	return result;
> diff --git a/fs/nfs/file.c b/fs/nfs/file.c
> index 48e8928..817eba5 100644
> --- a/fs/nfs/file.c
> +++ b/fs/nfs/file.c
> @@ -33,6 +33,7 @@
>  
>  #include "delegation.h"
>  #include "iostat.h"
> +#include "pnfs.h"
>  
>  #define NFSDBG_FACILITY		NFSDBG_FILE
>  
> @@ -44,22 +45,39 @@ static ssize_t nfs_file_sendfile(struct file *, loff_t *, size_t, read_actor_t,
>  static ssize_t nfs_file_read(struct kiocb *, char __user *, size_t, loff_t);
>  static ssize_t nfs_file_write(struct kiocb *, const char __user *, size_t, loff_t);
>  static int  nfs_file_flush(struct file *, fl_owner_t id);
> -static int  nfs_fsync(struct file *, struct dentry *dentry, int datasync);
> +int  nfs_fsync(struct file *, struct dentry *dentry, int datasync);
>  static int nfs_check_flags(int flags);
>  static int nfs_lock(struct file *filp, int cmd, struct file_lock *fl);
>  static int nfs_flock(struct file *filp, int cmd, struct file_lock *fl);
>  
>  const struct file_operations nfs_file_operations = {
>  	.llseek		= nfs_file_llseek,
> -	.read		= do_sync_read,
> -	.write		= do_sync_write,
> -	.aio_read		= nfs_file_read,
> -	.aio_write		= nfs_file_write,
> +	.read           = do_sync_read,
> +	.write          = do_sync_write,
> +	.fsync		= nfs_fsync,
> +	.aio_read	= nfs_file_read,
> +	.aio_write	= nfs_file_write,
> +	.mmap		= nfs_file_mmap,
> +	.open		= nfs_file_open,
> +	.flush		= nfs_file_flush,
> +	.release	= nfs_file_release,
> +	.lock		= nfs_lock,
> +	.flock		= nfs_flock,
> +	.sendfile	= nfs_file_sendfile,
> +	.check_flags	= nfs_check_flags,
> +};
> +
> +const struct file_operations pnfs_file_operations = {
> +	.llseek		= nfs_file_llseek,
> +	.read           = pnfs_file_read,
> +	.write          = pnfs_file_write,
> +	.fsync		= pnfs_fsync,
> +	.aio_read	= nfs_file_read,
> +	.aio_write	= nfs_file_write,
>  	.mmap		= nfs_file_mmap,
>  	.open		= nfs_file_open,
>  	.flush		= nfs_file_flush,
>  	.release	= nfs_file_release,
> -	.fsync		= nfs_fsync,
>  	.lock		= nfs_lock,
>  	.flock		= nfs_flock,
>  	.sendfile	= nfs_file_sendfile,
> @@ -257,7 +275,7 @@ nfs_file_mmap(struct file * file, struct vm_area_struct * vma)
>   * The return status from this call provides a reliable indication of
>   * whether any write errors occurred for this process.
>   */
> -static int
> +int
>  nfs_fsync(struct file *file, struct dentry *dentry, int datasync)
>  {
>  	struct nfs_open_context *ctx = (struct nfs_open_context *)file->private_data;
> diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
> index d349fb2..16b20d7 100644
> --- a/fs/nfs/inode.c
> +++ b/fs/nfs/inode.c
> @@ -37,6 +37,7 @@
>  #include <linux/vfs.h>
>  #include <linux/inet.h>
>  #include <linux/nfs_xdr.h>
> +#include <linux/nfs4_pnfs.h>
>  
>  #include <asm/system.h>
>  #include <asm/uaccess.h>
> @@ -46,6 +47,7 @@
>  #include "delegation.h"
>  #include "iostat.h"
>  #include "internal.h"
> +#include "pnfs.h"
>  
>  #define NFSDBG_FACILITY		NFSDBG_VFS
>  #define NFS_PARANOIA 1
> @@ -244,7 +246,7 @@ nfs_fhget(struct super_block *sb, struct nfs_fh *fh, struct nfs_fattr *fattr)
>  		 */
>  		inode->i_op = NFS_SB(sb)->rpc_ops->file_inode_ops;
>  		if (S_ISREG(inode->i_mode)) {
> -			inode->i_fop = &nfs_file_operations;
> +			inode->i_fop = NFS_SB(sb)->rpc_ops->file_ops;
>  			inode->i_data.a_ops = &nfs_file_aops;
>  			inode->i_data.backing_dev_info = &NFS_SB(sb)->backing_dev_info;
>  		} else if (S_ISDIR(inode->i_mode)) {
> @@ -903,13 +905,23 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
>  			&& !nfs_fsid_equal(&server->fsid, &fattr->fsid))
>  		server->fsid = fattr->fsid;
>  
> +#ifdef CONFIG_NFS_V4 /* XXX CONFIG_PNFS */
> +	/*
> +	 * file needs layout commit, server attributes may be stale
> +	 */
> +	if (nfsi->layoutcommit_ctx && nfsi->change_attr >= fattr->change_attr) {
> +		dprintk("NFS: %s: layoutcommit is needed for file %s/%ld\n",
> +		        __FUNCTION__, inode->i_sb->s_id, inode->i_ino);
> +		return 0;
> +	}
> +#endif /* CONFIG_NFS_V4 */
> +
>  	/*
>  	 * Update the read time so we don't revalidate too often.
>  	 */
>  	nfsi->read_cache_jiffies = fattr->time_start;
>  	nfsi->last_updated = jiffies;
>  
> -	/* Are we racing with known updates of the metadata on the server? */
>  	data_stable = nfs_verify_change_attribute(inode, fattr->time_start);
>  	if (data_stable)
>  		nfsi->cache_validity &= ~(NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE|NFS_INO_INVALID_ATIME);
> @@ -925,8 +937,16 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
>  		if (nfsi->npages == 0) {
>  			/* No, but did we race with nfs_end_data_update()? */
>  			if (data_stable) {
> +#ifdef CONFIG_NFS_V4 /* XXX CONFIG_PNFS */
> +				/* File could be have been updated by other pnfs clients */
> +				if (!nfsi->layoutcommit_ctx || new_isize > cur_isize) {
> +					inode->i_size = new_isize;
> +					invalid |= NFS_INO_INVALID_DATA;
> +				}
> +#else /* CONFIG_NFS_V4 */
>  				inode->i_size = new_isize;
>  				invalid |= NFS_INO_INVALID_DATA;
> +#endif /* CONFIG_NFS_V4 */
>  			}
>  			invalid |= NFS_INO_INVALID_ATTR;
>  		} else if (new_isize > cur_isize) {
> @@ -934,8 +954,13 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
>  			invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
>  		}
>  		nfsi->cache_change_attribute = jiffies;
> -		dprintk("NFS: isize change on server for file %s/%ld\n",
> -				inode->i_sb->s_id, inode->i_ino);
> +		dprintk("NFS: isize change on server for file %s/%ld "
> +		        "new=%lld cur=%lld npages=%d data_stable=%d "
> +		        "layoutcommit=%d fattr->change_attr %lld nfsi->change_attr %lld\n",
> +		        inode->i_sb->s_id, inode->i_ino,
> +		        new_isize, cur_isize, nfsi->npages, data_stable,
> +		        nfsi->layoutcommit_ctx != NULL,
> +		        fattr->change_attr, nfsi->change_attr);
>  	}
>  
>  	/* Check if the mtime agrees */
> @@ -1046,6 +1071,10 @@ void nfs4_clear_inode(struct inode *inode)
>  	nfs_inode_return_delegation(inode);
>  	/* First call standard NFS clear_inode() code */
>  	nfs_clear_inode(inode);
> +
> +	/* Return the layout and free it if this inode has a cached layout */
> +	pnfs_return_layout(inode);
> +	
>  	/* Now clear out any remaining state */
>  	while (!list_empty(&nfsi->open_states)) {
>  		struct nfs4_state *state;
> @@ -1077,8 +1106,11 @@ struct inode *nfs_alloc_inode(struct super_block *sb)
>  	nfsi->acl_access = ERR_PTR(-EAGAIN);
>  	nfsi->acl_default = ERR_PTR(-EAGAIN);
>  #endif
> -#ifdef CONFIG_NFS_V4
> +#ifdef CONFIG_NFS_V4 /* XXX CONFIG_PNFS */
>  	nfsi->nfs4_acl = NULL;
> +	nfsi->pnfs_layout_state = 0;
> +	nfsi->current_layout = NULL;
> +	nfsi->layoutcommit_ctx = NULL;
>  #endif /* CONFIG_NFS_V4 */
>  	return &nfsi->vfs_inode;
>  }
> @@ -1164,6 +1196,12 @@ static int __init init_nfs_fs(void)
>  	if (err)
>  		goto out0;
>  
> +#if defined(CONFIG_NFS_V4)
> +	err = pnfs_initialize();
> +	if (err)
> +                goto out00;
> +#endif
> +
>  #ifdef CONFIG_PROC_FS
>  	rpc_proc_register(&nfs_rpcstat);
>  #endif
> @@ -1174,6 +1212,10 @@ out:
>  #ifdef CONFIG_PROC_FS
>  	rpc_proc_unregister("nfs");
>  #endif
> +#if defined(CONFIG_NFS_V4) /* XXXX CONFIG_PNFS */
> +out00:
> +	pnfs_uninitialize();
> +#endif
>  	nfs_destroy_directcache();
>  out0:
>  	nfs_destroy_writepagecache();
> @@ -1189,6 +1231,9 @@ out4:
>  
>  static void __exit exit_nfs_fs(void)
>  {
> +#if defined(CONFIG_NFS_V4) /* XXX CONFIG_PNFS */
> +	pnfs_uninitialize();
> +#endif
>  	nfs_destroy_directcache();
>  	nfs_destroy_writepagecache();
>  	nfs_destroy_readpagecache();
> diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
> index e4f4e5d..d539d5a 100644
> --- a/fs/nfs/internal.h
> +++ b/fs/nfs/internal.h
> @@ -59,7 +59,7 @@ extern u32 *nfs4_decode_dirent(u32 *p, struct nfs_entry *entry, int plus);
>  
>  /* nfs4proc.c */
>  #ifdef CONFIG_NFS_V4
> -extern struct rpc_procinfo nfs4_procedures[];
> +extern struct rpc_procinfo *nfs4_procedures;
>  
>  extern int nfs4_proc_fs_locations(struct inode *dir, struct dentry *dentry,
>  				  struct nfs4_fs_locations *fs_locations,
> diff --git a/fs/nfs/nfs2xdr.c b/fs/nfs/nfs2xdr.c
> index 67391ee..03960f2 100644
> --- a/fs/nfs/nfs2xdr.c
> +++ b/fs/nfs/nfs2xdr.c
> @@ -31,6 +31,8 @@
>  /* Mapping from NFS error code to "errno" error code. */
>  #define errno_NFSERR_IO		EIO
>  
> +int nfs_stat_to_errno(int stat);
> +
>  /*
>   * Declare the space requirements for NFS arguments and replies as
>   * number of 32bit-words
> diff --git a/fs/nfs/nfs3proc.c b/fs/nfs/nfs3proc.c
> index 7143b1f..620d13d 100644
> --- a/fs/nfs/nfs3proc.c
> +++ b/fs/nfs/nfs3proc.c
> @@ -876,7 +876,13 @@ static void nfs3_proc_commit_setup(struct nfs_write_data *data, int how)
>  		.rpc_resp	= &data->res,
>  		.rpc_cred	= data->cred,
>  	};
> +	int flags;
>  
> +	/* Set up the initial task struct.  */
> +	flags = (how & FLUSH_SYNC) ? 0 : RPC_TASK_ASYNC;
> +
> +	rpc_init_task(&data->task, NFS_CLIENT(data->inode), flags,
> +        					data->call_ops, data);
>  	rpc_call_setup(&data->task, &msg, 0);
>  }
>  
> @@ -889,6 +895,7 @@ nfs3_proc_lock(struct file *filp, int cmd, struct file_lock *fl)
>  struct nfs_rpc_ops	nfs_v3_clientops = {
>  	.version	= 3,			/* protocol version */
>  	.dentry_ops	= &nfs_dentry_operations,
> +	.file_ops	= &nfs_file_operations,
>  	.dir_inode_ops	= &nfs3_dir_inode_operations,
>  	.file_inode_ops	= &nfs3_file_inode_operations,
>  	.getroot	= nfs3_proc_get_root,
> @@ -924,5 +931,12 @@ struct nfs_rpc_ops	nfs_v3_clientops = {
>  	.file_open	= nfs_open,
>  	.file_release	= nfs_release,
>  	.lock		= nfs3_proc_lock,
> +	.rsize		= nfs_rsize,
> +	.wsize		= nfs_wsize,
> +	.rpages		= nfs_rpages,
> +	.wpages		= nfs_wpages,
> +	.boundary	= nfs_boundary,
>  	.clear_acl_cache = nfs3_forget_cached_acls,
> +	.pagein_one	= nfs_pagein_one,
> +	.flush_one	= nfs_flush_one,
>  };
> diff --git a/fs/nfs/nfs41_sessions.h b/fs/nfs/nfs41_sessions.h
> new file mode 100644
> index 0000000..45a36d5
> --- /dev/null
> +++ b/fs/nfs/nfs41_sessions.h
> @@ -0,0 +1,64 @@
> +#ifndef __NFS4_1_SESSIONS_H__
> +#define __NFS4_1_SESSIONS_H__
> +
> +typedef unsigned char	 	sessionid_t[16];
> +typedef u32			streamchannel_attrs;
> +typedef u32			rdmachannel_attrs;
> +
> +struct nfs4_channel_attrs {
> +	unsigned long		max_rqst_sz;
> +	unsigned long		max_resp_sz;
> +	unsigned long		max_resp_sz_cached;
> +	unsigned long		max_ops;
> +	unsigned long		max_reqs;
> +	streamchannel_attrs	stream_attrs;
> +	rdmachannel_attrs	rdma_attrs;
> +};
> +
> +struct nfs4_channel {
> +	struct nfs4_channel_attrs 	chan_attrs;
> +	unsigned long 			nr_conns;
> +	struct list_head		rpc_clients;
> +};
> +
> +struct nfs4_session {
> +	/* Session related params */
> +	sessionid_t			sess_id;
> +	u32				seqid;	/* The seqid returned by exchange_id */
> +	u32				persist;
> +	u32				header_padding;
> +	u32				hash_alg;
> +	u32				ssv_len;
> +	u32				use_for_back_chan;
> +	u32				rdma_mode;
> +	
> +	/* Slotid management */
> +	unsigned long 			nr_slots_in_use;
> +	struct list_head 		slots_in_use;
> +	struct list_head 		unused_slots;
> +	struct rpc_wait_queue		slot_waitq;
> +	
> +	/* The fore and back channel */
> +	struct nfs4_channel		fore_channel;
> +	struct nfs4_channel		back_channel;
> +
> +	unsigned int			expired;
> +	struct nfs4_client *		client;
> +	struct list_head		session_hashtbl;
> +	spinlock_t 			session_lock;
> +	/* To prevent races between create_session and sequence */
> +	int 				mutating;
> +	struct semaphore		session_sem;
> +	atomic_t			ref_count;
> +};
> +
> +struct nfs4_slot {
> +	u32		 	slot_nr;
> +	u32		 	seq_nr;
> +	struct nfs4_session *	session;	
> +	struct list_head 	slot_list;	
> +};
> +
> +#endif
> +
> +
> diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
> index 9a10286..696e730 100644
> --- a/fs/nfs/nfs4_fs.h
> +++ b/fs/nfs/nfs4_fs.h
> @@ -9,7 +9,10 @@
>  #ifndef __LINUX_FS_NFS_NFS4_FS_H
>  #define __LINUX_FS_NFS_NFS4_FS_H
>  
> +#include "nfs41_sessions.h"
> +
>  #ifdef CONFIG_NFS_V4
> +#define NFSV4_MAX_MINORVERSION 1
>  
>  struct idmap;
>  
> @@ -52,6 +55,7 @@ struct nfs4_client {
>  	nfs4_verifier		cl_confirm;
>  	unsigned long		cl_state;
>  
> +	struct nfs4_session *   cl_session;
>  	u32			cl_lockowner_id;
>  
>  	/*
> @@ -65,12 +69,13 @@ struct nfs4_client {
>  	struct list_head	cl_unused;
>  	int			cl_nunused;
>  	spinlock_t		cl_lock;
> -	atomic_t		cl_count;
> +	atomic_t			cl_count;
>  
>  	struct rpc_clnt *	cl_rpcclient;
>  
>  	struct list_head	cl_superblocks;	/* List of nfs_server structs */
>  
> +	u32                     cl_minorversion;
>  	unsigned long		cl_lease_time;
>  	unsigned long		cl_last_renewal;
>  	struct work_struct	cl_renewd;
> @@ -214,12 +219,17 @@ extern int nfs4_proc_setclientid(struct nfs4_client *, u32, unsigned short, stru
>  extern int nfs4_proc_setclientid_confirm(struct nfs4_client *, struct rpc_cred *);
>  extern int nfs4_proc_async_renew(struct nfs4_client *, struct rpc_cred *);
>  extern int nfs4_proc_renew(struct nfs4_client *, struct rpc_cred *);
> +extern int nfs4_proc_async_sequence(struct nfs4_client *, struct rpc_cred *);
> +extern int nfs4_proc_sequence(struct nfs4_client *, struct rpc_cred *);
> +
>  extern int nfs4_do_close(struct inode *inode, struct nfs4_state *state);
>  extern struct dentry *nfs4_atomic_open(struct inode *, struct dentry *, struct nameidata *);
>  extern int nfs4_open_revalidate(struct inode *, struct dentry *, int, struct nameidata *);
>  extern int nfs4_server_capabilities(struct nfs_server *server, struct nfs_fh *fhandle);
>  extern int nfs4_proc_fs_locations(struct inode *dir, struct dentry *dentry,
>  		struct nfs4_fs_locations *fs_locations, struct page *page);
> +extern int nfs41_proc_setup_session(struct nfs4_client *clp);
> +extern int nfs4_proc_destroy_session(struct nfs4_client *);
>  
>  extern struct nfs4_state_recovery_ops nfs4_reboot_recovery_ops;
>  extern struct nfs4_state_recovery_ops nfs4_network_partition_recovery_ops;
> @@ -267,7 +277,10 @@ extern const nfs4_stateid zero_stateid;
>  
>  /* nfs4xdr.c */
>  extern uint32_t *nfs4_decode_dirent(uint32_t *p, struct nfs_entry *entry, int plus);
> -extern struct rpc_procinfo nfs4_procedures[];
> +extern struct rpc_procinfo *nfs4_procedures;
> +extern struct rpc_version *nfs4_minorversions[];
> +extern struct rpc_procinfo *nfs4_minorversion_procedures[];
> +
>  
>  struct nfs4_mount_data;
>  
> diff --git a/fs/nfs/nfs4filelayout.c b/fs/nfs/nfs4filelayout.c
> new file mode 100644
> index 0000000..3c48306
> --- /dev/null
> +++ b/fs/nfs/nfs4filelayout.c
> @@ -0,0 +1,685 @@
> +/*
> + *  linux/fs/nfs/nfs4filelayout.c
> + *
> + *  Module for the pnfs nfs4 file layout driver.
> + *  Defines all I/O and Policy interface operations, plus code
> + *  to register itself with the pNFS client.
> + *
> + *  Copyright (c) 2002 The Regents of the University of Michigan.
> + *  All rights reserved.
> + *
> + *  Dean Hildebrand <dhildebz at eecs.umich.edu>
> + *
> + *  Redistribution and use in source and binary forms, with or without
> + *  modification, are permitted provided that the following conditions
> + *  are met:
> + *
> + *  1. Redistributions of source code must retain the above copyright
> + *     notice, this list of conditions and the following disclaimer.
> + *  2. Redistributions in binary form must reproduce the above copyright
> + *     notice, this list of conditions and the following disclaimer in the
> + *     documentation and/or other materials provided with the distribution.
> + *  3. Neither the name of the University nor the names of its
> + *     contributors may be used to endorse or promote products derived
> + *     from this software without specific prior written permission.
> + *
> + *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
> + *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
> + *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
> + *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
> + *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> + *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
> + *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
> + *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +#include <linux/config.h>
> +#include <linux/module.h>
> +#include <linux/init.h>
> +
> +#include <linux/time.h>
> +#include <linux/kernel.h>
> +#include <linux/mm.h>
> +#include <linux/string.h>
> +#include <linux/stat.h>
> +#include <linux/errno.h>
> +#include <linux/unistd.h>
> +#include <linux/nfs_fs.h>
> +#include <linux/nfs_page.h>
> +#include <linux/nfs4_pnfs.h>
> +
> +#include "nfs4filelayout.h"
> +#include "nfs4_fs.h"
> +
> +#define NFSDBG_FACILITY         NFSDBG_FILELAYOUT
> +
> +MODULE_LICENSE("GPL");
> +MODULE_AUTHOR("Dean Hildebrand <dhildebz at eecs.umich.edu>");
> +MODULE_DESCRIPTION("The NFSv4 file layout driver");
> +
> +extern void nfs_execute_read(struct nfs_read_data *data);
> +extern void nfs_readdata_release(void *data);
> +extern int nfs_flush_task_priority(int how);
> +extern void nfs_writedata_release(void *data);
> +extern void nfs_execute_write(struct nfs_write_data *data);
> +extern void nfs_commit_rpcsetup(struct nfs_write_data *data, int sync);
> +extern struct nfs_write_data *nfs_commit_alloc(void);
> +extern void nfs_commit_free(struct nfs_write_data *p);
> +extern void nfs_initiate_write(struct nfs_write_data *, struct rpc_clnt *, const struct rpc_call_ops *, int);
> +extern void nfs_initiate_read(struct nfs_read_data *data, struct rpc_clnt *clnt, const struct rpc_call_ops *call_ops);
> +
> +/* Callback operations to the pNFS client */
> +struct pnfs_client_operations * pnfs_callback_ops;
> +
> +/* Initialize a mountpoint by retrieving the list of
> + * available devices for it.
> + * Return the pnfs_mount_type structure so the
> + * pNFS_client can refer to the mount point later on.
> + */
> +struct pnfs_mount_type*
> +filelayout_initialize_mountpoint(struct super_block* sb)
> +{
> +	struct filelayout_mount_type* fl_mt;
> +	struct pnfs_mount_type* mt;
> +	struct pnfs_devicelist *dlist;
> +	int status;
> +
> +	dlist = kmalloc(sizeof(struct pnfs_devicelist), GFP_KERNEL);
> +	if (!dlist)
> +		goto error_ret;
> +	fl_mt = kmalloc(sizeof(struct filelayout_mount_type), GFP_KERNEL);
> +	if (!fl_mt)
> +		goto cleanup_dlist;
> +	/* Initialize nfs4 file layout specific device list structure */
> +	fl_mt->hlist = kmalloc(sizeof(struct nfs4_pnfs_dev_hlist), GFP_KERNEL);
> +	if (!fl_mt->hlist)
> +		goto cleanup_fl_mt;
> +	mt = kmalloc(sizeof(struct pnfs_mount_type), GFP_KERNEL);
> +	if (!mt)
> +		goto cleanup_fl_mt;
> +
> +	fl_mt->fl_sb = sb;
> +	mt->mountid = (void*)fl_mt;
> +
> +	/* Retrieve device list from server*/
> +	status = pnfs_callback_ops->nfs_getdevicelist(sb, dlist);
> +	if (status)
> +		goto cleanup_mt;
> +	status = nfs4_pnfs_devlist_init(fl_mt->hlist);
> +	if (status)
> +		goto cleanup_mt;
> +
> +	/* Decode opaque devicelist and add to list of available
> +	* devices (data servers.
> +	*/
> +	status = decode_and_add_devicelist(fl_mt, dlist);
> +	if (status)
> +		goto cleanup_mt;
> +
> +	kfree(dlist);
> +	return mt;
> +
> +cleanup_mt: ;
> +	kfree(mt);
> +cleanup_fl_mt: ;
> +	if (fl_mt->hlist)
> +		kfree(fl_mt->hlist);
> +	kfree(fl_mt);
> +cleanup_dlist: ;
> +	kfree(dlist);
> +error_ret: ;
> +	return NULL;
> +}
> +
> +/* Uninitialize a mountpoint by destroying its device list.
> + */
> +int
> +filelayout_uninitialize_mountpoint(struct pnfs_mount_type* mountid)
> +{
> +struct filelayout_mount_type* fl_mt = NULL;
> +
> +	if (mountid)
> +		fl_mt = (struct filelayout_mount_type*)mountid->mountid;
> +
> +	nfs4_pnfs_devlist_destroy(fl_mt->hlist);
> +
> +	if (fl_mt != NULL)
> +		kfree(fl_mt);
> +	kfree(mountid);
> +	return 0;
> +}
> +
> +extern struct rpc_call_ops nfs_read_partial_ops;
> +
> +/* This function is used by the layout driver to caclulate the
> + * offset of the file on the dserver based on whether the 
> + * layout type is STRIPE_DENSE or STRIPE_SPARSE
> + */
> +loff_t
> +filelayout_get_dserver_offset(loff_t offset, struct nfs4_filelayout * layout)
> +{
> +	if (layout == NULL);
> +		return offset;
> +
> +	switch (layout->stripe_type) {
> +		case STRIPE_SPARSE:
> +			return offset;
> +
> +		case STRIPE_DENSE:
> +		{
> +			u32 stripe_size;
> +			u32 stripe_unit;
> +			loff_t off;
> +			loff_t tmp;
> +			u32 stripe_unit_idx;
> +
> +			stripe_size = layout->stripe_unit * layout->num_devs;
> +			/* XXX I do this because do_div seems to take a 32 bit dividend */
> +			stripe_unit = layout->stripe_unit; 
> +			tmp = off = offset;
> +
> +			do_div(off, stripe_size);
> +			stripe_unit_idx = do_div(tmp, stripe_unit);			
> +			
> +			return off * stripe_unit + stripe_unit_idx;
> +		}
> +	
> +		default:
> +			BUG();
> +	}
> +
> +	/* We should never get here... just to stop the gcc warning */
> +	return 0;
> +}
> +
> +/* Call ops for the async read/write cases
> + * In the case of dense layouts, the offset needs to be reset to its
> + * original value. 
> + */
> +static void filelayout_read_call_done(struct rpc_task *task, void *data)
> +{
> +	struct nfs_read_data *rdata = (struct nfs_read_data *)data;
> +
> +	if (rdata->orig_offset)
> +		rdata->args.offset = rdata->orig_offset;
> +
> +	/* Call the NFS call ops now */
> +	rdata->call_ops->rpc_call_done(task, data);
> +}
> +
> +static void filelayout_write_call_done(struct rpc_task *task, void *data)
> +{
> +	struct nfs_write_data *wdata = (struct nfs_write_data *)data;
> +
> +	if (wdata->orig_offset)
> +		wdata->args.offset = wdata->orig_offset;
> +
> +	/* Call the NFS call ops now */
> +	wdata->call_ops->rpc_call_done(task, data);
> +}
> +
> +struct rpc_call_ops filelayout_read_call_ops = {
> +	.rpc_call_done = filelayout_read_call_done,
> +};
> +
> +struct rpc_call_ops filelayout_write_call_ops = {
> +	.rpc_call_done = filelayout_write_call_done,
> +};
> +
> +/* Perform sync or async reads.
> + *
> + * An optimization for the NFS file layout driver
> + * allows the original read/write data structs to be passed in the
> + * last argument.
> + *
> +
> + * This is called after the pNFS client has already created, so I pass it
> + * in via the last argument (void*).  I think this is the only way as there
> + * are just too many NFS specific arguments in the read/write data structs
> + * to pass to the layout drivers.
> + *
> + * TODO:
> + * 1. This is a lot of arguments, create special non-nfs-specific structure?
> + */
> +ssize_t filelayout_read_pagelist(
> +	struct pnfs_layout_type * layoutid,
> +	struct inode * inode,
> +	struct page **pages,
> +	unsigned int pgbase,
> +	unsigned nr_pages,
> +	loff_t offset,
> +	size_t count,
> +	struct nfs_read_data* data)
> +{
> +	struct nfs4_filelayout* nfslay = NULL;
> +	struct nfs4_pnfs_dserver dserver;
> +	int status;
> +	struct nfs_server *server = NFS_SERVER(inode);
> +	struct nfs4_client *clp = server->nfs4_state;
> +
> +	if (layoutid) {
> +		nfslay = (struct nfs4_filelayout*)layoutid->layoutid;
> +		/* Retrieve the correct rpc_client for the byte range */
> +		status = nfs4_pnfs_dserver_get(inode,
> +						nfslay,
> +						offset,
> +						count,
> +						&dserver);
> +		if(status) {
> +			printk("%s: dserver get failed status %d use MDS\n",
> +							__FUNCTION__, status);
> +			data->pnfs_client = NFS_CLIENT(inode);
> +			data->session = clp->cl_session;
> +			data->args.fh = NFS_FH(inode);
> +			status = 0;
> +		}
> +		else {
> +			data->pnfs_client = dserver.dev_item->rpc_clnt;
> +			data->session = dserver.dev_item->session;
> +			data->args.fh = dserver.fh;
> +	
> +			/* Now get the file offset on the dserver
> +			 * Set the read offset to this offset, and
> +			 * save the original offset in orig_offset
> +			 */
> +			data->args.offset = filelayout_get_dserver_offset(offset, nfslay);
> +			data->orig_offset = offset;
> +		}
> +	}
> +	else { /* If no layout use MDS */
> +		dprintk("%s: no layout, use MDS\n", __FUNCTION__);
> +		data->pnfs_client = NFS_CLIENT(inode);
> +		data->session = clp->cl_session;
> +		data->args.fh = NFS_FH(inode);
> +	}
> +
> +	/* Perform a syncronous or asyncronous read */
> +        /* Now get the file offset on the dserver
> +         * Set the write offset to this offset, and
> +         * save the original offset in orig_offset
> +         */
> +        data->args.offset = filelayout_get_dserver_offset(offset, nfslay);
> +        data->orig_offset = offset;
> +
> +	if (data->pnfsflags & PNFS_ISSYNC) {
> +		/* sync */
> +		status = NFS_PROTO(inode)->read(data);
> +
> +		/* In the case of synchronous reads, we reset the offset here */
> +		data->args.offset = data->orig_offset;
> +	} else { 
> +		/* async */
> +		nfs_initiate_read(data, data->pnfs_client, &filelayout_read_call_ops);
> +
> +		/* In the case of aync reads, the offset will be reset in the 
> +		 * call_ops->rpc_call_done() routine 
> +		 */
> +                /* In the case of aync writes, the offset will be reset in the
> +                 * call_ops->rpc_call_done() routine
> +                 */
> +		status = 0;
> +	}
> +	return status;
> +}
> +
> +/* Perform sync or async writes.
> + *
> + * TODO: See filelayout_read_pagelist.
> + */
> +ssize_t filelayout_write_pagelist(
> +	struct pnfs_layout_type * layoutid,
> +	struct inode * inode,
> +	struct page **pages,
> +	unsigned int pgbase,
> +	unsigned nr_pages,
> +	loff_t offset,
> +	size_t count,
> +	int sync,
> +	struct nfs_write_data* data)
> +{
> +	struct nfs4_filelayout* nfslay = (struct nfs4_filelayout*)layoutid->layoutid;
> +	struct nfs4_pnfs_dserver dserver;
> +	struct nfs_page* req;
> +	struct list_head *h;
> +	int status;
> +
> +	/* Retrieve the correct rpc_client for the byte range */
> +	status = nfs4_pnfs_dserver_get(inode,
> +					nfslay,
> +					offset,
> +					count,
> +					&dserver);
> +	/* ANDROS: XXX should fail if no data server */
> +	if(!status) {
> +		data->pnfs_client = dserver.dev_item->rpc_clnt;
> +		data->session = dserver.dev_item->session;
> +		data->args.fh = dserver.fh;
> +	}
> +	dprintk("%s set wb_devid %d\n", __FUNCTION__, 
> +					dserver.dev_item[0].dev_id);
> +	list_for_each(h, &data->pages) {
> +		req = list_entry(h, struct nfs_page, wb_list);
> +		req->wb_devid = dserver.dev_item[0].dev_id;
> +	}
> +
> +        /* Now get the file offset on the dserver
> +         * Set the write offset to this offset, and
> +         * save the original offset in orig_offset
> +         */
> +        data->args.offset = filelayout_get_dserver_offset(offset, nfslay);
> +        data->orig_offset = offset;
> +
> +	/* Perform a syncronous or asyncronous read */
> +	if (data->pnfsflags & PNFS_ISSYNC) {
> +		/* sync */
> +		dprintk("NFS_FILELAYOUT: synchronous write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
> +			inode->i_sb->s_id,
> +			(long long)NFS_FILEID(inode),
> +			count,
> +			(unsigned long long)data->args.offset);
> +		status = NFS_PROTO(inode)->write(data);
> +
> +                /* In the case of synchronous writes, we reset the offset here */
> +                data->args.offset = data->orig_offset;
> +	} else {
> +		/* async */
> +		nfs_initiate_write(data, data->pnfs_client, &filelayout_write_call_ops, sync);
> +                /* In the case of aync writes, the offset will be reset in the
> +                 * call_ops->rpc_call_done() routine
> +                 */
> +		status = 0;
> +	}
> +	return status;
> +}
> +
> +/* Create a filelayout layout structure and return it.  The pNFS client
> + * will use the pnfs_layout_type type to refer to the layout for this
> + * inode from now on.
> + */
> +struct pnfs_layout_type*
> +filelayout_alloc_layout(struct pnfs_mount_type * mountid, struct inode * inode)
> +{
> +	struct pnfs_layout_type* pnfslay = NULL;
> +	struct nfs4_filelayout* nfslay = NULL;
> +
> +	dprintk("NFS_FILELAYOUT: allocating layout\n");
> +
> +	pnfslay = kzalloc(sizeof(struct pnfs_layout_type), GFP_KERNEL);
> +	if (!pnfslay)
> +		return NULL;
> +	nfslay = kzalloc(sizeof(struct nfs4_filelayout), GFP_KERNEL);
> +	if (!nfslay)
> +		return NULL;
> +
> +	pnfslay->layoutid = (void*)nfslay;
> +	pnfslay->mountid = mountid;
> +	return pnfslay;
> +}
> +
> +/* Free a filelayout layout structure
> + */
> +void
> +filelayout_free_layout(struct pnfs_layout_type * layoutid, struct inode * inode, loff_t offset, size_t count)
> +{
> +	struct nfs4_filelayout* nfslay = NULL;
> +
> +	dprintk("NFS_FILELAYOUT: freeing layout\n");
> +
> +	if (layoutid)
> +		nfslay = (struct nfs4_filelayout*)layoutid->layoutid;
> +	if (nfslay != NULL)
> +		kfree(nfslay);
> +	kfree(layoutid);
> +}
> +
> +/* Decode layout and store in layoutid.  Overwrite any existing layout
> + * information for this file.
> + */
> +struct pnfs_layout_type*
> +filelayout_set_layout(struct pnfs_layout_type* layoutid, struct inode* inode, void* layout)
> +{
> +	struct nfs4_filelayout* fl = NULL;
> +	int i;
> +	uint32_t *p = (uint32_t*)layout;
> +
> +	dprintk("%s set_layout_map Begin\n", __FUNCTION__);
> +
> +	if (!layoutid)
> +		goto nfserr;
> +	fl = (struct nfs4_filelayout*)layoutid->layoutid;
> +	if (!fl)
> +		goto nfserr;
> +
> +	READ32(fl->stripe_type);
> +	READ32(fl->commit_through_mds);
> +	READ64(fl->stripe_unit);
> +	READ64(fl->file_size);
> +	READ32(fl->index_len);
> +	if (fl->index_len > 0) { //??? if>0 must build index list
> +		printk("filelayout_set_layout: XXX add loop for index list\n");
> +	}
> +	READ32(fl->num_devs);
> +
> +	dprintk("DEBUG: %s: type %d stripe_unit %lld file_size %lld devs %d\n",
> +				__func__, fl->stripe_type, fl->stripe_unit, 
> +				fl->file_size, fl->num_devs);
> +
> +	for (i = 0; i < fl->num_devs; i++) {
> +
> +		/* dev_id */
> +		READ32(fl->devs[i].dev_id);
> +		READ32(fl->devs[i].dev_index);
> +        
> +		/* fh */
> +		memset(&fl->devs[i].fh, 0, sizeof(struct nfs_fh));
> +		READ32(fl->devs[i].fh.size);
> +		COPYMEM(fl->devs[i].fh.data, fl->devs[i].fh.size);
> +		dprintk("DEBUG: %s: dev %d len %d\n", __func__,
> +		fl->devs[i].dev_id,fl->devs[i].fh.size);
> +	}
> +
> +	return layoutid;
> +nfserr:
> +	return NULL;
> +}
> +
> +/* Call nfs fsync function to flush buffers and eventually call
> + * the filelayout_write_pagelist and filelayout_commit functions.
> + */
> +int
> +filelayout_fsync( struct pnfs_layout_type * layoutid,
> +		struct file *file,
> +		struct dentry *dentry,
> +		int datasync)
> +{
> +	return pnfs_callback_ops->nfs_fsync(file, dentry, datasync);
> +}
> +
> +/* TODO: Technically we would need to execute a COMMIT op to each
> + * data server on which a page in 'pages' exists.
> + * Once we fix this, we will need to invoke the pnfs_commit_complete callback.
> + */
> +int
> +filelayout_commit(struct pnfs_layout_type * layoutid, struct inode* ino, struct list_head *pages, int sync, struct nfs_write_data* data)
> +{
> +	struct nfs_write_data   *dsdata = NULL;
> +	struct pnfs_layout_type* laytype;
> +	struct nfs4_filelayout* nfslay;
> +	struct nfs4_pnfs_dserver dserver;
> +	struct nfs_page* first;
> +	struct nfs_page* req;
> +	struct list_head *pos, *tmp;
> +	u32 dev_id;
> +	int i;
> +
> +	laytype = NFS_I(ino)->current_layout;
> +	nfslay = (struct nfs4_filelayout*)layoutid->layoutid;
> +
> +	dprintk("%s data %p pnfs_client %p nfslay %p\n",
> +			__FUNCTION__, data, data->pnfs_client, nfslay);
> +
> +	if (nfslay->commit_through_mds) {
> +		dprintk("%s data %p commit through mds\n", __FUNCTION__, data);
> +  		nfs_execute_write(data);
> +		return 0;
> +	}
> +	for (i = 0; i < nfslay->num_devs; i++) {
> +		dev_id = nfslay->devs[i].dev_id;
> +		if (!dsdata) {
> +			unsigned int pgcnt = 0;
> +
> +			list_for_each_safe(pos, tmp, &data->pages) {
> +				req = nfs_list_entry(pos);
> +				if (req->wb_devid == dev_id)
> +					pgcnt++;
> +			}
> +			dsdata = nfs_commit_alloc();
> +		}
> +		if (!dsdata)
> +			goto out_bad;
> +		dserver.dev_item = nfs4_pnfs_device_get(ino, dev_id);
> +		if (dserver.dev_item == NULL) {
> +			return 1;
> +		}
> +		list_for_each_safe(pos, tmp, &data->pages) {
> +			req = nfs_list_entry(pos);
> +			if (req->wb_devid == dev_id) {
> +				nfs_list_remove_request(req);
> +				nfs_list_add_request(req, &dsdata->pages);
> +			} 
> +		}
> +		if (list_empty(&dsdata->pages)) {
> +			if (list_empty(&data->pages)) {
> +				dprintk("%s exit i %d devid %d\n",
> +						__FUNCTION__, i,dev_id);
> +				nfs_commit_free(dsdata);
> +				return 0;
> +			} else
> +				continue;
> +		}
> +		first = nfs_list_entry(dsdata->pages.next);
> +
> +		dprintk("%s call nfs_commit_rpcsetup i %d devid %d\n",
> +						__FUNCTION__, i, dev_id);
> +
> +		dsdata->pnfs_client = dserver.dev_item->rpc_clnt;
> +		dsdata->session =  dserver.dev_item->session;
> +
> +		nfs_commit_rpcsetup(dsdata, sync);
> +
> +		/* TODO: Is the FH different from NFS_FH(data->inode)?
> +		 * (set in nfs_commit_rpcsetup)
> +		 */
> +		dserver.fh = &nfslay->devs[i].fh;
> +		dsdata->args.fh = dserver.fh;
> +
> +		nfs_execute_write(dsdata);
> +		dsdata = NULL;
> +	}
> +
> +	/* Release original commit data since it is not used */
> +	nfs_commit_free(data);
> +	return 0;
> +
> +out_bad:
> +	nfs_commit_free(data);
> +	return -ENOMEM;
> +}
> +
> +/* Return the stripesize for the specified file.
> + */
> +ssize_t
> +filelayout_get_stripesize(struct pnfs_layout_type* layoutid, struct inode* inode)
> +{
> +	struct nfs4_filelayout* fl = (struct nfs4_filelayout*)layoutid->layoutid;
> +	ssize_t stripesize = fl->stripe_unit;
> +	return stripesize;
> +}
> +
> +/* Split wsize/rsize chunks so they do not span multiple data servers
> + */
> +int
> +filelayout_gather_across_stripes(struct pnfs_mount_type* mountid)
> +{
> +	return 0;
> +}
> +
> +/* Use the NFSv4 page cache
> +*/
> +int
> +filelayout_use_pagecache(struct pnfs_layout_type* layoutid, struct inode* inode)
> +{
> +	return 1;
> +}
> +
> +/* Issue a layoutget in the same compound as OPEN
> + */
> +int
> +filelayout_layoutget_on_open(struct pnfs_mount_type* mountid)
> +{
> +	return 1;
> +}
> +
> +ssize_t 
> +filelayout_get_io_threshold(struct pnfs_layout_type *layoutid, struct inode *inode)
> +{
> +	return -1;
> +}
> +
> +
> +struct layoutdriver_io_operations filelayout_io_operations =
> +{
> +	.fsync                   = filelayout_fsync,
> +	.commit                  = filelayout_commit,
> +	.read_pagelist           = filelayout_read_pagelist,
> +	.write_pagelist          = filelayout_write_pagelist,
> +	.set_layout              = filelayout_set_layout,
> +	.alloc_layout            = filelayout_alloc_layout,
> +	.free_layout             = filelayout_free_layout,
> +	.initialize_mountpoint   = filelayout_initialize_mountpoint,
> +	.uninitialize_mountpoint = filelayout_uninitialize_mountpoint,
> +};
> +
> +struct layoutdriver_policy_operations filelayout_policy_operations =
> +{
> +	.get_stripesize        = filelayout_get_stripesize,
> +	.gather_across_stripes = filelayout_gather_across_stripes,
> +	.use_pagecache         = filelayout_use_pagecache,
> +	.layoutget_on_open     = filelayout_layoutget_on_open,
> +	.get_read_threshold    = filelayout_get_io_threshold,
> +	.get_write_threshold   = filelayout_get_io_threshold,
> +};
> +
> +
> +struct pnfs_layoutdriver_type filelayout_type =
> +{
> +	.id = LAYOUT_NFSV4_FILES,
> +	.name = "LAYOUT_NFSV4_FILES",
> +	.ld_io_ops = &filelayout_io_operations,
> +	.ld_policy_ops = &filelayout_policy_operations,
> +};
> +
> +static int __init nfs4filelayout_init(void)
> +{
> +	printk("%s: NFSv4 File Layout Driver Registering...\n", __FUNCTION__);
> +
> +	/* Need to register file_operations struct with global list to indicate
> +	* that NFS4 file layout is a possible pNFS I/O module
> +	*/
> +	pnfs_callback_ops = pnfs_register_layoutdriver(&filelayout_type);
> +
> +	return 0;
> +}
> +
> +static void __exit nfs4filelayout_exit(void)
> +{
> +	printk("%s: NFSv4 File Layout Driver Unregistering...\n", __FUNCTION__);
> +
> +	/* Unregister NFS4 file layout driver with pNFS client*/
> +	pnfs_unregister_layoutdriver(&filelayout_type);
> +}
> +
> +module_init(nfs4filelayout_init);
> +module_exit(nfs4filelayout_exit);
> diff --git a/fs/nfs/nfs4filelayout.h b/fs/nfs/nfs4filelayout.h
> new file mode 100644
> index 0000000..ec82b81
> --- /dev/null
> +++ b/fs/nfs/nfs4filelayout.h
> @@ -0,0 +1,114 @@
> +/*
> + *  pnfs_nfs4filelayout.h
> + *
> + *  NFSv4 file layout driver data structures.
> + *
> + *  Copyright (c) 2002 The Regents of the University of Michigan.
> + *  All rights reserved.
> + *
> + *  Dean Hildebrand   <dhildebz at eecs.umich.edu>
> + */
> +
> +#ifndef FS_NFS_NFS4FILELAYOUT_H
> +#define FS_NFS_NFS4FILELAYOUT_H
> +
> +#include <linux/nfs4_pnfs.h>
> +#include "nfs41_sessions.h"
> +
> +#define NFS4_PNFS_DEV_HASH_BITS 5
> +#define NFS4_PNFS_DEV_HASH (1 << NFS4_PNFS_DEV_HASH_BITS)
> +
> +#define NFS4_PNFS_MAX_DEVS 16
> +
> +struct nfs4_session *nfs41_alloc_session(void);
> +int _nfs4_proc_create_session(struct nfs4_client *clp, struct nfs4_session *session,
> +                                struct rpc_clnt *clnt);
> +int _nfs4_proc_destroy_session(struct nfs4_session **session, struct rpc_clnt *clnt);
> +
> +enum stripetype4 {
> +        STRIPE_SPARSE = 1,
> +        STRIPE_DENSE = 2
> +};
> +
> +struct nfs4_pnfs_dev_item {
> +	struct hlist_node hash_node;
> +	u32 dev_id;
> +	u32 ip_addr;
> +	u32 port;
> +	atomic_t count;
> +	struct rpc_clnt *rpc_clnt;
> +	struct nfs4_session *session;
> +};
> +
> +struct nfs4_pnfs_dev_hlist {
> +	rwlock_t          dev_lock;
> +	struct hlist_head dev_list[NFS4_PNFS_DEV_HASH];
> +};
> +
> +struct nfs4_pnfs_devaddr {
> +	u32 dev_id;
> +	u32 ip;
> +	u16 port;
> +};
> +
> +struct nfs4_pnfs_devlist {
> +	struct list_head         devlist;
> +	struct nfs4_pnfs_devaddr devaddr;
> +};
> +
> +struct nfs4_pnfs_dserver {
> +	struct nfs_fh        *fh;
> +	struct nfs4_pnfs_dev_item *dev_item;
> +};
> +
> +struct nfs4_filelayout_devs {
> +	u32 dev_id;
> +	u32 dev_index;
> +	struct nfs_fh fh;
> +};
> +
> +struct nfs4_filelayout {
> +	int uncommitted_write;
> +	loff_t last_commit_size;
> +	u64 layout_id;
> +	u64 offset;
> +	u64 length;
> +	u32 iomode;
> +	u64 file_size;
> +	u32 stripe_type;
> +	u32 commit_through_mds;
> +	u64 stripe_unit;
> +	unsigned int index_len;
> +	unsigned int num_devs;
> +	struct nfs4_filelayout_devs devs[NFS4_PNFS_MAX_DEVS];
> +};
> +
> +struct filelayout_mount_type {
> +	struct super_block* fl_sb;
> +	struct nfs4_pnfs_dev_hlist *hlist;
> +};
> +
> +int  nfs4_pnfs_devlist_init(struct nfs4_pnfs_dev_hlist *hlist);
> +void nfs4_pnfs_devlist_destroy(struct nfs4_pnfs_dev_hlist *hlist);
> +
> +int nfs4_pnfs_dserver_get(struct inode *inode,
> +			  struct nfs4_filelayout *layout,
> +			  u64 offset,
> +			  u32 count,
> +			  struct nfs4_pnfs_dserver *dserver);
> +int decode_and_add_devicelist(struct filelayout_mount_type *mt, struct pnfs_devicelist* devlist);
> +
> +struct nfs4_pnfs_dev_item *
> +nfs4_pnfs_device_get(struct inode *inode, u32 dev_id);
> +
> +#define READ32(x)         (x) = ntohl(*p++)
> +#define READ64(x)         do {			\
> +	(x) = (u64)ntohl(*p++) << 32;		\
> +	(x) |= ntohl(*p++);			\
> +} while (0)
> +#define COPYMEM(x,nbytes) do {			\
> +	memcpy((x), p, nbytes);			\
> +	p += XDR_QUADLEN(nbytes);		\
> +} while (0)
> +
> +#endif /* FS_NFS_NFS4FILELAYOUT_H */
> diff --git a/fs/nfs/nfs4filelayoutdev.c b/fs/nfs/nfs4filelayoutdev.c
> new file mode 100644
> index 0000000..634d14f
> --- /dev/null
> +++ b/fs/nfs/nfs4filelayoutdev.c
> @@ -0,0 +1,455 @@
> +/*
> + *  linux/fs/nfs/nfs4filelayoutdev.c
> + *
> + *  Device operations for the pnfs nfs4 file layout driver.
> + *
> + *  Copyright (c) 2002 The Regents of the University of Michigan.
> + *  All rights reserved.
> + *
> + *  Dean Hildebrand <dhildebz at eecs.umich.edu>
> + *  Garth Goodson   <Garth.Goodson at netapp.com>
> + *
> + *  Redistribution and use in source and binary forms, with or without
> + *  modification, are permitted provided that the following conditions
> + *  are met:
> + *
> + *  1. Redistributions of source code must retain the above copyright
> + *     notice, this list of conditions and the following disclaimer.
> + *  2. Redistributions in binary form must reproduce the above copyright
> + *     notice, this list of conditions and the following disclaimer in the
> + *     documentation and/or other materials provided with the distribution.
> + *  3. Neither the name of the University nor the names of its
> + *     contributors may be used to endorse or promote products derived
> + *     from this software without specific prior written permission.
> + *
> + *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
> + *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
> + *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
> + *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
> + *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> + *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
> + *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
> + *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +#include <linux/config.h>
> +#include <linux/completion.h>
> +#include <linux/module.h>
> +#include <linux/sched.h>
> +#include <linux/spinlock.h>
> +#include <linux/hash.h>
> +
> +#include <linux/nfs4.h>
> +#include <linux/nfs_fs.h>
> +#include <linux/nfs_xdr.h>
> +
> +#include <asm/div64.h>
> +
> +#include "nfs4filelayout.h"
> +#include "nfs4_fs.h"
> +
> +#define NFSDBG_FACILITY		NFSDBG_FILELAYOUT
> +
> +extern struct pnfs_client_operations * pnfs_callback_ops;
> +
> +struct rpc_clnt*
> +create_nfs_rpcclient(struct rpc_xprt *xprt,
> +				char* server_name,
> +				u32 version,
> +				rpc_authflavor_t authflavor,
> +				int *err);
> +
> +/* Assumes lock is held */
> +static inline struct nfs4_pnfs_dev_item *
> +_device_lookup(struct nfs4_pnfs_dev_hlist *hlist, u32 dev_id)
> +{
> +	unsigned long      hash;
> +	struct hlist_node *np;
> +
> +	dprintk("_device_lookup: dev_id=%u\n", dev_id);
> +
> +	hash = hash_long(dev_id, NFS4_PNFS_DEV_HASH_BITS);
> +
> +	hlist_for_each(np, &hlist->dev_list[hash]) {
> +		struct nfs4_pnfs_dev_item *dev;
> +		dev = hlist_entry(np, struct nfs4_pnfs_dev_item, hash_node);
> +		if (dev->dev_id == dev_id) {
> +			return dev;
> +		}
> +	}
> +	return NULL;
> +}
> +
> +/* Assumes lock is held */
> +static inline void
> +_device_add(struct nfs4_pnfs_dev_hlist *hlist, struct nfs4_pnfs_dev_item *dev)
> +{
> +	unsigned long      hash;
> +
> +	dprintk("_device_add: dev_id=%u, ip=%x, port=%hu\n", dev->dev_id,
> +		ntohl(dev->ip_addr), ntohs(dev->port));
> +
> +	hash = hash_long(dev->dev_id, NFS4_PNFS_DEV_HASH_BITS);
> +	hlist_add_head(&dev->hash_node, &hlist->dev_list[hash]);
> +}
> +
> +/* Create an rpc to the data server defined in 'dev' */
> +static int
> +device_create(struct rpc_clnt *mds_rpc, struct nfs4_pnfs_dev_item *dev)
> +{
> +	struct rpc_clnt      *clnt;
> +	struct rpc_xprt      *xprt;
> +	struct sockaddr_in    sin;
> +	int err = 0;
> +
> +	sin.sin_family = AF_INET;
> +	sin.sin_addr.s_addr = dev->ip_addr;
> +	sin.sin_port = dev->port;
> +
> +	dprintk("device_create: dev_id=%u, ip=%x, port=%hu\n", dev->dev_id, ntohl(dev->ip_addr), ntohs(dev->port));
> +
> +	xprt = xprt_create_proto(IPPROTO_TCP, &sin,
> +				 &mds_rpc->cl_xprt->timeout);
> +	if (IS_ERR(xprt)) {
> +		err = PTR_ERR(xprt);
> +		goto out;
> +	}
> +
> +	clnt = create_nfs_rpcclient(xprt, "nfs4_pnfs_dserver", mds_rpc->cl_vers, mds_rpc->cl_auth->au_flavor, &err);
> +	if (clnt == NULL) {
> +		printk("%s: Can't create nfs rpc client!\n", __FUNCTION__);
> +		goto out;
> +	}
> +
> +	dev->rpc_clnt = clnt;
> +
> + out:
> +	return err;
> +}
> +
> +static void
> +device_destroy(struct nfs4_pnfs_dev_item *dev)
> +{
> +	int status;
> +	
> +	if (!dev)
> +		return;
> +	
> +	if ((status = _nfs4_proc_destroy_session(&dev->session, dev->rpc_clnt)))
> +		printk(KERN_WARNING "destroy session on data server failed with status %d...\
> +				 blowing away device anyways!\n", status);
> +
> +	/*	BUG_ON(!atomic_sub_and_test(0, &dev->count)); */
> +	rpc_shutdown_client(dev->rpc_clnt);
> +	
> +	kfree(dev);
> +}
> +
> +int
> +nfs4_pnfs_devlist_init(struct nfs4_pnfs_dev_hlist *hlist)
> +{
> +	int i;
> +
> +	hlist->dev_lock = RW_LOCK_UNLOCKED;
> +
> +	for (i = 0; i < NFS4_PNFS_DEV_HASH; i++) {
> +		INIT_HLIST_HEAD(&hlist->dev_list[i]);
> +	}
> +
> +	return 0;
> +}
> +
> +/* De-alloc all devices for a mount point.  This is called in
> + * nfs4_kill_super.
> + */
> +void
> +nfs4_pnfs_devlist_destroy(struct nfs4_pnfs_dev_hlist *hlist)
> +{
> +	int i;
> +	
> +	if(hlist == NULL)
> +		return;
> +
> +	/* No lock held, as synchronization should occur at upper levels */
> +	for (i = 0; i < NFS4_PNFS_DEV_HASH; i++) {
> +		struct hlist_node *np, *next;
> +
> +		hlist_for_each_safe(np, next, &hlist->dev_list[i]) {
> +			struct nfs4_pnfs_dev_item *dev;
> +			dev = hlist_entry(np, struct nfs4_pnfs_dev_item, hash_node);
> +			hlist_del_rcu(&dev->hash_node);
> +			device_destroy(dev);
> +		}
> +	}
> +}
> +
> +/* Create the rpc client to the data server specific in
> + * 'dev', and add it to the list of available devices
> + * for this mount point.
> + */
> +static int
> +nfs4_pnfs_device_add(struct filelayout_mount_type *mt,
> +		     struct nfs4_pnfs_dev_item *dev)
> +{
> +	struct nfs4_pnfs_dev_item *tmp_dev;
> +	int err;
> +	struct nfs4_pnfs_dev_hlist *hlist = mt->hlist;
> +	struct nfs_server *server = NFS_SB(mt->fl_sb);
> +
> +	dprintk("nfs4_pnfs_device_add\n");
> +
> +	/* Create device */
> +	err = device_create(server->client, dev);
> +	if (err)
> +		return err;
> +
> +	dev->session = nfs41_alloc_session();
> +	if (!dev->session)
> +		return -ENOMEM;
> +
> +	err = _nfs4_proc_create_session(server->nfs4_state, 
> +				dev->session, dev->rpc_clnt);
> +
> +	if (err)
> +		return err;
> +			
> +	/* Write lock, do lookup again, and then add device */
> +	write_lock(&hlist->dev_lock);
> +	tmp_dev = _device_lookup(hlist, dev->dev_id);
> +	if (tmp_dev == NULL) {
> +		_device_add(hlist, dev);
> +	}
> +	write_unlock(&hlist->dev_lock);
> +
> +	/* Cleanup, if device was recently added */
> +	if (tmp_dev != NULL) {
> +		dprintk(" device found, not adding (after creation)\n");
> +		device_destroy(dev);
> +	}
> +
> +	return 0;
> +}
> +
> +/* Decode opaque device data and return the result
> + */
> +static struct nfs4_pnfs_dev_item*
> +decode_device(struct pnfs_device* dev)
> +{
> +	int len;
> +	int tmp[6];
> +	uint32_t *p = (uint32_t*)dev->dev_addr_buf;
> +	struct nfs4_pnfs_dev_item* file_dev;
> +	char r_addr[29]; /* max size of ip/port string */
> +
> +	if ((file_dev = kmalloc(sizeof(struct nfs4_pnfs_dev_item), GFP_KERNEL)) == NULL)
> +	{
> +		return NULL;
> +	}
> +
> +	/* Initialize dev */
> +	INIT_HLIST_NODE(&file_dev->hash_node);
> +	atomic_set(&file_dev->count, 0);
> +
> +	/* Device id */
> +	file_dev->dev_id = dev->dev_id;
> +
> +	/* Get the device type */
> +	READ32(dev->dev_type);
> +
> +	if (dev->dev_type != FILE_SIMPLE) {
> +		printk(KERN_NOTICE "Device type %d not supported!\n", dev->dev_type);
> +		return NULL;
> +	}
> +
> +	/* Get the device count */
> +	READ32(dev->dev_count);
> +	
> +	if (dev->dev_count > 1)
> +		printk(KERN_NOTICE "%s: Add loop for dev_count\n", __FUNCTION__);
> +
> +	/* Decode contents of device*/
> +
> +        /* device addr --  r_netid, r_addr */
> +
> +	/* check and skip r_netid */
> +	READ32(len);
> +	if (len != 3) /* "tcp" */
> +		return NULL;
> +	/* Read the bytes into a temporary buffer */
> +	/* TODO: should probably sanity check them */
> +	READ32(tmp[0]);
> +
> +	READ32(len);
> +	if (len > 29) {
> +		printk("%s: ERROR: Device ip/port string too long (%d)\n",__FUNCTION__, len);
> +		kfree(file_dev);
> +		return NULL;
> +	}
> +	memcpy(r_addr, p, len);
> +	r_addr[len] = '\0';
> +	sscanf(r_addr, "%d.%d.%d.%d.%d.%d", &tmp[0], &tmp[1],
> +	       &tmp[2], &tmp[3], &tmp[4], &tmp[5]);
> +	file_dev->ip_addr = htonl((tmp[0]<<24) | (tmp[1]<<16) |
> +				  (tmp[2]<<8) | (tmp[3]));
> +	file_dev->port = htons((tmp[4] << 8) | (tmp[5]));
> +	dprintk("%s: addr:port string = %s\n",__FUNCTION__, r_addr);
> +	
> +	return file_dev;
> +}
> +
> +/* Decode the opaque device specified in 'dev'
> + * and add it to the list of available devices for this
> + * mount point.
> + * Must at some point be followed up with device_destroy
> + */
> +static struct nfs4_pnfs_dev_item*
> +decode_and_add_device(struct filelayout_mount_type *mt, struct pnfs_device* dev)
> +{
> +	struct nfs4_pnfs_dev_item* file_dev;
> +
> +	file_dev = decode_device(dev);
> +
> +	if (!file_dev)
> +	{
> +		printk("%s Could not decode device\n", __FUNCTION__);
> +		return NULL;
> +	}
> +
> +	if (nfs4_pnfs_device_add(mt, file_dev))
> +		return NULL;
> +	return file_dev;
> +}
> +
> +/* Decode the opaque device list in 'devlist'
> + * and add it to the list of available devices for this
> + * mount point.
> + * Must at some point be followed up with device_destroy.
> + */
> +int
> +decode_and_add_devicelist(struct filelayout_mount_type *mt, struct pnfs_devicelist* devlist)
> +{
> +	int i, cnt;
> +
> +	for (i = 0,cnt=0; i < devlist->num_devs && cnt < NFS4_PNFS_DEV_MAXCOUNT; i++) {
> +		if (!decode_and_add_device(mt, &devlist->devs[cnt]))
> +			return 1;
> +		cnt++;
> +	}
> +	return 0;
> +}
> +
> +/* Retrieve the information for dev_id, add it to the list
> + * of available devices, and return it.
> + */
> +static struct nfs4_pnfs_dev_item *
> +get_device_info(struct filelayout_mount_type *mt, u32 dev_id)
> +{
> +	int rc;
> +	struct pnfs_device *pdev = NULL;
> +
> +	if ((pdev = kmalloc(sizeof(struct pnfs_device), GFP_KERNEL)) == NULL)
> +	{
> +		return NULL;
> +	}
> +
> +	pdev->dev_id = dev_id;
> +
> +	rc = pnfs_callback_ops->nfs_getdeviceinfo(mt->fl_sb, dev_id, pdev);
> +	if (rc) {
> +		return NULL;
> +        }
> +
> +	/* Found new device, need to decode it and then add it to the
> +	 * list of known devices for this mountpoint.
> +	 */
> +	return decode_and_add_device(mt, pdev);
> +}
> +
> +/* Lookup and return the device dev_id
> + */
> +struct nfs4_pnfs_dev_item *
> +nfs4_pnfs_device_get(struct inode *inode, u32 dev_id)
> +{
> +	struct nfs4_pnfs_dev_item *dev;
> +	struct nfs_server* server = NFS_SERVER(inode);
> +	struct filelayout_mount_type *mt = (struct filelayout_mount_type*)server->pnfs_mountid->mountid;
> +	struct nfs4_pnfs_dev_hlist *hlist = mt->hlist;
> +
> +	read_lock(&hlist->dev_lock);
> +	dev = _device_lookup(hlist, dev_id);
> +/*
> +	if (dev) {
> +		atomic_inc(&dev->count);
> +	}
> +*/
> +	read_unlock(&hlist->dev_lock);
> +	if (dev == NULL)
> +		dev = get_device_info(mt, dev_id);
> +
> +	return dev;
> +}
> +
> +/* Retrieve the rpc client for a specified byte range
> + * in 'inode' by filling in the contents of 'dserver'.
> + */
> +int
> +nfs4_pnfs_dserver_get(struct inode *inode,
> +		      struct nfs4_filelayout *layout,
> +		      u64 offset,
> +		      u32 count,
> +		      struct nfs4_pnfs_dserver *dserver)
> +{
> +	u32 dev_id;
> +	u64 tmp;
> +	u32 stripe_idx, dbg_stripe_idx;
> +
> +	if(!layout)
> +		return 1;
> +
> +	tmp = offset;
> +	/* Want ((offset / layout->stripe_unit) % layout->num_devs) */
> +	do_div(tmp, layout->stripe_unit);
> +	stripe_idx = do_div(tmp, layout->num_devs);
> +
> +	/* For debugging */
> +	tmp = offset + count - 1;
> +	do_div(tmp, layout->stripe_unit);
> +	dbg_stripe_idx = do_div(tmp, layout->num_devs);
> +
> +	dprintk("%s: offset=%Lu, count=%u, si=%u, dsi=%u, "
> +		   "num_devs=%u, stripe_unit=%Lu\n",
> +                   __FUNCTION__,
> +		   offset, count, stripe_idx, dbg_stripe_idx, layout->num_devs,
> +		   layout->stripe_unit);
> +
> +	BUG_ON(dbg_stripe_idx != stripe_idx);
> +
> +	dev_id = layout->devs[stripe_idx].dev_id;
> +
> +	dserver->dev_item = nfs4_pnfs_device_get(inode, dev_id);
> +	if (dserver->dev_item == NULL)
> +		return 1;
> +	dserver->fh = &layout->devs[stripe_idx].fh;
> +
> +	dprintk("%s: dev_id=%u, idx=%u, offset=%Lu, count=%u\n",
> +                    __FUNCTION__, dev_id, stripe_idx, offset, count);
> +
> +	return 0;
> +}
> +
> +/* Currently not used.
> + * I have disabled checking the device count until we can think of a good way
> + * to call nfs4_pnfs_device_put in a generic way from the pNFS client.
> + * The only way I think think of is to put the nfs4_pnfs_dev_item directly
> + * in the nfs4_write/read_data structure, which breaks the clear line between
> + * the pNFS client and layout drivers.  If I did do this, then I could call
> + * an ioctl on the NFSv4 file layout driver to decrement the device count.
> + */
> +static void
> +nfs4_pnfs_device_put(struct nfs4_pnfs_dev_hlist *hlist, struct nfs4_pnfs_dev_item *dev)
> +{
> +	dprintk("nfs4_pnfs_device_put: dev_id=%u\n", dev->dev_id);
> +	atomic_dec(&dev->count);
> +}
> diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
> index b14145b..63751d7 100644
> --- a/fs/nfs/nfs4proc.c
> +++ b/fs/nfs/nfs4proc.c
> @@ -48,10 +48,15 @@
>  #include <linux/smp_lock.h>
>  #include <linux/namei.h>
>  #include <linux/mount.h>
> +#include <linux/pnfs_xdr.h>
> +#include <linux/nfs4_pnfs.h>
> +#include <linux/module.h>
>  
>  #include "nfs4_fs.h"
>  #include "delegation.h"
>  #include "iostat.h"
> +#include "pnfs.h"
> +#include "callback.h"
>  
>  #define NFSDBG_FACILITY		NFSDBG_PROC
>  
> @@ -65,6 +70,9 @@ static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *)
>  static int _nfs4_proc_access(struct inode *inode, struct nfs_access_entry *entry);
>  static int nfs4_handle_exception(const struct nfs_server *server, int errorcode, struct nfs4_exception *exception);
>  static int nfs4_wait_clnt_recover(struct rpc_clnt *clnt, struct nfs4_client *clp);
> +extern int pnfs_pagein_one(struct list_head *head, struct inode *inode);
> +extern int pnfs_flush_one(struct inode *, struct list_head *, int, int);
> +void nfs4_put_session(struct nfs4_session **session);
>  
>  /* Prevent leaks of NFSv4 errors into userland */
>  int nfs4_map_errors(int err)
> @@ -116,7 +124,7 @@ const u32 nfs4_fsinfo_bitmap[2] = { FATTR4_WORD0_MAXFILESIZE
>  			| FATTR4_WORD0_MAXREAD
>  			| FATTR4_WORD0_MAXWRITE
>  			| FATTR4_WORD0_LEASE_TIME,
> -			0
> +			FATTR4_WORD1_FS_LAYOUT_TYPES
>  };
>  
>  const u32 nfs4_fs_locations_bitmap[2] = {
> @@ -202,6 +210,105 @@ static void renew_lease(const struct nfs_server *server, unsigned long timestamp
>  	spin_unlock(&clp->cl_lock);
>  }
>  
> +int nfs4_recover_expired_lease(struct nfs_server *server)
> +{
> +	struct nfs4_client *clp = server->nfs4_state;
> +
> +	if (test_and_clear_bit(NFS4CLNT_LEASE_EXPIRED, &clp->cl_state)) {
> +		clp->cl_session->expired = 1;
> +		nfs4_schedule_state_recovery(clp);
> +	}
> +	return nfs4_wait_clnt_recover(server->client, clp);
> +}
> +
> +static int nfs41_proc_sequence_done(struct nfs4_session *session, struct nfs41_sequence_res *res, int status)
> +{
> +       unsigned long timestamp;
> +	struct nfs4_client *clp;
> +
> +	if (!session || !(clp = session->client))
> +		return 0;
> +
> +       if (!status) {
> +               timestamp = jiffies;
> +
> +               spin_lock(&clp->cl_lock);
> +               if (time_before(clp->cl_last_renewal,timestamp))
> +                       clp->cl_last_renewal = timestamp;
> +               spin_unlock(&clp->cl_lock);
> +       }
> +
> +       return status;
> +}
> +
> +static int _nfs4_proc_setup_sequence(struct nfs4_session *session, struct nfs41_sequence_args *args, struct nfs41_sequence_res *res)
> +{
> +        u32 *ptr;
> +        ptr = (u32 *)session->sess_id;
> +        dprintk("%s: %u:%u:%u:%u\n", __FUNCTION__, ptr[0], ptr[1], ptr[2], ptr[3]);
> +	
> +       memcpy(args->sessionid, (unsigned char *)session->sess_id, NFS4_MAX_SESSIONID_LEN);
> +
> +       return 0;
> +}
> +
> +static int nfs41_proc_setup_sequence_call(struct nfs4_session *session, struct nfs41_sequence_args *args, struct nfs41_sequence_res *res)
> +{
> +	int status;
> +	struct nfs_server *server;
> +	struct rpc_cred *cred;
> +	struct nfs4_client *clp;
> +
> +	if (!session)
> +		BUG();
> +
> +	clp = session->client;
> +
> +	if (!clp)
> +		BUG();
> +	/* Check if the session setup is in progress */
> +	down(&session->session_sem);
> +	if (session->mutating)
> +		BUG();
> +	up(&session->session_sem);
> +
> +	status = _nfs4_proc_setup_sequence(session, args, res);
> +	if (status)
> +		goto out;
> +
> +	/* This could be a call from the layout driver */
> +	if (!clp)
> +		goto out;
> +
> +	if (list_empty(&clp->cl_superblocks))
> +		BUG();
> +
> +	status = -1;
> +	list_for_each_entry(server, &clp->cl_superblocks, nfs4_siblings) {
> +		cred = rpcauth_lookupcred(server->client->cl_auth, 0);
> +		if (IS_ERR(cred))
> +			continue;
> +
> +		status = 0;
> +		break;
> +	}
> +
> +	if (status)
> +		goto out;
> +
> +	status = -ENOMEM;
> +	if (!(res->sp = nfs4_get_state_owner(server, cred))) {
> +		dprintk("%s: nfs4_get_state_owner failed!\n", __FUNCTION__);
> +		goto out;
> +	}
> +	
> +	status = nfs4_recover_expired_lease(server);
> +
> +	nfs4_put_state_owner(res->sp);
> +out:
> +	return status;
> +}
> +
>  static void update_changeattr(struct inode *dir, struct nfs4_change_info *cinfo)
>  {
>  	struct nfs_inode *nfsi = NFS_I(dir);
> @@ -228,6 +335,7 @@ struct nfs4_opendata {
>  	unsigned long timestamp;
>  	int rpc_status;
>  	int cancelled;
> +	struct nfs_server *server;
>  };
>  
>  static struct nfs4_opendata *nfs4_opendata_alloc(struct dentry *dentry,
> @@ -242,13 +350,31 @@ static struct nfs4_opendata *nfs4_opendata_alloc(struct dentry *dentry,
>  	p = kzalloc(sizeof(*p), GFP_KERNEL);
>  	if (p == NULL)
>  		goto err;
> +
> +        if (server->rpc_ops->setup_sequence) {
> +                p->o_arg.minorversion_info = kzalloc(sizeof(struct nfs41_sequence_args), GFP_KERNEL);
> +                if (!p->o_arg.minorversion_info)
> +                        goto err_free;
> +                p->o_res.minorversion_info = kzalloc(sizeof(struct nfs41_sequence_res), GFP_KERNEL);
> +                if (!p->o_res.minorversion_info)
> +                        goto err_free1;
> + 
> +                p->c_arg.minorversion_info = kzalloc(sizeof(struct nfs41_sequence_args), GFP_KERNEL);
> +                if (!p->c_arg.minorversion_info)
> +                        goto err_free2;
> +                p->c_res.minorversion_info = kzalloc(sizeof(struct nfs41_sequence_res), GFP_KERNEL);
> +                if (!p->c_res.minorversion_info)
> +                       goto err_free3;
> +        }
> +
>  	p->o_arg.seqid = nfs_alloc_seqid(&sp->so_seqid);
>  	if (p->o_arg.seqid == NULL)
> -		goto err_free;
> +		goto err_free4;
>  	atomic_set(&p->count, 1);
>  	p->dentry = dget(dentry);
>  	p->dir = parent;
>  	p->owner = sp;
> +	p->server = server;
>  	atomic_inc(&sp->so_count);
>  	p->o_arg.fh = NFS_FH(dir);
>  	p->o_arg.open_flags = flags,
> @@ -275,6 +401,15 @@ static struct nfs4_opendata *nfs4_opendata_alloc(struct dentry *dentry,
>  	p->c_arg.stateid = &p->o_res.stateid;
>  	p->c_arg.seqid = p->o_arg.seqid;
>  	return p;
> +
> +err_free4:
> +       kfree(p->c_res.minorversion_info);
> +err_free3:
> +       kfree(p->c_arg.minorversion_info);
> +err_free2:
> +       kfree(p->o_res.minorversion_info);
> +err_free1:
> +       kfree(p->o_arg.minorversion_info);
>  err_free:
>  	kfree(p);
>  err:
> @@ -289,6 +424,14 @@ static void nfs4_opendata_free(struct nfs4_opendata *p)
>  		nfs4_put_state_owner(p->owner);
>  		dput(p->dir);
>  		dput(p->dentry);
> +
> +                if (p->server->rpc_ops->setup_sequence) {
> +                        kfree(p->o_arg.minorversion_info);
> +                        kfree(p->o_res.minorversion_info);
> +                        kfree(p->c_arg.minorversion_info);
> +                        kfree(p->c_res.minorversion_info);
> +                }
> +
>  		kfree(p);
>  	}
>  }
> @@ -575,6 +718,11 @@ static void nfs4_open_confirm_done(struct rpc_task *task, void *calldata)
>  {
>  	struct nfs4_opendata *data = calldata;
>  
> +        if(data->server->rpc_ops->sequence_done)
> +                data->server->rpc_ops->sequence_done(data->server->nfs4_state->cl_session,
> +			 data->c_res.minorversion_info, task->tk_status);
> +
> +
>  	data->rpc_status = task->tk_status;
>  	if (RPC_ASSASSINATED(task))
>  		return;
> @@ -627,9 +775,22 @@ static int _nfs4_proc_open_confirm(struct nfs4_opendata *data)
>  	 * want to ensure that it takes the 'error' code path.
>  	 */
>  	data->rpc_status = -ENOMEM;
> -	task = rpc_run_task(server->client, RPC_TASK_ASYNC, &nfs4_open_confirm_ops, data);
> -	if (IS_ERR(task))
> -		return PTR_ERR(task);
> +
> +        if (server->rpc_ops->setup_sequence) {
> +                if (server->rpc_ops->setup_sequence(server->nfs4_state->cl_session, 
> +			data->c_arg.minorversion_info, 
> +			data->c_res.minorversion_info))
> +                        return -ENOMEM;
> +        }
> +
> +	task = rpc_run_task(server->client, RPC_TASK_ASYNC, 
> +		&nfs4_open_confirm_ops, data);
> +
> +	if (IS_ERR(task)) {
> +                status = PTR_ERR(task);
> +                goto out;
> +        }
> +
>  	status = nfs4_wait_for_completion_rpc_task(task);
>  	if (status != 0) {
>  		data->cancelled = 1;
> @@ -637,6 +798,12 @@ static int _nfs4_proc_open_confirm(struct nfs4_opendata *data)
>  	} else
>  		status = data->rpc_status;
>  	rpc_release_task(task);
> +
> +out:
> +        if (server->rpc_ops->sequence_done)
> +                server->rpc_ops->sequence_done(server->nfs4_state->cl_session, 
> +			data->c_res.minorversion_info, status);
> +	
>  	return status;
>  }
>  
> @@ -666,6 +833,10 @@ static void nfs4_open_done(struct rpc_task *task, void *calldata)
>  {
>  	struct nfs4_opendata *data = calldata;
>  
> +        if(data->server->rpc_ops->sequence_done)
> +                data->server->rpc_ops->sequence_done(data->server->nfs4_state->cl_session, 
> +		data->o_res.minorversion_info, task->tk_status);
> +
>  	data->rpc_status = task->tk_status;
>  	if (RPC_ASSASSINATED(task))
>  		return;
> @@ -733,9 +904,19 @@ static int _nfs4_proc_open(struct nfs4_opendata *data)
>  	 * want to ensure that it takes the 'error' code path.
>  	 */
>  	data->rpc_status = -ENOMEM;
> +
> +        if (server->rpc_ops->setup_sequence && (server->rpc_ops->setup_sequence(
> +		server->nfs4_state->cl_session, data->o_arg.minorversion_info, 
> +		data->o_res.minorversion_info)))
> +                return -ENOMEM;
> +
> +
>  	task = rpc_run_task(server->client, RPC_TASK_ASYNC, &nfs4_open_ops, data);
> -	if (IS_ERR(task))
> -		return PTR_ERR(task);
> +	if (IS_ERR(task)) {
> +                status = PTR_ERR(task);
> +                goto out;
> +        }
> +
>  	status = nfs4_wait_for_completion_rpc_task(task);
>  	if (status != 0) {
>  		data->cancelled = 1;
> @@ -744,7 +925,7 @@ static int _nfs4_proc_open(struct nfs4_opendata *data)
>  		status = data->rpc_status;
>  	rpc_release_task(task);
>  	if (status != 0)
> -		return status;
> +		goto out;
>  
>  	if (o_arg->open_flags & O_CREAT) {
>  		update_changeattr(dir, &o_res->cinfo);
> @@ -754,12 +935,20 @@ static int _nfs4_proc_open(struct nfs4_opendata *data)
>  	if(o_res->rflags & NFS4_OPEN_RESULT_CONFIRM) {
>  		status = _nfs4_proc_open_confirm(data);
>  		if (status != 0)
> -			return status;
> +			goto out;
>  	}
>  	nfs_confirm_seqid(&data->owner->so_seqid, 0);
> -	if (!(o_res->f_attr->valid & NFS_ATTR_FATTR))
> -		return server->rpc_ops->getattr(server, &o_res->fh, o_res->f_attr);
> -	return 0;
> +        if (!(o_res->f_attr->valid & NFS_ATTR_FATTR)) {
> +                status = server->rpc_ops->getattr(server, &o_res->fh, o_res->f_attr);
> +        }
> +
> +out:
> +        if (server->rpc_ops->sequence_done)
> +                server->rpc_ops->sequence_done(server->nfs4_state->cl_session, 
> +				data->o_res.minorversion_info, status);
> +
> +        return status;
> +
>  }
>  
>  static int _nfs4_do_access(struct inode *inode, struct rpc_cred *cred, int openflags)
> @@ -790,15 +979,6 @@ out:
>  	return -EACCES;
>  }
>  
> -int nfs4_recover_expired_lease(struct nfs_server *server)
> -{
> -	struct nfs4_client *clp = server->nfs4_state;
> -
> -	if (test_and_clear_bit(NFS4CLNT_LEASE_EXPIRED, &clp->cl_state))
> -		nfs4_schedule_state_recovery(clp);
> -	return nfs4_wait_clnt_recover(server->client, clp);
> -}
> -
>  /*
>   * OPEN_EXPIRED:
>   * 	reclaim state on the server after a network partition.
> @@ -982,6 +1162,12 @@ static int _nfs4_do_open(struct inode *dir, struct dentry *dentry, int flags, st
>  		goto err_opendata_free;
>  	if (opendata->o_res.delegation_type != 0)
>  		nfs_inode_set_delegation(state->inode, cred, &opendata->o_res);
> +
> +	/* Initialize extents */
> + 	NFS_I(state->inode)->current_layout = NULL;
> +	NFS_I(state->inode)->pnfs_write_begin_pos = 0;
> +	NFS_I(state->inode)->pnfs_write_end_pos = 0;
> +
>  	nfs4_opendata_free(opendata);
>  	nfs4_put_state_owner(sp);
>  	up_read(&clp->cl_sem);
> @@ -1045,15 +1231,20 @@ static int _nfs4_do_setattr(struct inode *inode, struct nfs_fattr *fattr,
>                  struct iattr *sattr, struct nfs4_state *state)
>  {
>  	struct nfs_server *server = NFS_SERVER(inode);
> +        struct nfs41_sequence_args seqargs;
> +        struct nfs41_sequence_res seqres;
> +
>          struct nfs_setattrargs  arg = {
>                  .fh             = NFS_FH(inode),
>                  .iap            = sattr,
>  		.server		= server,
>  		.bitmask = server->attr_bitmask,
> +		.minorversion_info = &seqargs,
>          };
>          struct nfs_setattrres  res = {
>  		.fattr		= fattr,
>  		.server		= server,
> +		.minorversion_info = &seqres,
>          };
>          struct rpc_message msg = {
>                  .rpc_proc       = &nfs4_procedures[NFSPROC4_CLNT_SETATTR],
> @@ -1063,6 +1254,13 @@ static int _nfs4_do_setattr(struct inode *inode, struct nfs_fattr *fattr,
>  	unsigned long timestamp = jiffies;
>  	int status;
>  
> +	if (server->rpc_ops->setup_sequence) {
> +                status = server->rpc_ops->setup_sequence(server->nfs4_state->cl_session, 
> +				&seqargs, &seqres);
> +                if (status)
> +                        return status;
> +        }
> +
>  	nfs_fattr_init(fattr);
>  
>  	if (nfs4_copy_delegation_stateid(&arg.stateid, inode)) {
> @@ -1076,6 +1274,11 @@ static int _nfs4_do_setattr(struct inode *inode, struct nfs_fattr *fattr,
>  	status = rpc_call_sync(server->client, &msg, 0);
>  	if (status == 0 && state != NULL)
>  		renew_lease(server, timestamp);
> +
> +	if (server->rpc_ops->sequence_done)
> +               server->rpc_ops->sequence_done(server->nfs4_state->cl_session, 
> +			&seqres, status);
> +
>  	return status;
>  }
>  
> @@ -1110,6 +1313,13 @@ static void nfs4_free_closedata(void *data)
>  	nfs4_put_open_state(calldata->state);
>  	nfs_free_seqid(calldata->arg.seqid);
>  	nfs4_put_state_owner(sp);
> +
> +        if (calldata->arg.minorversion_info)
> +                kfree(calldata->arg.minorversion_info);
> +
> +        if (calldata->res.minorversion_info)
> +                kfree(calldata->res.minorversion_info);
> +