Commit 367f0fab authored by Matthew Sevey's avatar Matthew Sevey

renter: increase number of stuck chunks added to heap

parent eeffade7
Pipeline #50992259 passed with stages
in 63 minutes and 16 seconds
......@@ -100,6 +100,12 @@ const (
)
// Constants that tune the health and repair processes.
const (
// maxStuckChunksInHeap is the maximum number of stuck chunks that the
// repair code will add to the heap at a time
maxStuckChunksInHeap = 5
)
var (
// fileRepairInterval defines how long the renter should wait before
// continuing to repair a file that was recently repaired.
......
......@@ -800,8 +800,10 @@ func NewCustomRenter(g modules.Gateway, cs modules.ConsensusSet, tpool modules.T
downloadHeap: new(downloadChunkHeap),
uploadHeap: uploadHeap{
heapChunks: make(map[uploadChunkID]struct{}),
repairingChunks: make(map[uploadChunkID]struct{}),
stuckHeapChunks: make(map[uploadChunkID]struct{}),
unstuckHeapChunks: make(map[uploadChunkID]struct{}),
newUploads: make(chan struct{}, 1),
repairNeeded: make(chan struct{}, 1),
stuckChunkFound: make(chan struct{}, 1),
......
......@@ -71,8 +71,9 @@ type uploadHeap struct {
//
// repairingChunks is a map containing all the chunks are that currently
// assigned to workers and are being repaired/worked on.
heapChunks map[uploadChunkID]struct{}
repairingChunks map[uploadChunkID]struct{}
repairingChunks map[uploadChunkID]struct{}
stuckHeapChunks map[uploadChunkID]struct{}
unstuckHeapChunks map[uploadChunkID]struct{}
// Control channels
newUploads chan struct{}
......@@ -94,14 +95,27 @@ func (uh *uploadHeap) managedLen() int {
// managedPush will try and add a chunk to the upload heap. If the chunk is
// added it will return true otherwise it will return false
func (uh *uploadHeap) managedPush(uuc *unfinishedUploadChunk) bool {
// Check whether this chunk is already being repaired. If not, add it to the
// upload chunk heap.
var added bool
// Grab chunk stuck status
uuc.mu.Lock()
chunkStuck := uuc.stuck
uuc.mu.Unlock()
// Check if chunk is in any of the heap maps
uh.mu.Lock()
_, exists1 := uh.heapChunks[uuc.id]
_, exists2 := uh.repairingChunks[uuc.id]
if !exists1 && !exists2 {
uh.heapChunks[uuc.id] = struct{}{}
_, existsUnstuckHeap := uh.unstuckHeapChunks[uuc.id]
_, existsRepairing := uh.repairingChunks[uuc.id]
_, existsStuckHeap := uh.stuckHeapChunks[uuc.id]
// Check if the chunk can be added to the heap
canAddStuckChunk := chunkStuck && !existsStuckHeap && !existsRepairing && len(uh.stuckHeapChunks) < maxStuckChunksInHeap
canAddUnstuckChunk := !chunkStuck && !existsUnstuckHeap && !existsRepairing
if canAddStuckChunk {
uh.stuckHeapChunks[uuc.id] = struct{}{}
uh.heap.Push(uuc)
added = true
} else if canAddUnstuckChunk {
uh.unstuckHeapChunks[uuc.id] = struct{}{}
uh.heap.Push(uuc)
added = true
}
......@@ -114,7 +128,8 @@ func (uh *uploadHeap) managedPop() (uc *unfinishedUploadChunk) {
uh.mu.Lock()
if len(uh.heap) > 0 {
uc = heap.Pop(&uh.heap).(*unfinishedUploadChunk)
delete(uh.heapChunks, uc.id)
delete(uh.unstuckHeapChunks, uc.id)
delete(uh.stuckHeapChunks, uc.id)
}
uh.mu.Unlock()
return uc
......@@ -303,43 +318,51 @@ func (r *Renter) buildUnfinishedChunks(entry *siafile.SiaFileSetEntry, hosts map
return incompleteChunks
}
// managedBuildAndPushRandomChunk randomly selects a file and builds the
// unfinished chunks, then randomly adds one chunk to the upload heap
func (r *Renter) managedBuildAndPushRandomChunk(files []*siafile.SiaFileSetEntry, hosts map[string]struct{}, target repairTarget, offline, goodForRenew map[string]bool) {
// managedBuildAndPushRandomChunks randomly selects a file and builds the
// unfinished chunks, then randomly adds chunksToAdd chunks to the upload heap
func (r *Renter) managedBuildAndPushRandomChunk(files []*siafile.SiaFileSetEntry, chunksToAdd int, hosts map[string]struct{}, target repairTarget, offline, goodForRenew map[string]bool) {
// Sanity check that there are files
if len(files) == 0 {
return
}
// Grab a random file
randFileIndex := fastrand.Intn(len(files))
file := files[randFileIndex]
id := r.mu.Lock()
// Build the unfinished stuck chunks from the file
unfinishedUploadChunks := r.buildUnfinishedChunks(file, hosts, target, offline, goodForRenew)
r.mu.Unlock(id)
// Sanity check that there are stuck chunks
if len(unfinishedUploadChunks) == 0 {
r.log.Println("WARN: no stuck unfinishedUploadChunks returned from buildUnfinishedChunks, so no stuck chunks will be added to the heap")
return
}
// Add a random stuck chunk to the upload heap and set its stuckRepair field
// to true
randChunkIndex := fastrand.Intn(len(unfinishedUploadChunks))
randChunk := unfinishedUploadChunks[randChunkIndex]
randChunk.stuckRepair = true
if !r.uploadHeap.managedPush(randChunk) {
// Chunk wasn't added to the heap. Close the file
err := randChunk.fileEntry.Close()
if err != nil {
r.log.Println("WARN: unable to close file:", err)
// Create random indices for files
p := fastrand.Perm(len(files))
for i := 0; i < chunksToAdd && i < len(files); i++ {
// Grab random file
file := files[p[i]]
// Build the unfinished stuck chunks from the file
id := r.mu.Lock()
unfinishedUploadChunks := r.buildUnfinishedChunks(file, hosts, target, offline, goodForRenew)
r.mu.Unlock(id)
// Sanity check that there are stuck chunks
if len(unfinishedUploadChunks) == 0 {
r.log.Println("WARN: no stuck unfinishedUploadChunks returned from buildUnfinishedChunks for", file.SiaPath())
continue
}
}
// Close the unused unfinishedUploadChunks
unfinishedUploadChunks = append(unfinishedUploadChunks[:randChunkIndex], unfinishedUploadChunks[randChunkIndex+1:]...)
for _, chunk := range unfinishedUploadChunks {
err := chunk.fileEntry.Close()
if err != nil {
r.log.Println("WARN: unable to close file:", err)
// Add random stuck chunks to the upload heap and set its stuckRepair field
// to true
randChunkIndex := fastrand.Intn(len(unfinishedUploadChunks))
randChunk := unfinishedUploadChunks[randChunkIndex]
randChunk.stuckRepair = true
if !r.uploadHeap.managedPush(randChunk) {
// Chunk wasn't added to the heap. Close the file
r.log.Debugln("WARN: stuck chunk", randChunk.id, "wasn't added to heap")
err := randChunk.fileEntry.Close()
if err != nil {
r.log.Println("WARN: unable to close file:", err)
}
}
unfinishedUploadChunks = append(unfinishedUploadChunks[:randChunkIndex], unfinishedUploadChunks[randChunkIndex+1:]...)
// Close the unused unfinishedUploadChunks
for _, chunk := range unfinishedUploadChunks {
err := chunk.fileEntry.Close()
if err != nil {
r.log.Println("WARN: unable to close file:", err)
}
}
}
return
......@@ -436,7 +459,7 @@ func (r *Renter) managedBuildChunkHeap(dirSiaPath string, hosts map[string]struc
switch target {
case targetStuckChunks:
r.log.Debugln("Adding stuck chunk to heap")
r.managedBuildAndPushRandomChunk(files, hosts, target, offline, goodForRenew)
r.managedBuildAndPushRandomChunk(files, maxStuckChunksInHeap, hosts, target, offline, goodForRenew)
case targetUnstuckChunks:
r.log.Debugln("Adding chunks to heap")
r.managedBuildAndPushChunks(files, hosts, target, offline, goodForRenew)
......
......@@ -3,6 +3,7 @@ package renter
import (
"encoding/hex"
"fmt"
"math"
"testing"
"gitlab.com/NebulousLabs/Sia/crypto"
......@@ -175,14 +176,19 @@ func TestBuildChunkHeap(t *testing.T) {
// Call managedBuildChunkHeap again as the stuck loop, since the previous
// call saw all the chunks as not downloadable it will have marked them as
// stuck so we should now see one chunk in the heap
// stuck.
//
// For the stuck loop managedBuildChunkHeap will randomly grab one chunk
// from maxChunksInHeap files to add to the heap. There are two files
// created in the test so we would expect 2 or maxStuckChunksInHeap,
// whichever is less, chunks to be added to the heap
rt.renter.managedBuildChunkHeap("", hosts, targetStuckChunks)
if rt.renter.uploadHeap.managedLen() != 1 {
t.Fatalf("Expected heap length of %v but got %v", 1, rt.renter.uploadHeap.managedLen())
expectedChunks := math.Min(2, float64(maxStuckChunksInHeap))
if rt.renter.uploadHeap.managedLen() != int(expectedChunks) {
t.Fatalf("Expected heap length of %v but got %v", expectedChunks, rt.renter.uploadHeap.managedLen())
}
// Pop all chunks off and confirm they are not stuck and not marked as
// stuckRepair
// Pop all chunks off and confirm they are stuck and marked as stuckRepair
chunk := rt.renter.uploadHeap.managedPop()
for chunk != nil {
if !chunk.stuck || !chunk.stuckRepair {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment