flexfilelayoutdev.c 18 KB
Newer Older
1
// SPDX-License-Identifier: GPL-2.0
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Device operations for the pnfs nfs4 flexfile layout driver.
 *
 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
 *
 * Tao Peng <bergwolf@primarydata.com>
 */

#include <linux/nfs_fs.h>
#include <linux/vmalloc.h>
#include <linux/module.h>
#include <linux/sunrpc/addr.h>

#include "../internal.h"
#include "../nfs4session.h"
#include "flexfilelayout.h"

#define NFSDBG_FACILITY		NFSDBG_PNFS_LD

21 22
static unsigned int dataserver_timeo = NFS_DEF_TCP_RETRANS;
static unsigned int dataserver_retrans;
23

24 25
/*
 * Forward declaration: the definition lives further down in this file, but
 * ff_layout_mark_devid_invalid() and nfs4_ff_layout_prepare_ds() call it
 * before that point.
 */
static bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg);

26 27
void nfs4_ff_layout_put_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
{
28
	if (!IS_ERR_OR_NULL(mirror_ds))
29 30 31 32 33 34 35
		nfs4_put_deviceid_node(&mirror_ds->id_node);
}

void nfs4_ff_layout_free_deviceid(struct nfs4_ff_layout_ds *mirror_ds)
{
	nfs4_print_deviceid(&mirror_ds->id_node.deviceid);
	nfs4_pnfs_ds_put(mirror_ds->ds);
36
	kfree(mirror_ds->ds_versions);
37
	kfree_rcu(mirror_ds, id_node.rcu);
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
}

/* Decode opaque device data and construct new_ds using it */
struct nfs4_ff_layout_ds *
nfs4_ff_alloc_deviceid_node(struct nfs_server *server, struct pnfs_device *pdev,
			    gfp_t gfp_flags)
{
	struct xdr_stream stream;
	struct xdr_buf buf;
	struct page *scratch;
	struct list_head dsaddrs;
	struct nfs4_pnfs_ds_addr *da;
	struct nfs4_ff_layout_ds *new_ds = NULL;
	struct nfs4_ff_ds_version *ds_versions = NULL;
	u32 mp_count;
	u32 version_count;
	__be32 *p;
	int i, ret = -ENOMEM;

	/* set up xdr stream */
	scratch = alloc_page(gfp_flags);
	if (!scratch)
		goto out_err;

	new_ds = kzalloc(sizeof(struct nfs4_ff_layout_ds), gfp_flags);
	if (!new_ds)
		goto out_scratch;

	nfs4_init_deviceid_node(&new_ds->id_node,
				server,
				&pdev->dev_id);
	INIT_LIST_HEAD(&dsaddrs);

	xdr_init_decode_pages(&stream, &buf, pdev->pages, pdev->pglen);
	xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);

	/* multipath count */
	p = xdr_inline_decode(&stream, 4);
	if (unlikely(!p))
		goto out_err_drain_dsaddrs;
	mp_count = be32_to_cpup(p);
	dprintk("%s: multipath ds count %d\n", __func__, mp_count);

	for (i = 0; i < mp_count; i++) {
		/* multipath ds */
		da = nfs4_decode_mp_ds_addr(server->nfs_client->cl_net,
					    &stream, gfp_flags);
		if (da)
			list_add_tail(&da->da_node, &dsaddrs);
	}
	if (list_empty(&dsaddrs)) {
		dprintk("%s: no suitable DS addresses found\n",
			__func__);
		ret = -ENOMEDIUM;
		goto out_err_drain_dsaddrs;
	}

	/* version count */
	p = xdr_inline_decode(&stream, 4);
	if (unlikely(!p))
		goto out_err_drain_dsaddrs;
	version_count = be32_to_cpup(p);
	dprintk("%s: version count %d\n", __func__, version_count);

102 103
	ds_versions = kcalloc(version_count,
			      sizeof(struct nfs4_ff_ds_version),
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
			      gfp_flags);
	if (!ds_versions)
		goto out_scratch;

	for (i = 0; i < version_count; i++) {
		/* 20 = version(4) + minor_version(4) + rsize(4) + wsize(4) +
		 * tightly_coupled(4) */
		p = xdr_inline_decode(&stream, 20);
		if (unlikely(!p))
			goto out_err_drain_dsaddrs;
		ds_versions[i].version = be32_to_cpup(p++);
		ds_versions[i].minor_version = be32_to_cpup(p++);
		ds_versions[i].rsize = nfs_block_size(be32_to_cpup(p++), NULL);
		ds_versions[i].wsize = nfs_block_size(be32_to_cpup(p++), NULL);
		ds_versions[i].tightly_coupled = be32_to_cpup(p);

		if (ds_versions[i].rsize > NFS_MAX_FILE_IO_SIZE)
			ds_versions[i].rsize = NFS_MAX_FILE_IO_SIZE;
		if (ds_versions[i].wsize > NFS_MAX_FILE_IO_SIZE)
			ds_versions[i].wsize = NFS_MAX_FILE_IO_SIZE;

125 126 127 128 129 130 131
		/*
		 * check for valid major/minor combination.
		 * currently we support dataserver which talk:
		 *   v3, v4.0, v4.1, v4.2
		 */
		if (!((ds_versions[i].version == 3 && ds_versions[i].minor_version == 0) ||
			(ds_versions[i].version == 4 && ds_versions[i].minor_version < 3))) {
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
			dprintk("%s: [%d] unsupported ds version %d-%d\n", __func__,
				i, ds_versions[i].version,
				ds_versions[i].minor_version);
			ret = -EPROTONOSUPPORT;
			goto out_err_drain_dsaddrs;
		}

		dprintk("%s: [%d] vers %u minor_ver %u rsize %u wsize %u coupled %d\n",
			__func__, i, ds_versions[i].version,
			ds_versions[i].minor_version,
			ds_versions[i].rsize,
			ds_versions[i].wsize,
			ds_versions[i].tightly_coupled);
	}

	new_ds->ds_versions = ds_versions;
	new_ds->ds_versions_cnt = version_count;

	new_ds->ds = nfs4_pnfs_ds_add(&dsaddrs, gfp_flags);
	if (!new_ds->ds)
		goto out_err_drain_dsaddrs;

	/* If DS was already in cache, free ds addrs */
	while (!list_empty(&dsaddrs)) {
		da = list_first_entry(&dsaddrs,
				      struct nfs4_pnfs_ds_addr,
				      da_node);
		list_del_init(&da->da_node);
		kfree(da->da_remotestr);
		kfree(da);
	}

	__free_page(scratch);
	return new_ds;

out_err_drain_dsaddrs:
	while (!list_empty(&dsaddrs)) {
		da = list_first_entry(&dsaddrs, struct nfs4_pnfs_ds_addr,
				      da_node);
		list_del_init(&da->da_node);
		kfree(da->da_remotestr);
		kfree(da);
	}

	kfree(ds_versions);
out_scratch:
	__free_page(scratch);
out_err:
	kfree(new_ds);

	dprintk("%s ERROR: returning %d\n", __func__, ret);
	return NULL;
}

186 187 188
static void ff_layout_mark_devid_invalid(struct pnfs_layout_segment *lseg,
		struct nfs4_deviceid_node *devid)
{
189
	nfs4_delete_deviceid(devid->ld, devid->nfs_client, &devid->deviceid);
190 191 192 193 194 195
	if (!ff_layout_has_available_ds(lseg))
		pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
				lseg);
}

/*
 * Check that @mirror has a usable device entry with a data server attached.
 *
 * When the mirror has no device entry yet and @create is true, the device
 * ID is looked up and installed; a failed lookup installs ERR_PTR(-ENODEV).
 *
 * Returns true if mirror->mirror_ds is valid and has a non-NULL ds.  On
 * every failure except a missing ds the layout is marked for return here;
 * the missing-ds case is handed to ff_layout_mark_devid_invalid().
 */
static bool ff_layout_mirror_valid(struct pnfs_layout_segment *lseg,
				   struct nfs4_ff_layout_mirror *mirror,
				   bool create)
{
	struct pnfs_layout_hdr *lo;
	struct nfs4_deviceid_node *devnode;
	struct nfs4_ff_layout_ds *new_ds;

	if (!mirror || IS_ERR(mirror->mirror_ds))
		goto outerr;

	if (!mirror->mirror_ds) {
		if (!create)
			goto outerr;

		lo = lseg->pls_layout;
		new_ds = ERR_PTR(-ENODEV);

		devnode = nfs4_find_get_deviceid(NFS_SERVER(lo->plh_inode),
						 &mirror->devid,
						 lo->plh_lc_cred,
						 GFP_KERNEL);
		if (devnode)
			new_ds = FF_LAYOUT_MIRROR_DS(devnode);

		/*
		 * Somebody else may have installed an entry in the meantime;
		 * in that case give back the node we just looked up.
		 */
		if (cmpxchg(&mirror->mirror_ds, NULL, new_ds) &&
		    new_ds != ERR_PTR(-ENODEV))
			nfs4_put_deviceid_node(devnode);
	}

	if (IS_ERR(mirror->mirror_ds))
		goto outerr;

	if (mirror->mirror_ds->ds != NULL)
		return true;

	ff_layout_mark_devid_invalid(lseg, &mirror->mirror_ds->id_node);
	return false;

outerr:
	pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode, lseg);
	return false;
}

236 237 238 239 240
/*
 * Grow the range recorded in @err so that it also covers
 * [@offset, @offset + @length): the start becomes the smaller of the two
 * offsets and the end the larger of the two end offsets.
 */
static void extend_ds_error(struct nfs4_ff_layout_ds_err *err,
			    u64 offset, u64 length)
{
	u64 cur_end = pnfs_end_offset(err->offset, err->length);
	u64 new_end = pnfs_end_offset(offset, length);
	u64 start = min_t(u64, err->offset, offset);

	err->offset = start;
	err->length = max_t(u64, cur_end, new_end) - start;
}

247 248 249
static int
ff_ds_error_match(const struct nfs4_ff_layout_ds_err *e1,
		const struct nfs4_ff_layout_ds_err *e2)
250
{
251 252 253 254 255 256
	int ret;

	if (e1->opnum != e2->opnum)
		return e1->opnum < e2->opnum ? -1 : 1;
	if (e1->status != e2->status)
		return e1->status < e2->status ? -1 : 1;
257 258
	ret = memcmp(e1->stateid.data, e2->stateid.data,
			sizeof(e1->stateid.data));
259 260 261 262 263
	if (ret != 0)
		return ret;
	ret = memcmp(&e1->deviceid, &e2->deviceid, sizeof(e1->deviceid));
	if (ret != 0)
		return ret;
264
	if (pnfs_end_offset(e1->offset, e1->length) < e2->offset)
265
		return -1;
266
	if (e1->offset > pnfs_end_offset(e2->offset, e2->length))
267 268 269
		return 1;
	/* If ranges overlap or are contiguous, they are the same */
	return 0;
270 271
}

272
static void
273 274 275
ff_layout_add_ds_error_locked(struct nfs4_flexfile_layout *flo,
			      struct nfs4_ff_layout_ds_err *dserr)
{
276 277 278 279 280 281 282 283 284 285 286 287
	struct nfs4_ff_layout_ds_err *err, *tmp;
	struct list_head *head = &flo->error_list;
	int match;

	/* Do insertion sort w/ merges */
	list_for_each_entry_safe(err, tmp, &flo->error_list, list) {
		match = ff_ds_error_match(err, dserr);
		if (match < 0)
			continue;
		if (match > 0) {
			/* Add entry "dserr" _before_ entry "err" */
			head = &err->list;
288 289
			break;
		}
290 291
		/* Entries match, so merge "err" into "dserr" */
		extend_ds_error(dserr, err->offset, err->length);
292
		list_replace(&err->list, &dserr->list);
293
		kfree(err);
294
		return;
295 296
	}

297
	list_add_tail(&dserr->list, head);
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
}

/*
 * Record a data server error against @mirror in the layout's error list.
 *
 * @flo:       flexfile layout that owns the error list
 * @mirror:    mirror the failing operation was sent to
 * @offset:    start of the affected byte range
 * @length:    length of the affected byte range
 * @status:    error status; 0 means "no error" and nothing is recorded
 * @opnum:     operation that failed
 * @gfp_flags: allocation flags for the error record
 *
 * Returns 0 on success (or when @status is 0), -EINVAL if the mirror has no
 * valid device entry, or -ENOMEM if the record cannot be allocated.
 */
int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
			     struct nfs4_ff_layout_mirror *mirror, u64 offset,
			     u64 length, int status, enum nfs_opnum4 opnum,
			     gfp_t gfp_flags)
{
	struct nfs4_ff_layout_ds_err *dserr;

	if (status == 0)
		return 0;

	/*
	 * mirror_ds may hold ERR_PTR(-ENODEV) as well as NULL (see
	 * ff_layout_mirror_valid()); neither can supply a device ID below.
	 */
	if (IS_ERR_OR_NULL(mirror->mirror_ds))
		return -EINVAL;

	dserr = kmalloc(sizeof(*dserr), gfp_flags);
	if (!dserr)
		return -ENOMEM;

	INIT_LIST_HEAD(&dserr->list);
	dserr->offset = offset;
	dserr->length = length;
	dserr->status = status;
	dserr->opnum = opnum;
	nfs4_stateid_copy(&dserr->stateid, &mirror->stateid);
	memcpy(&dserr->deviceid, &mirror->mirror_ds->id_node.deviceid,
	       NFS4_DEVICEID4_SIZE);

	/* The error list is modified under the inode's i_lock */
	spin_lock(&flo->generic_hdr.plh_inode->i_lock);
	ff_layout_add_ds_error_locked(flo, dserr);
	spin_unlock(&flo->generic_hdr.plh_inode->i_lock);

	return 0;
}

