Commit 90aa3408 authored by David Vorick's avatar David Vorick Committed by GitHub

Merge pull request #2178 from NebulousLabs/renter-fixes

Further renter fixes
parents c55cdf3a 512bce46
......@@ -123,23 +123,29 @@ type (
)
// newSectionDownload initialises and returns a download object for the specified chunk.
func (r *Renter) newSectionDownload(f *file, destination modules.DownloadWriter, currentContracts map[modules.NetAddress]types.FileContractID, offset, length uint64) *download {
func (r *Renter) newSectionDownload(f *file, destination modules.DownloadWriter, offset, length uint64) *download {
d := newDownload(f, destination)
if length == 0 {
build.Critical("download length should not be zero")
d.fail(errors.New("download length should not be zero"))
return d
}
// Settings specific to a chunk download.
d.offset = offset
d.length = length
// Calculate chunks to download.
minChunk := offset / f.chunkSize()
maxChunk := (offset + length) / f.chunkSize()
maxChunk := (offset + length - 1) / f.chunkSize() // maxChunk is 1-indexed
// mark the chunks as not being downloaded yet
for i := minChunk; i <= maxChunk; i++ {
d.finishedChunks[i] = false
}
d.initPieceSet(f, currentContracts, r)
d.initPieceSet(f, r)
return d
}
......@@ -160,8 +166,7 @@ func newDownload(f *file, destination modules.DownloadWriter) *download {
}
// initPieceSet initialises the piece set, including calculations of the total download size.
func (d *download) initPieceSet(f *file,
currentContracts map[modules.NetAddress]types.FileContractID, r *Renter) {
func (d *download) initPieceSet(f *file, r *Renter) {
// Allocate the piece size and progress bar so that the download will
// finish at exactly 100%. Due to rounding error and padding, there is not
// a strict mapping between 'progress' and 'bytes downloaded' - it is
......@@ -583,14 +588,13 @@ func (r *Renter) managedWaitOnDownloadWork(ds *downloadState) {
}
// Add this returned piece to the appropriate chunk.
cd.completedPieces[finishedDownload.pieceIndex] = finishedDownload.data
atomic.AddUint64(&cd.download.atomicDataReceived, cd.download.reportedPieceSize)
if cd.download.downloadErr != nil {
r.log.Debugln("Piece succeeded but download failed; removing active piece")
ds.activePieces--
if _, ok := cd.completedPieces[finishedDownload.pieceIndex]; ok {
r.log.Debugln("Piece", finishedDownload.pieceIndex, "already added")
ds.incompleteChunks = append(ds.incompleteChunks, cd)
return
}
cd.completedPieces[finishedDownload.pieceIndex] = finishedDownload.data
atomic.AddUint64(&cd.download.atomicDataReceived, cd.download.reportedPieceSize)
// If the chunk has completed, perform chunk recovery.
if len(cd.completedPieces) == cd.download.erasureCode.MinPieces() {
......
......@@ -7,7 +7,6 @@ import (
"sync/atomic"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/types"
)
// Download performs a file download using the passed parameters.
......@@ -48,12 +47,6 @@ func (r *Renter) Download(p modules.RenterDownloadParameters) error {
dw = NewDownloadFileWriter(p.Destination, p.Offset, p.Length)
}
// Build current contracts map.
currentContracts := make(map[modules.NetAddress]types.FileContractID)
for _, contract := range r.hostContractor.Contracts() {
currentContracts[contract.NetAddress] = contract.ID
}
// sentinel: if length == 0, download the entire file
if p.Length == 0 {
p.Length = file.size - p.Offset
......@@ -64,7 +57,7 @@ func (r *Renter) Download(p modules.RenterDownloadParameters) error {
}
// Create the download object and add it to the queue.
d := r.newSectionDownload(file, dw, currentContracts, p.Offset, p.Length)
d := r.newSectionDownload(file, dw, p.Offset, p.Length)
lockID = r.mu.Lock()
r.downloadQueue = append(r.downloadQueue, d)
......
......@@ -29,7 +29,6 @@ type Downloader struct {
// the underlying contract to pay the host proportionally to the data
// retrieve.
func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []byte, err error) {
extendDeadline(hd.conn, modules.NegotiateDownloadTime)
defer extendDeadline(hd.conn, time.Hour) // reset deadline when finished
// calculate price
......@@ -48,6 +47,7 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
rev := newDownloadRevision(hd.contract.LastRevision, sectorPrice)
// initiate download by confirming host settings
extendDeadline(hd.conn, modules.NegotiateSettingsTime)
if err := startDownload(hd.conn, hd.host); err != nil {
return modules.RenterContract{}, nil, err
}
......@@ -64,6 +64,7 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
}
// send download action
extendDeadline(hd.conn, 2*time.Minute)
err = encoding.WriteObject(hd.conn, []modules.DownloadAction{{
MerkleRoot: root,
Offset: 0,
......@@ -83,6 +84,7 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
}()
// send the revision to the host for approval
extendDeadline(hd.conn, 2*time.Minute)
signedTxn, err := negotiateRevision(hd.conn, rev, hd.contract.SecretKey)
if err == modules.ErrStopResponse {
// if host gracefully closed, close our connection as well; this will
......@@ -94,6 +96,7 @@ func (hd *Downloader) Sector(root crypto.Hash) (_ modules.RenterContract, _ []by
}
// read sector data, completing one iteration of the download loop
extendDeadline(hd.conn, modules.NegotiateDownloadTime)
var sectors [][]byte
if err := encoding.ReadObject(hd.conn, &sectors, modules.SectorSize+16); err != nil {
return modules.RenterContract{}, nil, err
......
......@@ -77,16 +77,20 @@ func (he *Editor) Close() error {
// host for approval. If negotiation is successful, it updates the underlying
// Contract.
func (he *Editor) runRevisionIteration(actions []modules.RevisionAction, rev types.FileContractRevision, newRoots []crypto.Hash) (err error) {
// Increase Successful/Failed interactions accordingly
defer func() {
// Increase Successful/Failed interactions accordingly
if err != nil {
he.hdb.IncrementFailedInteractions(he.contract.HostPublicKey)
} else {
he.hdb.IncrementSuccessfulInteractions(he.contract.HostPublicKey)
}
// reset deadline
extendDeadline(he.conn, time.Hour)
}()
// initiate revision
extendDeadline(he.conn, modules.NegotiateSettingsTime)
if err := startRevision(he.conn, he.host); err != nil {
return err
}
......@@ -103,11 +107,13 @@ func (he *Editor) runRevisionIteration(actions []modules.RevisionAction, rev typ
}
// send actions
extendDeadline(he.conn, modules.NegotiateFileContractRevisionTime)
if err := encoding.WriteObject(he.conn, actions); err != nil {
return err
}
// send revision to host and exchange signatures
extendDeadline(he.conn, 2*time.Minute)
signedTxn, err := negotiateRevision(he.conn, rev, he.contract.SecretKey)
if err == modules.ErrStopResponse {
// if host gracefully closed, close our connection as well; this will
......@@ -127,10 +133,6 @@ func (he *Editor) runRevisionIteration(actions []modules.RevisionAction, rev typ
// Upload negotiates a revision that adds a sector to a file contract.
func (he *Editor) Upload(data []byte) (modules.RenterContract, crypto.Hash, error) {
// allot 10 minutes for this exchange; sufficient to transfer 4 MB over 50 kbps
extendDeadline(he.conn, modules.NegotiateFileContractRevisionTime)
defer extendDeadline(he.conn, time.Hour) // reset deadline
// calculate price
// TODO: height is never updated, so we'll wind up overpaying on long-running uploads
blockBytes := types.NewCurrency64(modules.SectorSize * uint64(he.contract.FileContract.WindowEnd-he.height))
......@@ -182,10 +184,6 @@ func (he *Editor) Upload(data []byte) (modules.RenterContract, crypto.Hash, erro
// Delete negotiates a revision that removes a sector from a file contract.
func (he *Editor) Delete(root crypto.Hash) (modules.RenterContract, error) {
// allot 2 minutes for this exchange
extendDeadline(he.conn, 120*time.Second)
defer extendDeadline(he.conn, time.Hour) // reset deadline
// calculate the new Merkle root
newRoots := make([]crypto.Hash, 0, len(he.contract.MerkleRoots))
index := -1
......@@ -217,10 +215,6 @@ func (he *Editor) Delete(root crypto.Hash) (modules.RenterContract, error) {
// Modify negotiates a revision that edits a sector in a file contract.
func (he *Editor) Modify(oldRoot, newRoot crypto.Hash, offset uint64, newData []byte) (modules.RenterContract, error) {
// allot 10 minutes for this exchange; sufficient to transfer 4 MB over 50 kbps
extendDeadline(he.conn, modules.NegotiateFileContractRevisionTime)
defer extendDeadline(he.conn, time.Hour) // reset deadline
// calculate price
sectorBandwidthPrice := he.host.UploadBandwidthPrice.Mul64(uint64(len(newData)))
if he.contract.RenterFunds().Cmp(sectorBandwidthPrice) < 0 {
......
......@@ -13,7 +13,6 @@ import (
"github.com/NebulousLabs/Sia/build"
"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/types"
)
......@@ -319,12 +318,6 @@ func (r *Renter) managedDownloadChunkData(rs *repairState, file *file, offset ui
rs.downloadingChunks[chunkID] = struct{}{}
defer delete(rs.downloadingChunks, chunkID)
// build current contracts map
currentContracts := make(map[modules.NetAddress]types.FileContractID)
for _, contract := range r.hostContractor.Contracts() {
currentContracts[contract.NetAddress] = contract.ID
}
downloadSize := file.chunkSize()
if offset+downloadSize > file.size {
downloadSize = file.size - offset
......@@ -334,7 +327,7 @@ func (r *Renter) managedDownloadChunkData(rs *repairState, file *file, offset ui
buf := NewDownloadBufferWriter(file.chunkSize(), int64(offset))
// create the download object and push it on to the download queue
d := r.newSectionDownload(file, buf, currentContracts, offset, downloadSize)
d := r.newSectionDownload(file, buf, offset, downloadSize)
done := make(chan struct{})
defer close(done)
go func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment