repair.go 18.1 KB
Newer Older
Luke Champine's avatar
Luke Champine committed
1 2
package renter

David Vorick's avatar
David Vorick committed
3 4 5 6 7
// TODO: There are no download-to-reupload strategies implemented.

// TODO: The chunkStatus stuff needs to recognize when two different contract
// ids are actually a part of the same file contract.

Luke Champine's avatar
Luke Champine committed
8
import (
David Vorick's avatar
David Vorick committed
9
	"errors"
Luke Champine's avatar
Luke Champine committed
10
	"io"
11
	"os"
12
	"time"
13

David Vorick's avatar
David Vorick committed
14
	"github.com/NebulousLabs/Sia/build"
Ava Howell's avatar
Ava Howell committed
15
	"github.com/NebulousLabs/Sia/modules"
16
	"github.com/NebulousLabs/Sia/types"
Luke Champine's avatar
Luke Champine committed
17 18
)

David Vorick's avatar
David Vorick committed
19
// errFileDeleted is returned when a chunk is scheduled for repair but its file
// can no longer be found among the files tracked by the renter.
var errFileDeleted = errors.New("cannot repair chunk as the file is not being tracked by the renter")

25
type (
David Vorick's avatar
David Vorick committed
26 27 28 29 30 31 32 33 34 35 36
	// chunkStatus contains information about a chunk to assist with repairing
	// the chunk.
	chunkStatus struct {
		// contracts is the set of file contracts which are already storing
		// pieces for the chunk.
		//
		// pieces contains the indices of the pieces that have already been
		// uploaded for this chunk.
		//
		// recordedGaps indicates the value that this chunk has recorded in the
		// gapCounts map.
37
		activePieces int
David Vorick's avatar
David Vorick committed
38 39 40 41
		contracts    map[types.FileContractID]struct{}
		pieces       map[uint64]struct{}
		recordedGaps int
		totalPieces  int
42 43 44 45 46
	}

	// chunkID can be used to uniquely identify a chunk within the repair
	// matrix.
	chunkID struct {
David Vorick's avatar
David Vorick committed
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
		index    uint64 // the index of the chunk within its file.
		filename string
	}

	// repairState tracks a bunch of chunks that are being actively repaired.
	repairState struct {
		// activeWorkers is the set of workers that is currently performing
		// work, thus are currently unavailable but will become available soon.
		//
		// availableWorkers is a set of workers that are currently available to
		// perform work, and do not have any current jobs.
		//
		// gapCounts tracks how many chunks have each number of gaps. This is
		// used to drive uploading optimizations.
		//
		// incompleteChunks tracks a set of chunks that don't yet have full
		// redundancy, along with information about which pieces and contracts
		// aren't being used.
		//
66 67 68
		// downloadingChunks tracks the set of chunks that are currently being
		// downloaded in order to be re-uploaded.
		//
69 70 71
		// cachedChunks tracks the set of chunks that have recently been retreived
		// from hosts.
		//
David Vorick's avatar
David Vorick committed
72
		// workerSet tracks the set of workers which can be used for uploading.
73 74 75 76 77
		activeWorkers     map[types.FileContractID]*worker
		availableWorkers  map[types.FileContractID]*worker
		gapCounts         map[int]int
		incompleteChunks  map[chunkID]*chunkStatus
		downloadingChunks map[chunkID]struct{}
78
		cachedChunks      map[chunkID][]byte
79
		resultChan        chan finishedUpload
80 81
	}
)
David Vorick's avatar
David Vorick committed
82

83
// numGaps returns the number of gaps that a chunk has.
David Vorick's avatar
David Vorick committed
84 85 86 87 88 89 90 91 92 93 94 95 96 97
func (cs *chunkStatus) numGaps(rs *repairState) int {
	incompatContracts := 0
	for contract := range cs.contracts {
		_, exists1 := rs.activeWorkers[contract]
		_, exists2 := rs.availableWorkers[contract]
		if exists1 || exists2 {
			incompatContracts++
		}
	}
	contractGaps := len(rs.activeWorkers) + len(rs.availableWorkers) - incompatContracts
	pieceGaps := cs.totalPieces - len(cs.pieces)

	if contractGaps < pieceGaps {
		return contractGaps
98
	}
David Vorick's avatar
David Vorick committed
99
	return pieceGaps
100 101
}

102 103 104 105
// managedAddFileToRepairState will take a file and add each of the incomplete
// chunks to the repair state, along with data about which pieces need
// attention.
func (r *Renter) managedAddFileToRepairState(rs *repairState, file *file) {
106 107
	// Check that the file is being tracked, and therefore candidate for
	// repair.
108 109 110 111 112 113 114
	file.mu.Lock()
	_, exists := r.tracking[file.name]
	file.mu.Unlock()
	if !exists {
		return
	}

David Vorick's avatar
David Vorick committed
115
	// Fetch the list of potential contracts from the repair state.
David Vorick's avatar
David Vorick committed
116
	contracts := make([]types.FileContractID, 0)
David Vorick's avatar
David Vorick committed
117 118 119 120
	for contract := range rs.activeWorkers {
		contracts = append(contracts, contract)
	}
	for contract := range rs.availableWorkers {
David Vorick's avatar
David Vorick committed
121 122 123
		contracts = append(contracts, contract)
	}

David Vorick's avatar
David Vorick committed
124 125
	// Create the data structures that allow us to fill out the status for each
	// chunk.
David Vorick's avatar
David Vorick committed
126
	chunkCount := file.numChunks()
David Vorick's avatar
David Vorick committed
127 128
	availablePieces := make([]map[uint64]struct{}, chunkCount)
	utilizedContracts := make([]map[types.FileContractID]struct{}, chunkCount)
David Vorick's avatar
David Vorick committed
129
	for i := uint64(0); i < chunkCount; i++ {
David Vorick's avatar
David Vorick committed
130 131
		availablePieces[i] = make(map[uint64]struct{})
		utilizedContracts[i] = make(map[types.FileContractID]struct{})
David Vorick's avatar
David Vorick committed
132 133
	}

134
	// Iterate through each contract and figure out which pieces are available.
David Vorick's avatar
David Vorick committed
135
	for _, contract := range file.contracts {
David Vorick's avatar
David Vorick committed
136 137 138
		// Check whether this contract is offline. Even if the contract is
		// offline, we want to record that the chunk has attempted to use this
		// contract.
139 140
		id := r.hostContractor.ResolveID(contract.ID)
		stable := !r.hostContractor.IsOffline(id) && r.hostContractor.GoodForRenew(id)
141

David Vorick's avatar
David Vorick committed
142
		// Scan all of the pieces of the contract.
David Vorick's avatar
David Vorick committed
143
		for _, piece := range contract.Pieces {
144
			utilizedContracts[piece.Chunk][id] = struct{}{}
David Vorick's avatar
David Vorick committed
145

146
			// Only mark the piece as complete if the piece can be recovered.
147
			if stable {
David Vorick's avatar
David Vorick committed
148
				availablePieces[piece.Chunk][piece.Piece] = struct{}{}
149
			}
David Vorick's avatar
David Vorick committed
150 151 152
		}
	}

David Vorick's avatar
David Vorick committed
153 154
	// Create the chunkStatus object for each chunk and add it to the set of
	// incomplete chunks.
David Vorick's avatar
David Vorick committed
155
	for i := uint64(0); i < chunkCount; i++ {
156 157 158 159
		// Skip this chunk if all pieces have been uploaded.
		if len(availablePieces[i]) >= file.erasureCode.NumPieces() {
			continue
		}
David Vorick's avatar
David Vorick committed
160

161 162 163 164 165 166 167
		// Skip this chunk if it's already in the set of incomplete chunks.
		cid := chunkID{i, file.name}
		_, exists := rs.incompleteChunks[cid]
		if exists {
			continue
		}

David Vorick's avatar
David Vorick committed
168 169 170 171 172 173
		// Create the chunkStatus object and add it to the set of incomplete
		// chunks.
		cs := &chunkStatus{
			contracts:   utilizedContracts[i],
			pieces:      availablePieces[i],
			totalPieces: file.erasureCode.NumPieces(),
174
		}
David Vorick's avatar
David Vorick committed
175 176 177
		cs.recordedGaps = cs.numGaps(rs)
		rs.incompleteChunks[cid] = cs
		rs.gapCounts[cs.recordedGaps]++
David Vorick's avatar
David Vorick committed
178 179 180
	}
}

181 182 183
// managedRepairIteration does a full file repair iteration, which includes
// scanning all of the files for missing pieces and attempting repair them by
// uploading to chunks.
David Vorick's avatar
David Vorick committed
184 185
func (r *Renter) managedRepairIteration(rs *repairState) {
	// Wait for work if there is nothing to do.
186
	if len(rs.activeWorkers) == 0 && len(rs.incompleteChunks) == 0 {
187
		select {
188 189
		case <-r.tg.StopChan():
			return
David Vorick's avatar
David Vorick committed
190
		case file := <-r.newRepairs:
191
			r.managedAddFileToRepairState(rs, file)
David Vorick's avatar
David Vorick committed
192
			return
193 194 195
		}
	}

David Vorick's avatar
David Vorick committed
196
	// Reset the available workers.
David Vorick's avatar
David Vorick committed
197
	contracts := r.hostContractor.Contracts()
David Vorick's avatar
David Vorick committed
198
	id := r.mu.Lock()
David Vorick's avatar
David Vorick committed
199
	r.updateWorkerPool(contracts)
David Vorick's avatar
David Vorick committed
200 201
	rs.availableWorkers = make(map[types.FileContractID]*worker)
	for id, worker := range r.workerPool {
202 203 204 205 206
		// Ignore the workers that are not good for uploading.
		if !worker.contract.GoodForUpload {
			continue
		}

David Vorick's avatar
David Vorick committed
207 208 209 210 211
		// Ignore workers that are already in the active set of workers.
		_, exists := rs.activeWorkers[worker.contractID]
		if exists {
			continue
		}
212

213 214 215 216
		// Ignore workers that have had an upload failure recently. The cooldown
		// time scales exponentially as the number of consecutive failures grow,
		// stopping at 10 doublings, or about 17 hours total cooldown.
		penalty := uint64(worker.consecutiveUploadFailures)
217
		if worker.consecutiveUploadFailures > time.Duration(maxConsecutivePenalty) {
218 219 220
			penalty = uint64(maxConsecutivePenalty)
		}
		if time.Since(worker.recentUploadFailure) < uploadFailureCooldown*(1<<penalty) {
David Vorick's avatar
David Vorick committed
221
			continue
David Vorick's avatar
David Vorick committed
222 223
		}

David Vorick's avatar
David Vorick committed
224 225
		rs.availableWorkers[id] = worker
	}
226 227 228 229 230 231 232 233 234
	r.mu.Unlock(id)

	// Determine the maximum number of gaps of any chunk in the repair matrix.
	maxGaps := 0
	for i, gaps := range rs.gapCounts {
		if i > maxGaps && gaps > 0 {
			maxGaps = i
		}
	}
David Vorick's avatar
David Vorick committed
235

236 237 238 239 240 241 242 243
	// prune the chunk cache
	for cid := range rs.cachedChunks {
		if len(rs.cachedChunks) <= maxChunkCacheSize {
			break
		}
		delete(rs.cachedChunks, cid)
	}

David Vorick's avatar
David Vorick committed
244 245 246
	// Scan through the chunks until a candidate for uploads is found.
	var chunksToDelete []chunkID
	for chunkID, chunkStatus := range rs.incompleteChunks {
247 248 249 250
		// check if the chunk is currently being downloaded for recovery
		if _, downloading := rs.downloadingChunks[chunkID]; downloading {
			continue
		}
David Vorick's avatar
David Vorick committed
251 252 253 254 255 256 257
		// Update the number of gaps for this chunk.
		numGaps := chunkStatus.numGaps(rs)
		rs.gapCounts[chunkStatus.recordedGaps]--
		rs.gapCounts[numGaps]++
		chunkStatus.recordedGaps = numGaps

		// Remove this chunk from the set of incomplete chunks if it has been
258 259
		// completed and there are no workers still working on it.
		if numGaps == 0 && chunkStatus.activePieces == 0 {
David Vorick's avatar
David Vorick committed
260 261
			chunksToDelete = append(chunksToDelete, chunkID)
			continue
David Vorick's avatar
David Vorick committed
262
		}
David Vorick's avatar
David Vorick committed
263 264 265 266

		// Skip this chunk if it does not have enough gaps.
		if maxGaps >= minPiecesRepair && numGaps < minPiecesRepair {
			continue
David Vorick's avatar
David Vorick committed
267 268
		}

David Vorick's avatar
David Vorick committed
269 270 271
		// Determine the set of useful workers - workers that are both
		// available and able to repair this chunk.
		var usefulWorkers []types.FileContractID
272
		for workerID, worker := range rs.availableWorkers {
David Vorick's avatar
David Vorick committed
273
			_, exists := chunkStatus.contracts[workerID]
274
			if !exists && worker.contract.GoodForUpload {
David Vorick's avatar
David Vorick committed
275
				usefulWorkers = append(usefulWorkers, workerID)
276
			}
David Vorick's avatar
David Vorick committed
277
		}
278

279 280
		// Skip this chunk if the set of useful workers does not meet the
		// minimum pieces requirement.
David Vorick's avatar
David Vorick committed
281 282 283
		if maxGaps >= minPiecesRepair && len(usefulWorkers) < minPiecesRepair {
			continue
		}
David Vorick's avatar
David Vorick committed
284

285 286 287 288 289 290
		// Skip this chunk if the set of useful workers is not complete, and
		// the maxGaps value is less than the minPiecesRepair value.
		if maxGaps < minPiecesRepair && len(usefulWorkers) < numGaps {
			continue
		}

David Vorick's avatar
David Vorick committed
291 292 293 294 295 296 297 298 299 300 301
		// Send off the work.
		err := r.managedScheduleChunkRepair(rs, chunkID, chunkStatus, usefulWorkers)
		if err != nil {
			r.log.Println("Unable to repair chunk:", err)
			chunksToDelete = append(chunksToDelete, chunkID)
			continue
		}
	}
	for _, cid := range chunksToDelete {
		delete(rs.incompleteChunks, cid)
	}
David Vorick's avatar
David Vorick committed
302

David Vorick's avatar
David Vorick committed
303 304 305
	// Block until some of the workers return.
	r.managedWaitOnRepairWork(rs)
}
306

307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
// managedDownloadChunkData downloads the requested chunk from Sia, for use in
// the repair loop.
//
// offset is the byte offset of the chunk within the file; the download is
// shortened when the chunk extends past the end of the file. The chunk is
// listed in rs.downloadingChunks for the duration of the call. An error is
// returned if the download fails, if the renter shuts down, or if the download
// does not finish within chunkDownloadTimeout.
func (r *Renter) managedDownloadChunkData(rs *repairState, file *file, offset uint64, chunkIndex uint64, chunkID chunkID) ([]byte, error) {
	rs.downloadingChunks[chunkID] = struct{}{}
	defer delete(rs.downloadingChunks, chunkID)

	// build current contracts map
	currentContracts := make(map[modules.NetAddress]types.FileContractID)
	for _, contract := range r.hostContractor.Contracts() {
		currentContracts[contract.NetAddress] = contract.ID
	}

	downloadSize := file.chunkSize()
	if offset+downloadSize > file.size {
		downloadSize = file.size - offset
	}

	// create a DownloadBufferWriter for the chunk
	buf := NewDownloadBufferWriter(file.chunkSize())

	// create the download object and push it on to the download queue. The
	// push happens in a goroutine so that this function can still honor the
	// stop signal and the timeout while the queue is busy; closing done on
	// return releases the goroutine if the download was never accepted.
	d := r.newSectionDownload(file, buf, currentContracts, offset, downloadSize)
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case r.newDownloads <- d:
		case <-done:
		}
	}()

	// Use an explicit timer instead of time.After so that the timer is
	// released as soon as the function returns, rather than lingering until
	// the full timeout has elapsed.
	timeout := time.NewTimer(chunkDownloadTimeout)
	defer timeout.Stop()

	// wait for the download to complete and return the data
	select {
	case <-d.downloadFinished:
		return buf.Bytes(), d.Err()
	case <-r.tg.StopChan():
		return nil, errors.New("chunk download interrupted by shutdown")
	case <-timeout.C:
		return nil, errors.New("chunk download timed out")
	}
}

349 350 351 352
// managedGetChunkData grabs the requested `chunkID` from the file, in order to
// repair the file. If the `trackedFile` can be found on disk, grab the chunk
// from the file, otherwise attempt to queue a new download for only that chunk
// and return the downloaded chunk.
353
func (r *Renter) managedGetChunkData(rs *repairState, file *file, trackedFile trackedFile, chunkID chunkID) ([]byte, error) {
David Vorick's avatar
David Vorick committed
354
	chunkIndex := chunkID.index
Ava Howell's avatar
Ava Howell committed
355
	offset := chunkIndex * file.chunkSize()
Ava Howell's avatar
Ava Howell committed
356 357 358

	// try to read the chunk from disk
	f, err := os.Open(trackedFile.RepairPath)
David Vorick's avatar
David Vorick committed
359
	if err != nil {
Ava Howell's avatar
Ava Howell committed
360
		// if that fails, try to download the chunk
361
		return r.managedDownloadChunkData(rs, file, offset, chunkIndex, chunkID)
David Vorick's avatar
David Vorick committed
362
	}
Ava Howell's avatar
Ava Howell committed
363 364
	defer f.Close()

David Vorick's avatar
David Vorick committed
365
	chunkData := make([]byte, file.chunkSize())
Ava Howell's avatar
Ava Howell committed
366
	_, err = f.ReadAt(chunkData, int64(offset))
David Vorick's avatar
David Vorick committed
367 368 369 370 371 372
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		// TODO: We should be doing better error handling here - shouldn't be
		// running into ErrUnexpectedEOF intentionally because if it happens
		// unintentionally we will believe that the chunk was read from memory
		// correctly.

Ava Howell's avatar
Ava Howell committed
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
		return nil, err
	}

	return chunkData, nil
}

// managedScheduleChunkRepair takes a chunk and schedules some repair on that
// chunk using the chunk state and a list of workers.
func (r *Renter) managedScheduleChunkRepair(rs *repairState, chunkID chunkID, chunkStatus *chunkStatus, usefulWorkers []types.FileContractID) error {
	// Check that the file is still in the renter.
	filename := chunkID.filename
	id := r.mu.RLock()
	file, exists1 := r.files[filename]
	meta, exists2 := r.tracking[filename]
	r.mu.RUnlock(id)
	if !exists1 || !exists2 {
		return errFileDeleted
	}

	// read the chunk into memory
393 394 395 396 397 398 399 400 401 402 403
	// check the cache first
	var chunkData []byte
	if cachedData, exists := rs.cachedChunks[chunkID]; exists {
		chunkData = cachedData
	} else {
		data, err := r.managedGetChunkData(rs, file, meta, chunkID)
		if err != nil {
			return build.ExtendErr("unable to get repair chunk:", err)
		}
		chunkData = data
		rs.cachedChunks[chunkID] = data
David Vorick's avatar
David Vorick committed
404
	}
405

406
	// Erasure code the pieces.
David Vorick's avatar
David Vorick committed
407 408 409 410
	pieces, err := file.erasureCode.Encode(chunkData)
	if err != nil {
		return build.ExtendErr("unable to erasure code chunk data", err)
	}
411

David Vorick's avatar
David Vorick committed
412 413 414 415 416 417
	// Get the set of pieces that are missing from the chunk.
	var missingPieces []uint64
	for i := uint64(0); i < uint64(file.erasureCode.NumPieces()); i++ {
		_, exists := chunkStatus.pieces[i]
		if !exists {
			missingPieces = append(missingPieces, i)
418
		}
David Vorick's avatar
David Vorick committed
419
	}
420

421 422 423 424 425 426 427
	// Truncate the pieces so that they match the size of the useful workers.
	if len(usefulWorkers) < len(missingPieces) {
		missingPieces = missingPieces[:len(usefulWorkers)]
	}

	// Encrypt the missing pieces.
	for _, missingPiece := range missingPieces {
Ava Howell's avatar
Ava Howell committed
428
		key := deriveKey(file.masterKey, chunkID.index, uint64(missingPiece))
429
		pieces[missingPiece] = key.EncryptBytes(pieces[missingPiece])
430 431
	}

David Vorick's avatar
David Vorick committed
432 433 434 435 436 437 438 439 440
	// Give each piece to a worker in the set of useful workers.
	for len(usefulWorkers) > 0 && len(missingPieces) > 0 {
		uw := uploadWork{
			chunkID:    chunkID,
			data:       pieces[missingPieces[0]],
			file:       file,
			pieceIndex: missingPieces[0],

			resultChan: rs.resultChan,
441
		}
David Vorick's avatar
David Vorick committed
442 443 444
		// Grab the worker, and update the worker tracking in the repair state.
		worker := rs.availableWorkers[usefulWorkers[0]]
		rs.activeWorkers[usefulWorkers[0]] = worker
445 446 447
		delete(rs.availableWorkers, usefulWorkers[0])

		chunkStatus.activePieces++
David Vorick's avatar
David Vorick committed
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
		chunkStatus.contracts[usefulWorkers[0]] = struct{}{}
		chunkStatus.pieces[missingPieces[0]] = struct{}{}

		// Update the number of gaps for this chunk.
		numGaps := chunkStatus.numGaps(rs)
		rs.gapCounts[chunkStatus.recordedGaps]--
		rs.gapCounts[numGaps]++
		chunkStatus.recordedGaps = numGaps

		// Update the set of useful workers and the set of missing pieces.
		missingPieces = missingPieces[1:]
		usefulWorkers = usefulWorkers[1:]

		// Deliver the payload to the worker.
		select {
		case worker.uploadChan <- uw:
		default:
			r.log.Critical("Worker is supposed to be available, but upload work channel is full")
			worker.uploadChan <- uw
467
		}
David Vorick's avatar
David Vorick committed
468 469 470
	}
	return nil
}
David Vorick's avatar
David Vorick committed
471

David Vorick's avatar
David Vorick committed
472 473 474 475 476 477 478
// managedWaitOnRepairWork will block until a worker returns from an upload,
// handling the results.
func (r *Renter) managedWaitOnRepairWork(rs *repairState) {
	// If there are no active workers, return early.
	if len(rs.activeWorkers) == 0 {
		return
	}
479

David Vorick's avatar
David Vorick committed
480 481 482 483 484
	// Wait for an upload to finish.
	var finishedUpload finishedUpload
	select {
	case finishedUpload = <-rs.resultChan:
	case file := <-r.newRepairs:
485
		r.managedAddFileToRepairState(rs, file)
David Vorick's avatar
David Vorick committed
486 487 488 489
		return
	case <-r.tg.StopChan():
		return
	}
David Vorick's avatar
David Vorick committed
490

491
	// Mark that the worker of this chunk has completed its work.
492 493 494 495 496 497 498 499 500
	if cs, ok := rs.incompleteChunks[finishedUpload.chunkID]; !ok {
		// The file was deleted mid-upload. Add the worker back to the set of
		// available workers.
		rs.availableWorkers[finishedUpload.workerID] = rs.activeWorkers[finishedUpload.workerID]
		delete(rs.activeWorkers, finishedUpload.workerID)
		return
	} else {
		cs.activePieces--
	}
501

David Vorick's avatar
David Vorick committed
502 503 504 505 506 507 508
	// If there was no error, add the worker back to the set of
	// available workers and wait for the next worker.
	if finishedUpload.err == nil {
		rs.availableWorkers[finishedUpload.workerID] = rs.activeWorkers[finishedUpload.workerID]
		delete(rs.activeWorkers, finishedUpload.workerID)
		return
	}
David Vorick's avatar
David Vorick committed
509

David Vorick's avatar
David Vorick committed
510 511 512
	// Log the error and retire the worker.
	r.log.Debugln("Error while performing upload to", finishedUpload.workerID, "::", finishedUpload.err)
	delete(rs.activeWorkers, finishedUpload.workerID)
David Vorick's avatar
David Vorick committed
513

David Vorick's avatar
David Vorick committed
514 515 516
	// Indicate in the set of incomplete chunks that this piece was not
	// completed.
	rs.incompleteChunks[finishedUpload.chunkID].pieces[finishedUpload.pieceIndex] = struct{}{}
517
}
David Vorick's avatar
David Vorick committed
518

519 520 521 522 523 524 525 526 527 528 529 530 531
// threadedQueueRepairs is a goroutine that runs in the background and
// continuously adds files to the repair loop, slow enough that it's not a
// resource burden but fast enough that no file is ever at risk.
//
// NOTE: This loop is pretty naive in terms of work management. As the number
// of files goes up, and as the number of chunks per file goes up, this will
// become a performance bottleneck, and even inhibit repair progress.
func (r *Renter) threadedQueueRepairs() {
	for {
		// Compress the set of files into a slice.
		id := r.mu.RLock()
		var files []*file
		for _, file := range r.files {
532 533 534 535
			if _, ok := r.tracking[file.name]; ok {
				// Only repair files that are being tracked.
				files = append(files, file)
			}
536 537 538
		}
		r.mu.RUnlock(id)

539
		// Add files.
540 541
		for _, file := range files {
			// Send the file down the repair channel.
David Vorick's avatar
David Vorick committed
542 543
			select {
			case r.newRepairs <- file:
544 545 546 547 548
			case <-r.tg.StopChan():
				return
			}
		}

549
		// Chill out for an extra 15 minutes before going through the files
550
		// again.
David Vorick's avatar
David Vorick committed
551
		select {
552
		case <-time.After(repairQueueInterval):
553 554 555 556 557 558
		case <-r.tg.StopChan():
			return
		}
	}
}

559 560 561 562
// threadedRepairLoop improves the health of files tracked by the renter by
// reuploading their missing pieces. Multiple repair attempts may be necessary
// before the file reaches full redundancy.
func (r *Renter) threadedRepairLoop() {
David Vorick's avatar
David Vorick committed
563
	rs := &repairState{
564 565 566 567
		activeWorkers:     make(map[types.FileContractID]*worker),
		availableWorkers:  make(map[types.FileContractID]*worker),
		gapCounts:         make(map[int]int),
		incompleteChunks:  make(map[chunkID]*chunkStatus),
568
		cachedChunks:      make(map[chunkID][]byte),
569 570
		downloadingChunks: make(map[chunkID]struct{}),
		resultChan:        make(chan finishedUpload),
David Vorick's avatar
David Vorick committed
571
	}
572 573 574
	for {
		if r.tg.Add() != nil {
			return
David Vorick's avatar
David Vorick committed
575
		}
David Vorick's avatar
David Vorick committed
576
		r.managedRepairIteration(rs)
577
		r.tg.Done()
David Vorick's avatar
David Vorick committed
578 579
	}
}