Commit 13b7735e authored by Luke Champine

Merge branch 'download-close' into 'master'

Close Downloader if chunk download is completed

See merge request !3267
parents 33bf5199 d16bf80a
Pipeline #39623673 passed with stages in 32 minutes and 19 seconds
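The heart of this merge is a small watcher goroutine in the download worker (see the worker hunk below): once the chunk's download is marked complete, or the renter is shutting down, the worker closes its Downloader so that a Sector call still in flight returns immediately instead of finishing a transfer whose data is no longer needed. That early close surfaces as a "use of closed network connection" error, which is why the proto-layer hunk below stops counting such errors as failed host interactions. A minimal, self-contained sketch of the pattern follows; the type and method names (downloader, Fetch) are illustrative stand-ins, not the renter's real ones.

package main

import (
    "errors"
    "fmt"
    "time"
)

// downloader stands in for the renter's proto.Downloader: Fetch blocks until
// the transfer finishes or the underlying connection is closed.
type downloader struct {
    closed chan struct{}
}

func (d *downloader) Fetch() error {
    select {
    case <-time.After(2 * time.Second): // pretend the sector transfer takes a while
        return nil
    case <-d.closed:
        return errors.New("use of closed network connection")
    }
}

func (d *downloader) Close() error {
    close(d.closed)
    return nil
}

func main() {
    d := &downloader{closed: make(chan struct{})}
    complete := make(chan struct{}) // closed once the chunk has enough pieces
    stop := make(chan struct{})     // closed when the renter shuts down

    // Watcher: close the downloader early if the chunk completes or we shut down.
    go func() {
        select {
        case <-complete:
        case <-stop:
        }
        d.Close()
    }()

    close(complete)        // another worker finished the chunk first
    fmt.Println(d.Fetch()) // returns promptly with a closed-connection error
}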
......
@@ -65,13 +65,11 @@ func (s *streamer) Read(p []byte) (n int, err error) {
}
// Calculate how much we can download. We never download more than a single chunk.
chunkSize := s.file.staticChunkSize()
remainingData := uint64(fileSize - s.offset)
requestedData := uint64(len(p))
remainingChunk := chunkSize - uint64(s.offset)%chunkSize
length := min(remainingData, requestedData, remainingChunk)
length := min(remainingData, requestedData)
// Download data
// Download data.
buffer := bytes.NewBuffer([]byte{})
d, err := s.r.managedNewDownload(downloadParams{
destination: newDownloadDestinationWriteCloserFromWriter(buffer),
......
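The length calculation in the hunk above takes the minimum of several uint64 quantities; Go of this era has no built-in min, so the streamer relies on a small helper. Below is a sketch of what such a helper can look like, together with the offset arithmetic it feeds; the helper shown here is an assumption, not necessarily the repository's own definition.

package main

import "fmt"

// min returns the smallest of its arguments. Hypothetical helper: the renter
// defines its own equivalent, since Go 1.x of this era has no built-in min.
func min(first uint64, rest ...uint64) uint64 {
    m := first
    for _, v := range rest {
        if v < m {
            m = v
        }
    }
    return m
}

func main() {
    // Example: 100-byte file, reader positioned at offset 90, caller asked
    // for 64 bytes: only 10 bytes remain, so that is the download length.
    var fileSize, offset uint64 = 100, 90
    requestedData := uint64(64)
    remainingData := fileSize - offset
    fmt.Println(min(remainingData, requestedData)) // 10
}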
......
@@ -2,6 +2,7 @@ package proto
import (
"net"
"strings"
"sync"
"time"
......
@@ -83,10 +84,12 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
// Increase Successful/Failed interactions accordingly
defer func() {
if err != nil {
// Ignore ErrStopResponse and closed network connection errors since
// they are not considered a failed interaction with the host.
if err != nil && err != modules.ErrStopResponse && !strings.Contains(err.Error(), "use of closed network connection") {
hd.hdb.IncrementFailedInteractions(contract.HostPublicKey())
err = errors.Extend(err, modules.ErrHostFault)
} else if err == nil {
} else {
hd.hdb.IncrementSuccessfulInteractions(contract.HostPublicKey())
}
}()
......
@@ -101,9 +104,9 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
extendDeadline(hd.conn, connTimeout)
signedTxn, err := negotiateRevision(hd.conn, rev, contract.SecretKey, hd.height)
if err == modules.ErrStopResponse {
// if host gracefully closed, close our connection as well; this will
// cause the next download to fail. However, we must delay closing
// until we've finished downloading the sector.
// If the host wants to stop communicating after this iteration, close
// our connection; this will cause the next download to fail. However,
// we must delay closing until we've finished downloading the sector.
defer hd.conn.Close()
} else if err != nil {
return modules.RenterContract{}, nil, err
......
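The hunks above lean on a common Go idiom: Sector declares a named return value err, and a deferred closure inspects it after the method body has run to decide whether the host should be charged with a failed interaction, credited with a successful one, or left alone (graceful ErrStopResponse and our own closed-connection errors fall in the last bucket). A compact, runnable sketch of that idiom with illustrative names:

package main

import (
    "errors"
    "fmt"
    "strings"
)

var errStopResponse = errors.New("stop response") // stand-in for modules.ErrStopResponse

// fetch demonstrates the idiom used by Downloader.Sector: a deferred closure
// reads the named return value err after the body has run and classifies the
// outcome. Graceful stops and our own early connection close are not blamed
// on the host.
func fetch(do func() error, onFail, onSuccess func()) (err error) {
    defer func() {
        if err != nil && err != errStopResponse &&
            !strings.Contains(err.Error(), "use of closed network connection") {
            onFail() // host misbehaved: count a failed interaction
        } else if err == nil {
            onSuccess() // clean exchange: count a successful interaction
        }
        // Otherwise: graceful stop or local close; record nothing.
    }()
    return do()
}

func main() {
    err := fetch(
        func() error { return errors.New("use of closed network connection") },
        func() { fmt.Println("failed interaction") },
        func() { fmt.Println("successful interaction") },
    )
    fmt.Println("err:", err) // no interaction is recorded for this error
}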
......
@@ -124,13 +124,11 @@ func (sc *streamCache) pruneCache(size uint64) {
// TODO: in the future we might need cache invalidation. At the
// moment this doesn't worry us since our files are static.
func (sc *streamCache) Retrieve(udc *unfinishedDownloadChunk) bool {
udc.mu.Lock()
defer udc.mu.Unlock()
sc.mu.Lock()
defer sc.mu.Unlock()
cd, cached := sc.streamMap[udc.staticCacheID]
if !cached {
sc.mu.Unlock()
return false
}
......
@@ -138,6 +136,10 @@ func (sc *streamCache) Retrieve(udc *unfinishedDownloadChunk) bool {
cd.lastAccess = time.Now()
sc.streamMap[udc.staticCacheID] = cd
sc.streamHeap.update(cd, cd.id, cd.data, cd.lastAccess)
sc.mu.Unlock()
udc.mu.Lock()
defer udc.mu.Unlock()
start := udc.staticFetchOffset
end := start + udc.staticFetchLength
......
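The Retrieve change above narrows lock scope: instead of taking udc.mu for the whole call and acquiring sc.mu underneath it, the new code holds sc.mu only for the map lookup and LRU bookkeeping, releases it, and only then locks udc.mu to copy data into the chunk, so the two mutexes are never held at the same time. A small sketch of that shape follows; the cache and chunk types are illustrative, and the real method also refreshes lastAccess and the heap under the cache lock.

package main

import (
    "fmt"
    "sync"
)

type cache struct {
    mu   sync.Mutex
    data map[string][]byte
}

type chunk struct {
    mu  sync.Mutex
    buf []byte
}

// retrieve mirrors the locking shape of streamCache.Retrieve after the change:
// the cache lock is held only for the lookup and released before the chunk
// lock is taken, so the two locks never overlap.
func (c *cache) retrieve(id string, ch *chunk) bool {
    c.mu.Lock()
    data, ok := c.data[id]
    c.mu.Unlock() // release before touching the chunk
    if !ok {
        return false
    }

    ch.mu.Lock()
    defer ch.mu.Unlock()
    ch.buf = append(ch.buf[:0], data...)
    return true
}

func main() {
    c := &cache{data: map[string][]byte{"id": []byte("cached piece")}}
    ch := &chunk{}
    fmt.Println(c.retrieve("id", ch), string(ch.buf))
}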
......
@@ -34,6 +34,17 @@ func (w *worker) managedDownload(udc *unfinishedDownloadChunk) {
return
}
defer d.Close()
// If the download of the chunk is marked as done or if we are shutting
// down, we close the downloader early to interrupt the download.
go func() {
select {
case <-w.renter.tg.StopChan():
case <-udc.download.completeChan:
}
d.Close()
}()
pieceData, err := d.Sector(udc.staticChunkMap[string(w.contract.HostPublicKey.Key)].root)
if err != nil {
w.renter.log.Debugln("worker failed to download sector:", err)
......
@@ -171,7 +182,7 @@ func (w *worker) ownedProcessDownloadChunk(udc *unfinishedDownloadChunk) *unfini
// worker and return nil. Worker only needs to be removed if worker is being
// dropped.
udc.mu.Lock()
chunkComplete := udc.piecesCompleted >= udc.erasureCode.MinPieces()
chunkComplete := udc.piecesCompleted >= udc.erasureCode.MinPieces() || udc.download.staticComplete()
chunkFailed := udc.piecesCompleted+udc.workersRemaining < udc.erasureCode.MinPieces()
pieceData, workerHasPiece := udc.staticChunkMap[string(w.contract.HostPublicKey.Key)]
pieceCompleted := udc.completedPieces[pieceData.index]
......
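Both worker changes above rely on the download object exposing a completion signal: the watcher goroutine selects on udc.download.completeChan, and ownedProcessDownloadChunk asks udc.download.staticComplete(). Sketched below is one common way such a pair is implemented, assuming completion is signalled by closing a channel; this is a hypothetical reimplementation, not the renter's actual download type.

package main

import (
    "fmt"
    "sync"
)

// download sketches a completion signal built around a channel that is closed
// exactly once when the download finishes (hypothetical reimplementation).
type download struct {
    completeChan chan struct{}
    once         sync.Once
}

func newDownload() *download {
    return &download{completeChan: make(chan struct{})}
}

// markComplete closes completeChan; sync.Once makes repeated calls safe.
func (d *download) markComplete() {
    d.once.Do(func() { close(d.completeChan) })
}

// staticComplete reports whether the download has completed without blocking.
func (d *download) staticComplete() bool {
    select {
    case <-d.completeChan:
        return true
    default:
        return false
    }
}

func main() {
    d := newDownload()
    fmt.Println(d.staticComplete()) // false
    d.markComplete()
    fmt.Println(d.staticComplete()) // true: waiters on completeChan unblock too
}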
......
@@ -146,6 +146,7 @@ func TestRenterThree(t *testing.T) {
// Specify subtests to run
subTests := []test{
{"TestAllowanceDefaultSet", testAllowanceDefaultSet},
{"TestStreamLargeFile", testStreamLargeFile},
}
// Run tests
......
@@ -745,7 +746,7 @@ func testUploadDownload(t *testing.T, tg *siatest.TestGroup) {
// Upload file, creating a piece for each host in the group
dataPieces := uint64(1)
parityPieces := uint64(len(tg.Hosts())) - dataPieces
fileSize := 100 + siatest.Fuzz()
fileSize := fastrand.Intn(2*int(modules.SectorSize)) + siatest.Fuzz() + 2 // between 1 and 2*SectorSize + 3 bytes
localFile, remoteFile, err := renter.UploadNewFileBlocking(fileSize, dataPieces, parityPieces, false)
if err != nil {
t.Fatal("Failed to upload a file for testing: ", err)
......
@@ -817,6 +818,30 @@ func testUploadWithAndWithoutForceParameter(t *testing.T, tg *siatest.TestGroup)
}
}
// testStreamLargeFile tests that using the streaming endpoint to download
// multiple chunks works.
func testStreamLargeFile(t *testing.T, tg *siatest.TestGroup) {
// Grab the first of the group's renters
renter := tg.Renters()[0]
// Upload file, creating a piece for each host in the group
dataPieces := uint64(1)
parityPieces := uint64(len(tg.Hosts())) - dataPieces
fileSize := int(10 * siatest.ChunkSize(dataPieces))
localFile, remoteFile, err := renter.UploadNewFileBlocking(fileSize, dataPieces, parityPieces, false)
if err != nil {
t.Fatal("Failed to upload a file for testing: ", err)
}
// Stream the file partially a few times. At least 1 byte is streamed.
for i := 0; i < 5; i++ {
from := fastrand.Intn(fileSize - 1) // [0..fileSize-2]
to := from + 1 + fastrand.Intn(fileSize-from-1) // [from+1..fileSize-1]
_, err = renter.StreamPartial(remoteFile, localFile, uint64(from), uint64(to))
if err != nil {
t.Fatal(err)
}
}
}
// TestRenterInterrupt executes a number of subtests using the same TestGroup to
// save time on initialization
func TestRenterInterrupt(t *testing.T) {
......