333
static const struct cred *
334 335
ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode)
{
336
	const struct cred *cred, __rcu **pcred;
337

338 339 340 341
	if (iomode == IOMODE_READ)
		pcred = &mirror->ro_cred;
	else
		pcred = &mirror->rw_cred;
342 343 344 345 346 347 348

	rcu_read_lock();
	do {
		cred = rcu_dereference(*pcred);
		if (!cred)
			break;

349
		cred = get_cred_rcu(cred);
350 351 352 353 354
	} while(!cred);
	rcu_read_unlock();
	return cred;
}

355 356 357 358 359 360
/*
 * Return the file handle to use for mirror @mirror_idx of @lseg, or NULL
 * (after a rate-limited error message) if the mirror is not valid.
 */
struct nfs_fh *
nfs4_ff_layout_select_ds_fh(struct pnfs_layout_segment *lseg, u32 mirror_idx)
{
	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, mirror_idx);

	if (ff_layout_mirror_valid(lseg, mirror, false)) {
		/* FIXME: For now assume there is only 1 version available for the DS */
		return &mirror->fh_versions[0];
	}

	pr_err_ratelimited("NFS: %s: No data server for mirror offset index %d\n",
		__func__, mirror_idx);
	return NULL;
}

373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
/*
 * Copy the stateid of mirror @mirror_idx of @lseg into @stateid.
 *
 * Returns 1 if the stateid was copied, or 0 (after a rate-limited error
 * message) if the mirror is not valid; @stateid is left untouched then.
 */
int
nfs4_ff_layout_select_ds_stateid(struct pnfs_layout_segment *lseg,
				u32 mirror_idx,
				nfs4_stateid *stateid)
{
	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, mirror_idx);

	if (ff_layout_mirror_valid(lseg, mirror, false)) {
		nfs4_stateid_copy(stateid, &mirror->stateid);
		return 1;
	}

	pr_err_ratelimited("NFS: %s: No data server for mirror offset index %d\n",
		__func__, mirror_idx);
	return 0;
}

392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
/**
 * nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call
 * @lseg: the layout segment we're operating on
 * @ds_idx: index of the DS to use
 * @fail_return: return layout on connect failure?
 *
 * Try to prepare a DS connection to accept an RPC call. This involves
 * selecting a mirror to use and connecting the client to it if it's not
 * already connected.
 *
 * Since we only need a single functioning mirror to satisfy a read, we don't
 * want to return the layout if there is one. For writes though, any down
 * mirror should result in a LAYOUTRETURN. @fail_return is how we distinguish
 * between the two cases.
 *
 * Returns a pointer to a connected DS object on success or NULL on failure.
 */
409 410 411 412 413 414 415 416 417 418
struct nfs4_pnfs_ds *
nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
			  bool fail_return)
{
	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
	struct nfs4_pnfs_ds *ds = NULL;
	struct nfs4_deviceid_node *devid;
	struct inode *ino = lseg->pls_layout->plh_inode;
	struct nfs_server *s = NFS_SERVER(ino);
	unsigned int max_payload;
419
	int status;
420

421
	if (!ff_layout_mirror_valid(lseg, mirror, true)) {
422
		pr_err_ratelimited("NFS: %s: No data server for offset index %d\n",
423 424 425 426 427 428
			__func__, ds_idx);
		goto out;
	}

	devid = &mirror->mirror_ds->id_node;
	if (ff_layout_test_devid_unavailable(devid))
429
		goto out_fail;
430 431 432 433 434

	ds = mirror->mirror_ds->ds;
	/* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
	smp_rmb();
	if (ds->ds_clp)
435
		goto out;
436 437 438 439

	/* FIXME: For now we assume the server sent only one version of NFS
	 * to use for the DS.
	 */
440
	status = nfs4_pnfs_ds_connect(s, ds, devid, dataserver_timeo,
441 442
			     dataserver_retrans,
			     mirror->mirror_ds->ds_versions[0].version,
443
			     mirror->mirror_ds->ds_versions[0].minor_version);
444 445

	/* connect success, check rsize/wsize limit */
446
	if (!status) {
447 448 449 450 451 452 453
		max_payload =
			nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
				       NULL);
		if (mirror->mirror_ds->ds_versions[0].rsize > max_payload)
			mirror->mirror_ds->ds_versions[0].rsize = max_payload;
		if (mirror->mirror_ds->ds_versions[0].wsize > max_payload)
			mirror->mirror_ds->ds_versions[0].wsize = max_payload;
454
		goto out;
455
	}
456
out_fail:
457 458 459 460 461 462 463
	ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
				 mirror, lseg->pls_range.offset,
				 lseg->pls_range.length, NFS4ERR_NXIO,
				 OP_ILLEGAL, GFP_NOIO);
	if (fail_return || !ff_layout_has_available_ds(lseg))
		pnfs_error_mark_layout_for_return(ino, lseg);
	ds = NULL;
464 465 466 467
out:
	return ds;
}

468
const struct cred *
469
ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx,
470
		      const struct cred *mdscred)
471 472
{
	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
473
	const struct cred *cred;
474

475
	if (mirror && !mirror->mirror_ds->ds_versions[0].tightly_coupled) {
476 477
		cred = ff_layout_get_mirror_cred(mirror, lseg->pls_range.iomode);
		if (!cred)
478
			cred = get_cred(mdscred);
479
	} else {
480
		cred = get_cred(mdscred);
481
	}
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
	return cred;
}

/*
 * Return the RPC client to use for the DS at @ds_idx.
 *
 * For an NFSv3 DS this is the client's own cl_rpcclient; for an NFSv4 DS
 * the lookup is delegated to nfs4_find_or_create_ds_client().  Any other
 * version is a BUG().
 */
struct rpc_clnt *
nfs4_ff_find_or_create_ds_client(struct pnfs_layout_segment *lseg, u32 ds_idx,
				 struct nfs_client *ds_clp, struct inode *inode)
{
	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
	const struct nfs4_ff_ds_version *vers =
		&mirror->mirror_ds->ds_versions[0];

	/* For NFSv3 DS, flavor is set when creating DS connections */
	if (vers->version == 3)
		return ds_clp->cl_rpcclient;

	if (vers->version == 4)
		return nfs4_find_or_create_ds_client(ds_clp, inode);

	BUG();
}

506 507 508 509 510 511 512 513 514 515 516 517 518
void ff_layout_free_ds_ioerr(struct list_head *head)
{
	struct nfs4_ff_layout_ds_err *err;

	while (!list_empty(head)) {
		err = list_first_entry(head,
				struct nfs4_ff_layout_ds_err,
				list);
		list_del(&err->list);
		kfree(err);
	}
}

519
/* called with inode i_lock held */
520
int ff_layout_encode_ds_ioerr(struct xdr_stream *xdr, const struct list_head *head)
521
{
522
	struct nfs4_ff_layout_ds_err *err;
523 524
	__be32 *p;

525
	list_for_each_entry(err, head, list) {
526
		/* offset(8) + length(8) + stateid(NFS4_STATEID_SIZE)
527 528
		 * + array length + deviceid(NFS4_DEVICEID4_SIZE)
		 * + status(4) + opnum(4)
529 530
		 */
		p = xdr_reserve_space(xdr,
531
				28 + NFS4_STATEID_SIZE + NFS4_DEVICEID4_SIZE);
532 533 534 535 536 537
		if (unlikely(!p))
			return -ENOBUFS;
		p = xdr_encode_hyper(p, err->offset);
		p = xdr_encode_hyper(p, err->length);
		p = xdr_encode_opaque_fixed(p, &err->stateid,
					    NFS4_STATEID_SIZE);
538 539
		/* Encode 1 error */
		*p++ = cpu_to_be32(1);
540 541 542 543
		p = xdr_encode_opaque_fixed(p, &err->deviceid,
					    NFS4_DEVICEID4_SIZE);
		*p++ = cpu_to_be32(err->status);
		*p++ = cpu_to_be32(err->opnum);
544
		dprintk("%s: offset %llu length %llu status %d op %d\n",
545
			__func__, err->offset, err->length, err->status,
546
			err->opnum);
547 548 549 550 551
	}

	return 0;
}

552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
/*
 * Move up to @maxnum error records whose range intersects @range from the
 * layout's error list onto @head, under the inode's i_lock.
 *
 * Returns the number of records moved.
 */
