Commit dd9b3bdb authored by David Vorick

Merge branch 'siafile-addchunk' into 'master'

Upload Streaming

See merge request !3367
parents d1663d34 daf9bebd
Pipeline #57378496 failed in 2 minutes and 57 seconds
......@@ -2390,6 +2390,32 @@ The number of parity pieces to use when erasure coding the file. Total redundanc
standard success or error response. See [standard responses](#standard-responses).
## /renter/uploadstream/*siapath* [POST]
> curl example
```go
curl -A "Sia-Agent" -u "":<apipassword> "localhost:9980/renter/upload/myfile?datapieces=10&paritypieces=20" --data-binary @myfile.dat
```
uploads a file to the network using a stream. The file's data is read from the body of the request.
### Path Parameters
#### REQUIRED
**siapath** | string
Location where the file will reside in the renter on the network. The path must be non-empty, may not include any path traversal strings ("./", "../"), and may not begin with a forward-slash character.
### Query String Parameters
#### OPTIONAL
**datapieces** | int
The number of data pieces to use when erasure coding the file.
**paritypieces** | int
The number of parity pieces to use when erasure coding the file. Total redundancy of the file is (datapieces+paritypieces)/datapieces; for example, 10 datapieces and 20 paritypieces yield a redundancy of 3.
### Response
standard success or error response. See [standard responses](#standard-responses).
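As an illustration, here is a minimal Go sketch of a client that streams data from any io.Reader to this endpoint. The helper name uploadStream and the hard-coded redundancy parameters are assumptions for the example, not part of the official client; the Sia-Agent user agent and empty-username basic auth mirror the curl example above.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// uploadStream POSTs the contents of r as the request body to
// /renter/uploadstream/<siaPath> on a local siad.
func uploadStream(siaPath, apiPassword string, r io.Reader) error {
	url := fmt.Sprintf("http://localhost:9980/renter/uploadstream/%s?datapieces=10&paritypieces=20", siaPath)
	req, err := http.NewRequest("POST", url, r)
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", "Sia-Agent")
	req.SetBasicAuth("", apiPassword)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("upload failed: %v", resp.Status)
	}
	return nil
}

func main() {
	f, err := os.Open("myfile.dat")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := uploadStream("myfile", os.Getenv("SIA_API_PASSWORD"), f); err != nil {
		panic(err)
	}
}
```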
# Transaction Pool
## /tpool/confirmed/:id [GET]
......
......@@ -622,6 +622,10 @@ type Renter interface {
// Upload uploads a file using the input parameters.
Upload(FileUploadParams) error
// UploadStreamFromReader reads from the provided reader until io.EOF is reached and
// uploads the data to the Sia network.
UploadStreamFromReader(up FileUploadParams, reader io.Reader) error
// CreateDir creates a directory for the renter
CreateDir(siaPath SiaPath) error
......
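A usage sketch for the new interface method, assuming an existing modules.Renter instance and prepared FileUploadParams (illustrative, not code from this merge):

```go
import (
	"bytes"

	"gitlab.com/NebulousLabs/Sia/modules"
)

// uploadBytes streams an in-memory buffer through UploadStreamFromReader.
// The reader is consumed until io.EOF, so the upload size does not need to
// be known in advance.
func uploadBytes(r modules.Renter, up modules.FileUploadParams, data []byte) error {
	return r.UploadStreamFromReader(up, bytes.NewReader(data))
}
```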
......@@ -209,8 +209,10 @@ func (d *download) markComplete() {
func (d *download) onComplete(f downloadCompleteFunc) {
select {
case <-d.completeChan:
err := f(d.err)
d.log.Println("Failed to execute downloadCompleteFunc", err)
if err := f(d.err); err != nil {
d.log.Println("Failed to execute at least one downloadCompleteFunc", err)
}
return
default:
}
d.downloadCompleteFuncs = append(d.downloadCompleteFuncs, f)
......
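Generalized, the pattern above uses a closed channel to mark completion: callbacks registered after completion run immediately, while earlier ones are queued for markComplete to run. A generic sketch, not code from this merge:

```go
import (
	"log"
	"sync"
)

type completer struct {
	mu        sync.Mutex
	done      chan struct{}       // closed exactly once on completion
	err       error               // result, valid once done is closed
	callbacks []func(error) error // queued callbacks, run on completion
}

// onComplete runs f immediately if the work already finished, and queues it
// otherwise, mirroring download.onComplete above.
func (c *completer) onComplete(f func(error) error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	select {
	case <-c.done:
		if err := f(c.err); err != nil {
			log.Println("failed to execute callback:", err)
		}
		return
	default:
	}
	c.callbacks = append(c.callbacks, f)
}
```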
......@@ -77,7 +77,7 @@ OUTER:
// 80% chance to add a piece.
if fastrand.Intn(100) < 80 {
spk := hostkeys[fastrand.Intn(len(hostkeys))]
offset := uint64(fastrand.Intn(int(sf.staticMetadata.StaticFileSize)))
offset := uint64(fastrand.Intn(int(sf.staticMetadata.FileSize)))
chunkIndex, _ := sf.Snapshot().ChunkIndexByOffset(offset)
pieceIndex := uint64(fastrand.Intn(sf.staticMetadata.staticErasureCode.NumPieces()))
if err := sf.AddPiece(spk, chunkIndex, pieceIndex, crypto.Hash{}); err != nil {
......
......@@ -24,7 +24,7 @@ type (
StaticPagesPerChunk uint8 `json:"pagesperchunk"` // number of pages reserved for storing a chunk.
StaticVersion [16]byte `json:"version"` // version of the sia file format used
StaticFileSize int64 `json:"filesize"` // total size of the file
FileSize int64 `json:"filesize"` // total size of the file
StaticPieceSize uint64 `json:"piecesize"` // size of a single piece of the file
LocalPath string `json:"localpath"` // path to the local copy of the file used for repairing
......@@ -235,10 +235,7 @@ func (sf *SiaFile) Rename(newSiaPath modules.SiaPath, newSiaFilePath string) err
}
updates = append(updates, headerUpdate...)
// Write the chunks to the new location.
chunksUpdates, err := sf.saveChunksUpdates()
if err != nil {
return err
}
chunksUpdates := sf.saveChunksUpdates()
updates = append(updates, chunksUpdates...)
// Apply updates.
return sf.createAndApplyTransaction(updates...)
......@@ -276,7 +273,9 @@ func (sf *SiaFile) SetLocalPath(path string) error {
// Size returns the file's size.
func (sf *SiaFile) Size() uint64 {
return uint64(sf.staticMetadata.StaticFileSize)
sf.mu.RLock()
defer sf.mu.RUnlock()
return uint64(sf.staticMetadata.FileSize)
}
// UpdateAccessTime updates the AccessTime timestamp to the current time.
......@@ -314,7 +313,7 @@ func (sf *SiaFile) UpdateCachedHealthMetadata(metadata CachedHealthMetadata) err
defer sf.mu.Unlock()
// Update the number of stuck chunks
var numStuckChunks uint64
for _, chunk := range sf.staticChunks {
for _, chunk := range sf.chunks {
if chunk.Stuck {
numStuckChunks++
}
......
......@@ -119,7 +119,7 @@ func loadSiaFile(path string, wal *writeaheadlog.WAL, deps modules.Dependencies)
if err != nil {
return nil, err
}
sf.staticChunks = append(sf.staticChunks, chunk)
sf.chunks = append(sf.chunks, chunk)
}
return sf, nil
}
......@@ -297,6 +297,9 @@ func (sf *SiaFile) createAndApplyTransaction(updates ...writeaheadlog.Update) er
if sf.deleted {
return errors.New("can't call createAndApplyTransaction on deleted file")
}
if len(updates) == 0 {
return nil
}
// This should never be called on a deleted file.
if sf.deleted {
return errors.New("shouldn't apply updates on deleted file")
......@@ -381,35 +384,29 @@ func (sf *SiaFile) saveFile() error {
if err != nil {
return errors.AddContext(err, "failed to to create save header updates")
}
chunksUpdates, err := sf.saveChunksUpdates()
if err != nil {
return errors.AddContext(err, "failed to create save chunks updates")
}
chunksUpdates := sf.saveChunksUpdates()
err = sf.createAndApplyTransaction(append(headerUpdates, chunksUpdates...)...)
return errors.AddContext(err, "failed to apply saveFile updates")
}
// saveChunkUpdate creates a writeaheadlog update that saves a single marshaled chunk
// to disk when applied.
func (sf *SiaFile) saveChunkUpdate(chunkIndex int) (writeaheadlog.Update, error) {
func (sf *SiaFile) saveChunkUpdate(chunkIndex int) writeaheadlog.Update {
offset := sf.chunkOffset(chunkIndex)
chunkBytes := marshalChunk(sf.staticChunks[chunkIndex])
return sf.createInsertUpdate(offset, chunkBytes), nil
chunkBytes := marshalChunk(sf.chunks[chunkIndex])
return sf.createInsertUpdate(offset, chunkBytes)
}
// saveChunksUpdates creates writeaheadlog updates which save the marshaled chunks of
// the SiaFile to disk when applied.
func (sf *SiaFile) saveChunksUpdates() ([]writeaheadlog.Update, error) {
func (sf *SiaFile) saveChunksUpdates() []writeaheadlog.Update {
// Marshal all the chunks and create updates for them.
updates := make([]writeaheadlog.Update, 0, len(sf.staticChunks))
for chunkIndex := range sf.staticChunks {
update, err := sf.saveChunkUpdate(chunkIndex)
if err != nil {
return nil, err
}
updates := make([]writeaheadlog.Update, 0, len(sf.chunks))
for chunkIndex := range sf.chunks {
update := sf.saveChunkUpdate(chunkIndex)
updates = append(updates, update)
}
return updates, nil
return updates
}
// saveHeaderUpdates creates writeaheadlog updates to save the metadata and
......
......@@ -54,7 +54,7 @@ func (sfs *SiaFileSet) NewFromLegacyData(fd FileData) (*SiaFileSetEntry, error)
ChunkOffset: defaultReservedMDPages * pageSize,
ChangeTime: currentTime,
CreateTime: currentTime,
StaticFileSize: int64(fd.FileSize),
FileSize: int64(fd.FileSize),
LocalPath: fd.RepairPath,
StaticMasterKey: mk.Key(),
StaticMasterKeyType: mk.Type(),
......@@ -72,9 +72,9 @@ func (sfs *SiaFileSet) NewFromLegacyData(fd FileData) (*SiaFileSetEntry, error)
deleted: fd.Deleted,
wal: sfs.wal,
}
file.staticChunks = make([]chunk, len(fd.Chunks))
for i := range file.staticChunks {
file.staticChunks[i].Pieces = make([][]piece, file.staticMetadata.staticErasureCode.NumPieces())
file.chunks = make([]chunk, len(fd.Chunks))
for i := range file.chunks {
file.chunks[i].Pieces = make([][]piece, file.staticMetadata.staticErasureCode.NumPieces())
}
// Populate the pubKeyTable of the file and add the pieces.
......@@ -93,7 +93,7 @@ func (sfs *SiaFileSet) NewFromLegacyData(fd FileData) (*SiaFileSetEntry, error)
})
}
// Add the piece to the SiaFile.
file.staticChunks[chunkIndex].Pieces[pieceIndex] = append(file.staticChunks[chunkIndex].Pieces[pieceIndex], piece{
file.chunks[chunkIndex].Pieces[pieceIndex] = append(file.chunks[chunkIndex].Pieces[pieceIndex], piece{
HostTableOffset: tableOffset,
MerkleRoot: p.MerkleRoot,
})
......
......@@ -68,10 +68,10 @@ func equalFiles(sf, sf2 *SiaFile) error {
fmt.Println(sf2.pubKeyTable)
return errors.New("sf pubKeyTable doesn't equal sf2 pubKeyTable")
}
if !reflect.DeepEqual(sf.staticChunks, sf2.staticChunks) {
fmt.Println(len(sf.staticChunks), len(sf2.staticChunks))
fmt.Println("sf1", sf.staticChunks)
fmt.Println("sf2", sf2.staticChunks)
if !reflect.DeepEqual(sf.chunks, sf2.chunks) {
fmt.Println(len(sf.chunks), len(sf2.chunks))
fmt.Println("sf1", sf.chunks)
fmt.Println("sf2", sf2.chunks)
return errors.New("sf chunks don't equal sf2 chunks")
}
if sf.siaFilePath != sf2.siaFilePath {
......@@ -124,7 +124,7 @@ func newBlankTestFileAndWAL() (*SiaFile, *writeaheadlog.WAL, string) {
panic(err)
}
// Check that the number of chunks in the file is correct.
if len(sf.staticChunks) != numChunks {
if len(sf.chunks) != numChunks {
panic("newTestFile didn't create the expected number of chunks")
}
return sf, wal, walPath
......@@ -142,7 +142,7 @@ func newBlankTestFile() *SiaFile {
func newTestFile() *SiaFile {
sf := newBlankTestFile()
// Add pieces to each chunk.
for chunkIndex := range sf.staticChunks {
for chunkIndex := range sf.chunks {
for pieceIndex := 0; pieceIndex < sf.ErasureCode().NumPieces(); pieceIndex++ {
numPieces := fastrand.Intn(3) // up to 2 hosts for each piece
for i := 0; i < numPieces; i++ {
......@@ -224,7 +224,7 @@ func TestNewFile(t *testing.T) {
}
// Marshal the chunks.
var chunks [][]byte
for _, chunk := range sf.staticChunks {
for _, chunk := range sf.chunks {
c := marshalChunk(chunk)
chunks = append(chunks, c)
}
......@@ -241,7 +241,7 @@ func TestNewFile(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if fi.Size() != sf.chunkOffset(len(sf.staticChunks)-1)+int64(len(chunks[len(chunks)-1])) {
if fi.Size() != sf.chunkOffset(len(sf.chunks)-1)+int64(len(chunks[len(chunks)-1])) {
t.Fatal("file doesn't have right size")
}
// Compare the metadata to the on-disk metadata.
......@@ -264,7 +264,7 @@ func TestNewFile(t *testing.T) {
}
// Compare the chunks to the on-disk chunks one-by-one.
readChunk := make([]byte, int(sf.staticMetadata.StaticPagesPerChunk)*pageSize)
for chunkIndex := range sf.staticChunks {
for chunkIndex := range sf.chunks {
_, err := f.ReadAt(readChunk, sf.chunkOffset(chunkIndex))
if err != nil && err != io.EOF {
t.Fatal(err)
......@@ -697,15 +697,12 @@ func TestSaveChunk(t *testing.T) {
sf := newTestFile()
// Choose a random chunk from the file and replace it.
chunkIndex := fastrand.Intn(len(sf.staticChunks))
chunkIndex := fastrand.Intn(len(sf.chunks))
chunk := randomChunk()
sf.staticChunks[chunkIndex] = chunk
sf.chunks[chunkIndex] = chunk
// Write the chunk to disk using saveChunk.
update, err := sf.saveChunkUpdate(chunkIndex)
if err != nil {
t.Fatal(err)
}
update := sf.saveChunkUpdate(chunkIndex)
if err := sf.createAndApplyTransaction(update); err != nil {
t.Fatal(err)
}
......
......@@ -57,6 +57,80 @@ func randomPiece() piece {
return piece
}
// TestGrowNumChunks is a unit test for the SiaFile's GrowNumChunks method.
func TestGrowNumChunks(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
// Create a blank file.
sf, wal, _ := newBlankTestFileAndWAL()
expectedChunks := sf.NumChunks()
expectedSize := sf.Size()
// Declare a check method.
checkFile := func(sf *SiaFile, numChunks, size uint64) {
if numChunks != sf.NumChunks() {
t.Fatalf("Expected %v chunks but was %v", numChunks, sf.NumChunks())
}
if size != sf.Size() {
t.Fatalf("Expected size to be %v but was %v", size, sf.Size())
}
}
// Increase the size of the file by 1 chunk.
expectedChunks++
expectedSize += sf.ChunkSize()
err := sf.GrowNumChunks(expectedChunks)
if err != nil {
t.Fatal(err)
}
// Check the file after growing the chunks.
checkFile(sf, expectedChunks, expectedSize)
// Load the file from disk again to also check that persistence works.
sf, err = LoadSiaFile(sf.siaFilePath, wal)
if err != nil {
t.Fatal(err)
}
// Check that size and chunks still match.
checkFile(sf, expectedChunks, expectedSize)
// Call GrowNumChunks with the same argument again. This should be a no-op.
err = sf.GrowNumChunks(expectedChunks)
if err != nil {
t.Fatal(err)
}
// Check the file after growing the chunks.
checkFile(sf, expectedChunks, expectedSize)
// Load the file from disk again to also check that nothing incorrect was
// persisted.
sf, err = LoadSiaFile(sf.siaFilePath, wal)
if err != nil {
t.Fatal(err)
}
// Check that size and chunks still match.
checkFile(sf, expectedChunks, expectedSize)
// Grow the file by 2 chunks to see if multiple chunks also work.
expectedChunks += 2
expectedSize += 2 * sf.ChunkSize()
err = sf.GrowNumChunks(expectedChunks)
if err != nil {
t.Fatal(err)
}
// Check the file after growing the chunks.
checkFile(sf, expectedChunks, expectedSize)
// Load the file from disk again to also check that persistence works.
sf, err = LoadSiaFile(sf.siaFilePath, wal)
if err != nil {
t.Fatal(err)
}
// Check that size and chunks still match.
checkFile(sf, expectedChunks, expectedSize)
}
// TestPruneHosts is a unit test for the pruneHosts method.
func TestPruneHosts(t *testing.T) {
if testing.Short() {
t.SkipNow()
......@@ -75,7 +149,7 @@ func TestPruneHosts(t *testing.T) {
// Add one piece for every host to every pieceSet of the SiaFile.
for _, hk := range sf.HostPublicKeys() {
for chunkIndex, chunk := range sf.staticChunks {
for chunkIndex, chunk := range sf.chunks {
for pieceIndex := range chunk.Pieces {
if err := sf.AddPiece(hk, uint64(chunkIndex), uint64(pieceIndex), crypto.Hash{}); err != nil {
t.Fatal(err)
......@@ -103,8 +177,8 @@ func TestPruneHosts(t *testing.T) {
// Loop over all the pieces and make sure that the pieces with missing
// hosts were pruned and that the remaining pieces have the correct offset
// now.
for chunkIndex := range sf.staticChunks {
for _, pieceSet := range sf.staticChunks[chunkIndex].Pieces {
for chunkIndex := range sf.chunks {
for _, pieceSet := range sf.chunks[chunkIndex].Pieces {
if len(pieceSet) != 1 {
t.Fatalf("Expected 1 piece in the set but was %v", len(pieceSet))
}
......@@ -147,7 +221,7 @@ func TestDefragChunk(t *testing.T) {
sf := newBlankTestFile()
// Use the first chunk of the file for testing.
chunk := &sf.staticChunks[0]
chunk := &sf.chunks[0]
// Add 100 pieces to each set of pieces, all belonging to the same unused
// host.
......@@ -223,7 +297,7 @@ func TestDefragChunk(t *testing.T) {
var duration time.Duration
for i := 0; i < 50; i++ {
pk := sf.pubKeyTable[fastrand.Intn(len(sf.pubKeyTable))].PublicKey
pieceIndex := fastrand.Intn(len(sf.staticChunks[0].Pieces))
pieceIndex := fastrand.Intn(len(sf.chunks[0].Pieces))
before := time.Now()
if err := sf.AddPiece(pk, 0, uint64(pieceIndex), crypto.Hash{}); err != nil {
t.Fatal(err)
......@@ -266,7 +340,7 @@ func TestChunkHealth(t *testing.T) {
t.Fatal(err)
}
// Check that the number of chunks in the file is correct.
if len(sf.staticChunks) != numChunks {
if len(sf.chunks) != numChunks {
t.Fatal("newTestFile didn't create the expected number of chunks")
}
......@@ -283,7 +357,7 @@ func TestChunkHealth(t *testing.T) {
// Since we are using a preset offlineMap, all the chunks should have the
// same health as the file
for i := range sf.staticChunks {
for i := range sf.chunks {
chunkHealth := sf.chunkHealth(i, offlineMap, goodForRenewMap)
if chunkHealth != fileHealth {
t.Log("ChunkHealth:", chunkHealth)
......@@ -330,7 +404,7 @@ func TestChunkHealth(t *testing.T) {
// Mark the chunk at index 1 as stuck and confirm that it doesn't impact the
// result of chunkHealth
sf.staticChunks[1].Stuck = true
sf.chunks[1].Stuck = true
if sf.chunkHealth(1, offlineMap, goodForRenewMap) != newHealth {
t.Fatalf("Expected file to be %v, got %v", newHealth, sf.chunkHealth(1, offlineMap, goodForRenewMap))
}
......@@ -361,7 +435,7 @@ func TestMarkHealthyChunksAsUnstuck(t *testing.T) {
t.Fatal(err)
}
// Check that the number of chunks in the file is correct.
if len(sf.staticChunks) != numChunks {
if len(sf.chunks) != numChunks {
t.Fatal("newTestFile didn't create the expected number of chunks")
}
......@@ -389,7 +463,7 @@ func TestMarkHealthyChunksAsUnstuck(t *testing.T) {
}
// Add good pieces to first chunk
for pieceIndex := range sf.staticChunks[0].Pieces {
for pieceIndex := range sf.chunks[0].Pieces {
host := fmt.Sprintln("host", 0, pieceIndex)
spk := types.SiaPublicKey{}
spk.LoadString(host)
......@@ -416,7 +490,7 @@ func TestMarkHealthyChunksAsUnstuck(t *testing.T) {
goodForRenewMap = make(map[string]bool)
// Add good pieces to all chunks
for chunkIndex, chunk := range sf.staticChunks {
for chunkIndex, chunk := range sf.chunks {
for pieceIndex := range chunk.Pieces {
host := fmt.Sprintln("host", chunkIndex, pieceIndex)
spk := types.SiaPublicKey{}
......@@ -465,7 +539,7 @@ func TestMarkUnhealthyChunksAsStuck(t *testing.T) {
t.Fatal(err)
}
// Check that the number of chunks in the file is correct.
if len(sf.staticChunks) != numChunks {
if len(sf.chunks) != numChunks {
t.Fatal("newTestFile didn't create the expected number of chunks")
}
......@@ -485,14 +559,14 @@ func TestMarkUnhealthyChunksAsStuck(t *testing.T) {
}
// Reset chunk stuck status
for chunkIndex := range sf.staticChunks {
for chunkIndex := range sf.chunks {
if err := sf.SetStuck(uint64(chunkIndex), false); err != nil {
t.Fatal(err)
}
}
// Add good pieces to first chunk
for pieceIndex := range sf.staticChunks[0].Pieces {
for pieceIndex := range sf.chunks[0].Pieces {
host := fmt.Sprintln("host", 0, pieceIndex)
spk := types.SiaPublicKey{}
spk.LoadString(host)
......@@ -515,7 +589,7 @@ func TestMarkUnhealthyChunksAsStuck(t *testing.T) {
}
// Reset chunk stuck status
for chunkIndex := range sf.staticChunks {
for chunkIndex := range sf.chunks {
if err := sf.SetStuck(uint64(chunkIndex), false); err != nil {
t.Fatal(err)
}
......@@ -526,7 +600,7 @@ func TestMarkUnhealthyChunksAsStuck(t *testing.T) {
goodForRenewMap = make(map[string]bool)
// Add good pieces to all chunks
for chunkIndex, chunk := range sf.staticChunks {
for chunkIndex, chunk := range sf.chunks {
for pieceIndex := range chunk.Pieces {
host := fmt.Sprintln("host", chunkIndex, pieceIndex)
spk := types.SiaPublicKey{}
......@@ -567,7 +641,7 @@ func TestStuckChunks(t *testing.T) {
// Mark every other chunk as stuck
expectedStuckChunks := 0
for i := range sf.staticChunks {
for i := range sf.chunks {
if (i % 2) != 0 {
continue
}
......@@ -612,7 +686,7 @@ func TestStuckChunks(t *testing.T) {
}
// Check chunks and Stuck Chunk Table
for i, chunk := range sf.staticChunks {
for i, chunk := range sf.chunks {
if i%2 != 0 {
if chunk.Stuck {
t.Fatal("Found stuck chunk when un-stuck chunk was expected")
......
......@@ -136,14 +136,14 @@ func (sf *siaFileSetEntry) Snapshot() *Snapshot {
pkt := make([]HostPublicKey, len(sf.pubKeyTable))
copy(pkt, sf.pubKeyTable)
chunks := make([]Chunk, 0, len(sf.staticChunks))
chunks := make([]Chunk, 0, len(sf.chunks))
// Figure out how much memory we need to allocate for the piece sets and
// pieces.
var numPieceSets, numPieces int
for chunkIndex := range sf.staticChunks {
numPieceSets += len(sf.staticChunks[chunkIndex].Pieces)
for pieceIndex := range sf.staticChunks[chunkIndex].Pieces {
numPieces += len(sf.staticChunks[chunkIndex].Pieces[pieceIndex])
for chunkIndex := range sf.chunks {
numPieceSets += len(sf.chunks[chunkIndex].Pieces)
for pieceIndex := range sf.chunks[chunkIndex].Pieces {
numPieces += len(sf.chunks[chunkIndex].Pieces[pieceIndex])
}
}
// Allocate all the piece sets and pieces at once.
......@@ -151,13 +151,13 @@ func (sf *siaFileSetEntry) Snapshot() *Snapshot {
allPieces := make([]Piece, numPieces)
// Copy chunks.
for chunkIndex := range sf.staticChunks {
pieces := allPieceSets[:len(sf.staticChunks[chunkIndex].Pieces)]
allPieceSets = allPieceSets[len(sf.staticChunks[chunkIndex].Pieces):]
for chunkIndex := range sf.chunks {
pieces := allPieceSets[:len(sf.chunks[chunkIndex].Pieces)]
allPieceSets = allPieceSets[len(sf.chunks[chunkIndex].Pieces):]
for pieceIndex := range pieces {
pieces[pieceIndex] = allPieces[:len(sf.staticChunks[chunkIndex].Pieces[pieceIndex])]
allPieces = allPieces[len(sf.staticChunks[chunkIndex].Pieces[pieceIndex]):]
for i, piece := range sf.staticChunks[chunkIndex].Pieces[pieceIndex] {
pieces[pieceIndex] = allPieces[:len(sf.chunks[chunkIndex].Pieces[pieceIndex])]
allPieces = allPieces[len(sf.chunks[chunkIndex].Pieces[pieceIndex]):]
for i, piece := range sf.chunks[chunkIndex].Pieces[pieceIndex] {
pieces[pieceIndex][i] = Piece{
HostPubKey: sf.pubKeyTable[piece.HostTableOffset].PublicKey,
MerkleRoot: piece.MerkleRoot,
......@@ -176,7 +176,7 @@ func (sf *siaFileSetEntry) Snapshot() *Snapshot {
return &Snapshot{
staticChunks: chunks,
staticFileSize: sf.staticMetadata.StaticFileSize,
staticFileSize: sf.staticMetadata.FileSize,
staticPieceSize: sf.staticMetadata.StaticPieceSize,
staticErasureCode: sf.staticMetadata.staticErasureCode,
staticMasterKey: mk,
......
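The allocation trick in Snapshot, isolated: a single backing array is allocated for all inner slices and then carved into windows, trading many small allocations for one large one. A generic sketch, with Piece standing in for the snapshot's piece type:

```go
// copyRows deep-copies a [][]Piece using one backing allocation.
func copyRows(rows [][]Piece) [][]Piece {
	total := 0
	for _, r := range rows {
		total += len(r)
	}
	backing := make([]Piece, total) // one allocation covers every piece
	out := make([][]Piece, len(rows))
	for i, r := range rows {
		out[i] = backing[:len(r):len(r)] // carve a window, cap pinned to len
		backing = backing[len(r):]
		copy(out[i], r)
	}
	return out
}
```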
......@@ -24,12 +24,12 @@ func TestSnapshot(t *testing.T) {
snap := sf.Snapshot()
// Make sure the snapshot has the same fields as the SiaFile.
if len(sf.staticChunks) != len(snap.staticChunks) {
t.Errorf("expected %v chunks but got %v", len(sf.staticChunks), len(snap.staticChunks))
if len(sf.chunks) != len(snap.staticChunks) {
t.Errorf("expected %v chunks but got %v", len(sf.chunks), len(snap.staticChunks))
}
if sf.staticMetadata.StaticFileSize != snap.staticFileSize {
if sf.staticMetadata.FileSize != snap.staticFileSize {
t.Errorf("staticFileSize was %v but should be %v",
snap.staticFileSize, sf.staticMetadata.StaticFileSize)
snap.staticFileSize, sf.staticMetadata.FileSize)
}
if sf.staticMetadata.StaticPieceSize != snap.staticPieceSize {
t.Errorf("staticPieceSize was %v but should be %v",
......@@ -61,7 +61,7 @@ func TestSnapshot(t *testing.T) {
}
sf.siaFileSet.mu.Unlock()
// Compare the pieces.
for i := range sf.staticChunks {
for i := range sf.chunks {
sfPieces, err1 := sf.Pieces(uint64(i))
snapPieces, err2 := snap.Pieces(uint64(i))
if err := errors.Compose(err1, err2); err != nil {
......@@ -125,7 +125,7 @@ func benchmarkSnapshot(b *testing.B, fileSize uint64) {
// Add numPieces pieces to each chunk.
for i := uint64(0); i < sf.NumChunks(); i++ {
for j := uint64(0); j < uint64(rc.NumPieces()); j++ {
sf.staticChunks[i].Pieces[j] = append(sf.staticChunks[i].Pieces[j], piece{})
sf.chunks[i].Pieces[j] = append(sf.chunks[i].Pieces[j], piece{})
}
}
// Save the file to disk.
......
package renter
import (
"bytes"
"fmt"
"io"
"os"
......@@ -42,6 +43,7 @@ type unfinishedUploadChunk struct {
piecesNeeded int // number of pieces to achieve a 100% complete upload
stuck bool // indicates if the chunk was marked as stuck during last repair
stuckRepair bool // indicates if the chunk was identified for repair by the stuck loop
priority bool // indicates if the chunk is supposed to be repaired asap
// The logical data is the data that is presented to the user when the user
// requests the chunk. The physical data is all of the pieces that get
......@@ -49,6 +51,10 @@ type unfinishedUploadChunk struct {
logicalChunkData [][]byte
physicalChunkData [][]byte
// sourceReader is an optional source for the logical chunk data. If
// available, it will be tried before the repair path or remote repair.
sourceReader io.ReadCloser
// Worker synchronization fields. The mutex only protects these fields.
//
// When a worker passes over a piece for upload to go on standby:
......@@ -335,6 +341,34 @@ func (r *Renter) managedFetchLogicalChunkData(chunk *unfinishedUploadChunk) erro
chunkHealth := 1 - (float64(chunk.piecesCompleted-chunk.minimumPieces) / numParityPieces)
download := chunkHealth >= siafile.RemoteRepairDownloadThreshold
// If a sourceReader is available, use it.
var err error
if chunk.sourceReader != nil {
// Read up to chunk.length bytes from the stream.
byteBuf := make([]byte, chunk.length)
n, err := io.ReadFull(chunk.sourceReader, byteBuf)
defer chunk.sourceReader.Close()
// Adjust the fileSize. Since we don't know the length of the stream
// beforehand, we simply assume that a whole chunk will be added to the
// file. That's why we subtract the difference between the size of a
// chunk and n here.
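// For example, with a 4 MiB chunk of which only n == 1 MiB could be read
// from the stream, the assumed size shrinks by 3 MiB.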
adjustedSize := chunk.fileEntry.Size() - chunk.length + uint64(n)
if errSize := chunk.fileEntry.SetFileSize(adjustedSize); errSize != nil {
return errors.AddContext(errSize, "failed to adjust FileSize")
}
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return errors.AddContext(err, "failed to read chunk from sourceReader")
}
// Read the byteBuf into the sharded destination buffer.
buf := NewDownloadDestinationBuffer(chunk.length, chunk.fileEntry.PieceSize())
_, err = buf.ReadFrom(bytes.NewBuffer(byteBuf))
if err == nil || err == io.EOF || err == io.ErrUnexpectedEOF {
chunk.logicalChunkData = buf.buf
return nil
}
return errors.AddContext(err, "failed to get logicalChunkData from stream")
}
// Download the chunk if it's not on disk.
if chunk.fileEntry.LocalPath() == "" && download {
return r.managedDownloadLogicalChunkData(chunk)
......@@ -359,8 +393,8 @@ func (r *Renter) managedFetchLogicalChunkData(chunk *unfinishedUploadChunk) erro
// TODO: Once we have enabled support for small chunks, we should stop
// needing to ignore the EOF errors, because the chunk size should always
// match the tail end of the file. Until then, we ignore io.EOF.
buf := NewDownloadDestinationBuffer(chunk.length, chunk.fileEntry.PieceSize())
sr := io.NewSectionReader(osFile, chunk.offset, int64(chunk.length))
buf := NewDownloadDestinationBuffer(chunk.length, chunk.fileEntry.PieceSize())
_, err = buf.ReadFrom(sr)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && download {
r.log.Debugln("failed to read file, downloading instead:", err)
......
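As an aside on the error handling above: io.ReadFull reports a stream that ends partway through the buffer as io.ErrUnexpectedEOF and a stream that ends before any bytes are read as io.EOF, so tolerating both simply means accepting a short final chunk:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	buf := make([]byte, 8)
	n, err := io.ReadFull(strings.NewReader("hello"), buf)
	fmt.Println(n, err) // 5 unexpected EOF

	n, err = io.ReadFull(strings.NewReader(""), buf)
	fmt.Println(n, err) // 0 EOF
}
```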
......@@ -11,6 +11,7 @@ import (
"gitlab.com/NebulousLabs/Sia/modules"
"gitlab.com/NebulousLabs/Sia/modules/renter/siafile"
"gitlab.com/NebulousLabs/Sia/types"
"gitlab.com/NebulousLabs/errors"
"gitlab.com/NebulousLabs/fastrand"
......@@ -36,6 +37,14 @@ type uploadChunkHeap []*unfinishedUploadChunk
// Implementation of heap.Interface for uploadChunkHeap.
func (uch uploadChunkHeap) Len() int { return len(uch) }
func (uch uploadChunkHeap) Less(i, j int) bool {
// If chunk i is high priority, return true to prioritize it.
if uch[i].priority {
return true
}
// If chunk j is high priority, return false to prioritize it.
if uch[j].priority {
return false
}
// If the chunks have the same stuck status, check which chunk has the lower
// completion percentage.
if uch[i].stuck == uch[j].stuck {
......@@ -133,6 +142,92 @@ func (uh *uploadHeap) managedPop() (uc *unfinishedUploadChunk) {
return uc
}
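To see the priority ordering above in action, here is a self-contained sketch of a heap with the same priority-first Less; the tie-breaking on stuck status is an assumption, since the original function is truncated here:

```go
package main

import (
	"container/heap"
	"fmt"
)

type chunk struct {
	priority bool
	stuck    bool
	percent  float64 // completion percentage
}

type chunkHeap []*chunk

func (h chunkHeap) Len() int { return len(h) }
func (h chunkHeap) Less(i, j int) bool {
	if h[i].priority { // priority chunks sort to the top
		return true
	}
	if h[j].priority {
		return false
	}
	if h[i].stuck == h[j].stuck { // same stuck status: less complete first
		return h[i].percent < h[j].percent
	}
	return h[i].stuck // assumption: stuck chunks sort before unstuck ones
}
func (h chunkHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *chunkHeap) Push(x interface{}) { *h = append(*h, x.(*chunk)) }
func (h *chunkHeap) Pop() interface{} {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

func main() {
	h := &chunkHeap{{percent: 0.5}, {priority: true, percent: 0.9}, {stuck: true}}
	heap.Init(h)
	fmt.Println(heap.Pop(h).(*chunk).priority) // true: the priority chunk pops first
}
```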
// buildUnfinishedChunk will pull out a single unfinished chunk of a file.
func (r *Renter) buildUnfinishedChunk(entry *siafile.SiaFileSetEntry, chunkIndex uint64, hosts map[string]struct{}, hostPublicKeys map[string]types.SiaPublicKey, priority bool) *unfinishedUploadChunk {
uuc := &unfinishedUploadChunk{
fileEntry: entry.CopyEntry(),
id: uploadChunkID{
fileUID: entry.UID(),
index: chunkIndex,
},