static
unsigned int do_layout_fetch_ds_ioerr(struct pnfs_layout_hdr *lo,
				      const struct pnfs_layout_range *range,
				      struct list_head *head,
				      unsigned int maxnum)
{
	struct nfs4_flexfile_layout *flo = FF_LAYOUT_FROM_HDR(lo);
	struct inode *inode = lo->plh_inode;
	struct nfs4_ff_layout_ds_err *cur, *next;
	unsigned int moved = 0;

	spin_lock(&inode->i_lock);
	list_for_each_entry_safe(cur, next, &flo->error_list, list) {
		if (!pnfs_is_range_intersecting(cur->offset,
				pnfs_end_offset(cur->offset, cur->length),
				range->offset,
				pnfs_end_offset(range->offset, range->length)))
			continue;

		/* Quota used up: stop at the first entry that would exceed it */
		if (moved == maxnum)
			break;

		list_move(&cur->list, head);
		moved++;
	}
	spin_unlock(&inode->i_lock);

	return moved;
}

unsigned int ff_layout_fetch_ds_ioerr(struct pnfs_layout_hdr *lo,
				      const struct pnfs_layout_range *range,
				      struct list_head *head,
				      unsigned int maxnum)
{
	unsigned int ret;

	ret = do_layout_fetch_ds_ioerr(lo, range, head, maxnum);
	/* If we're over the max, discard all remaining entries */
	if (ret == maxnum) {
		LIST_HEAD(discard);
		do_layout_fetch_ds_ioerr(lo, range, &discard, -1);
		ff_layout_free_ds_ioerr(&discard);
	}
	return ret;
}

597
static bool ff_read_layout_has_available_ds(struct pnfs_layout_segment *lseg)
598 599 600
{
	struct nfs4_ff_layout_mirror *mirror;
	struct nfs4_deviceid_node *devid;
601
	u32 idx;
602 603 604

	for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
		mirror = FF_LAYOUT_COMP(lseg, idx);
605 606 607 608 609
		if (mirror) {
			if (!mirror->mirror_ds)
				return true;
			if (IS_ERR(mirror->mirror_ds))
				continue;
610 611 612 613 614 615 616 617 618
			devid = &mirror->mirror_ds->id_node;
			if (!ff_layout_test_devid_unavailable(devid))
				return true;
		}
	}

	return false;
}

619 620 621 622 623 624 625 626
static bool ff_rw_layout_has_available_ds(struct pnfs_layout_segment *lseg)
{
	struct nfs4_ff_layout_mirror *mirror;
	struct nfs4_deviceid_node *devid;
	u32 idx;

	for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
		mirror = FF_LAYOUT_COMP(lseg, idx);
627
		if (!mirror || IS_ERR(mirror->mirror_ds))
628
			return false;
629 630
		if (!mirror->mirror_ds)
			continue;
631 632 633 634 635 636 637 638
		devid = &mirror->mirror_ds->id_node;
		if (ff_layout_test_devid_unavailable(devid))
			return false;
	}

	return FF_LAYOUT_MIRROR_COUNT(lseg) != 0;
}

639
static bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg)
640 641 642 643 644 645 646
{
	if (lseg->pls_range.iomode == IOMODE_READ)
		return  ff_read_layout_has_available_ds(lseg);
	/* Note: RW layout needs all mirrors available */
	return ff_rw_layout_has_available_ds(lseg);
}

647 648 649 650 651 652
bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg)
{
	return ff_layout_no_fallback_to_mds(lseg) ||
	       ff_layout_has_available_ds(lseg);
}

653 654 655 656 657 658
bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg)
{
	return lseg->pls_range.iomode == IOMODE_RW &&
	       ff_layout_no_read_on_rw(lseg);
}

659 660 661 662 663 664 665 666
/*
 * Module parameters (mode 0644).  Both values are passed to
 * nfs4_pnfs_ds_connect() by nfs4_ff_layout_prepare_ds() when a data server
 * connection is set up.
 */
module_param(dataserver_retrans, uint, 0644);
MODULE_PARM_DESC(dataserver_retrans, "The  number of times the NFSv4.1 client "
			"retries a request before it attempts further "
			" recovery  action.");
module_param(dataserver_timeo, uint, 0644);
MODULE_PARM_DESC(dataserver_timeo, "The time (in tenths of a second) the "
			"NFSv4.1  client  waits for a response from a "
			" data server before it retries an NFS request.");