Commit 7c405049 authored by Luke Champine's avatar Luke Champine

implement Renter functionality

parent ac8126c1
package modules
type RentFileParameters struct {
Filepath string
Nickname string
TotalPieces int
RequiredPieces int
OptimalPieces int
}
import (
"io"
type RentSmallFileParameters struct {
FullFile []byte
"github.com/NebulousLabs/Sia/consensus"
"github.com/NebulousLabs/Sia/hash"
)
type UploadParams struct {
Data io.ReadSeeker
Duration consensus.BlockHeight
Delay consensus.BlockHeight
FileSize uint64
MerkleRoot hash.Hash
// these fields are not seen by the host
Nickname string
TotalPieces int
RequiredPieces int
......@@ -21,10 +26,8 @@ type RentInfo struct {
}
type Renter interface {
Upload(UploadParams) error
Download(nickname, filepath string) error
Info() (RentInfo, error)
Rename(currentName, newName string) error
RentFile(RentFileParameters) error
RentSmallFile(RentSmallFileParameters) error
Info() (RentInfo, error)
}
package renter
import (
"errors"
"io"
"net"
"github.com/NebulousLabs/Sia/consensus"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)
const (
// TODO: ask wallet
minerFee = 10
)
func (r *Renter) createContractTransaction(host modules.HostEntry, up modules.UploadParams) (t consensus.Transaction, contract consensus.FileContract, err error) {
// get state height
r.state.RLock()
height := r.state.Height()
r.state.RUnlock()
// Fill out the contract according to the whims of the host.
contract = consensus.FileContract{
FileMerkleRoot: up.MerkleRoot,
FileSize: up.FileSize,
Start: height + up.Delay,
End: height + up.Delay + up.Duration,
ValidProofAddress: host.CoinAddress,
MissedProofAddress: consensus.CoinAddress{}, // The empty address is the burn address.
}
// Create the transaction.
id, err := r.wallet.RegisterTransaction(t)
if err != nil {
return
}
fund := host.Price*consensus.Currency(up.Duration+up.Delay)*consensus.Currency(up.FileSize) + minerFee
err = r.wallet.FundTransaction(id, fund)
if err != nil {
return
}
err = r.wallet.AddMinerFee(id, minerFee)
if err != nil {
return
}
err = r.wallet.AddFileContract(id, contract)
if err != nil {
return
}
t, err = r.wallet.SignTransaction(id, false)
if err != nil {
return
}
return
}
func negotiateContract(host modules.HostEntry, t consensus.Transaction, up modules.UploadParams) error {
return host.IPAddress.Call("NegotiateContract", func(conn net.Conn) (err error) {
// send contract
if _, err = encoding.WriteObject(conn, t); err != nil {
return
}
// read response
var response string
if err = encoding.ReadObject(conn, &response, 128); err != nil {
return
}
if response != modules.AcceptContractResponse {
return errors.New(response)
}
// host accepted, so transmit file data
// (no prefix needed, since the host already knows the filesize
_, err = io.CopyN(conn, up.Data, int64(up.FileSize))
// reset seek position
up.Data.Seek(0, 0)
return
})
}
package renter
import (
"errors"
"fmt"
// "io"
"net"
// "os"
"time"
"github.com/NebulousLabs/Sia/consensus"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/hash"
"github.com/NebulousLabs/Sia/modules"
)
// TODO: ALSO WARNING: There's a bunch of code duplication here as a result of
// trying to get the release out. If you edit a part of this, make sure both
// halves (small file and big file) get the update, or save us all the trouble
// and dedup the code.
// ClientFundFileContract takes a template FileContract and returns a
// partial transaction containing an input for the contract, but no signatures.
//
// TODO: We need to get the id of the contract before we can start doing
// re-uploading.
func (r *Renter) proposeContract(filename string, duration consensus.BlockHeight) (fp FilePiece, err error) {
err = errors.New("proposeContract is not implemented - needs to be merged with other code")
return
/*
// Open the file, create a merkle hash.
file, err := os.Open(filename)
if err != nil {
return
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return
}
merkle, err := hash.ReaderMerkleRoot(file, hash.CalculateSegments(uint64(info.Size())))
if err != nil {
return
}
// reset read position
if _, err = file.Seek(0, 0); err != nil {
return
}
// Find a host. If the search or the negotiation is unsuccessful,
// hostdb.FlagHost() will be called and another host will be requested. If
// there is an internal error (no hosts, or an unsuccessful flagging for
// example), the loop will break.
var host modules.HostEntry
var fileContract consensus.FileContract
for {
host, err = r.hostDB.RandomHost()
if err != nil {
return
}
// Fill out the contract according to the whims of the host.
// The contract fund: (burn * duration + price * full duration) * filesize
delay := consensus.BlockHeight(20)
contractFund := (host.Price*consensus.Currency(duration+delay) + host.Burn*consensus.Currency(duration)) * consensus.Currency(info.Size())
fileContract = consensus.FileContract{
ContractFund: contractFund,
FileMerkleRoot: merkle,
FileSize: uint64(info.Size()),
Start: r.state.Height() + delay,
End: r.state.Height() + duration + delay,
ChallengeWindow: host.Window,
Tolerance: host.Tolerance,
ValidProofPayout: host.Price * consensus.Currency(info.Size()) * consensus.Currency(host.Window),
ValidProofAddress: host.CoinAddress,
MissedProofPayout: host.Burn * consensus.Currency(info.Size()) * consensus.Currency(host.Window),
MissedProofAddress: consensus.CoinAddress{}, // The empty address is the burn address.
}
// Fund the client portion of the transaction.
minerFee := consensus.Currency(10) // TODO: ask wallet.
renterPortion := host.Price * consensus.Currency(duration+delay) * consensus.Currency(fileContract.FileSize)
var id string
id, err = r.wallet.RegisterTransaction(consensus.Transaction{})
if err != nil {
return
}
err = r.wallet.FundTransaction(id, renterPortion+minerFee)
if err != nil {
return
}
err = r.wallet.AddMinerFee(id, minerFee)
if err != nil {
return
}
err = r.wallet.AddFileContract(id, fileContract)
if err != nil {
return
}
var transaction consensus.Transaction
transaction, err = r.wallet.SignTransaction(id, false)
if err != nil {
return
}
// Negotiate the contract to the host.
err = host.IPAddress.Call("NegotiateContract", func(conn net.Conn) error {
// send contract
if _, err := encoding.WriteObject(conn, transaction); err != nil {
return err
}
// read response
var response string
if err := encoding.ReadObject(conn, &response, 128); err != nil {
return err
}
if response != modules.AcceptContractResponse {
return errors.New(response)
}
// host accepted, so transmit file data
// (no prefix needed, since FileSize is included in the metadata)
_, err = io.CopyN(conn, file, info.Size())
return err
})
if err == nil {
break
}
fmt.Println("Problem from NegotiateContract:", err)
err = r.hostDB.FlagHost(host.ID)
if err != nil {
return
}
}
// Record the file into the renter database.
fp = FilePiece{
Host: host,
Contract: fileContract,
}
return
*/
}
// TODO: Do the uploading in parallel.
func (r *Renter) RentFile(rfp modules.RentFileParameters) (err error) {
r.mu.Lock()
defer r.mu.Unlock()
_, exists := r.files[rfp.Nickname]
if exists {
return errors.New("file of that nickname already exists")
}
// Make an entry for this file.
var pieces []FilePiece
for i := 0; i < rfp.TotalPieces; i++ {
var piece FilePiece
piece, err = r.proposeContract(rfp.Filepath, consensus.BlockHeight(2000+1000*i))
if err != nil {
return
}
pieces = append(pieces, piece)
}
r.files[rfp.Nickname] = FileEntry{Pieces: pieces}
return
}
// TODO: Do the uploading in parallel.
//
// On mutexes: cannot do network stuff with a lock on, so we need to get the
// lock, get the contracts, and then drop the lock.
func (r *Renter) RentSmallFile(rsfp modules.RentSmallFileParameters) (err error) {
r.mu.RLock()
_, exists := r.files[rsfp.Nickname]
if exists {
return errors.New("file of that nickname already exists")
} else if rsfp.Nickname == "" {
return errors.New("cannot use empty string for nickname")
}
r.mu.RUnlock()
// Make an entry for this file.
var pieces []FilePiece
for i := 0; i < rsfp.TotalPieces; i++ {
var piece FilePiece
piece, err = r.proposeSmallContract(rsfp.FullFile, consensus.BlockHeight(800))
if err != nil {
return
}
pieces = append(pieces, piece)
r.mu.Lock()
r.files[rsfp.Nickname] = FileEntry{Pieces: pieces}
r.mu.Unlock()
}
return
}
func (r *Renter) proposeSmallContract(fullFile []byte, duration consensus.BlockHeight) (fp FilePiece, err error) {
merkle, err := hash.BytesMerkleRoot(fullFile)
if err != nil {
return
}
// Find a host. If the search or the negotiation is unsuccessful,
// hostdb.FlagHost() will be called and another host will be requested. If
// there is an internal error (no hosts, or an unsuccessful flagging for
// example), the loop will break.
var host modules.HostEntry
var fileContract consensus.FileContract
var contractID consensus.ContractID
for {
host, err = r.hostDB.RandomHost()
if err != nil {
return
}
// Fill out the contract according to the whims of the host.
// The contract fund: (burn * duration + price * full duration) * filesize
delay := consensus.BlockHeight(20)
contractFund := (host.Price*consensus.Currency(duration+delay) + host.Burn*consensus.Currency(duration)) * consensus.Currency(int64(len(fullFile)))
fileContract = consensus.FileContract{
FileMerkleRoot: merkle,
FileSize: uint64(len(fullFile)),
Start: r.state.Height() + delay,
End: r.state.Height() + duration + delay,
Payout: contractFund,
ValidProofAddress: host.CoinAddress,
MissedProofAddress: consensus.CoinAddress{}, // The empty address is the burn address.
}
// Fund the client portion of the transaction.
minerFee := consensus.Currency(10) // TODO: ask wallet.
renterPortion := host.Price * consensus.Currency(duration+delay) * consensus.Currency(fileContract.FileSize)
var id string
id, err = r.wallet.RegisterTransaction(consensus.Transaction{})
if err != nil {
return
}
// Try to fund the transaction, and wait if there isn't enough money.
err = r.wallet.FundTransaction(id, renterPortion+minerFee)
if err != nil && err != modules.LowBalanceErr {
return
}
for err == modules.LowBalanceErr {
// TODO: This is a dirty hack - the system will try to get the file
// through until it has enough money to actually get the file
// through. Significant problem :(
// There should be no locks at this point.
time.Sleep(time.Second * 30)
err = r.wallet.FundTransaction(id, renterPortion+minerFee)
}
err = r.wallet.AddMinerFee(id, minerFee)
if err != nil {
return
}
err = r.wallet.AddFileContract(id, fileContract)
if err != nil {
return
}
var transaction consensus.Transaction
transaction, err = r.wallet.SignTransaction(id, false)
if err != nil {
return
}
contractID = transaction.FileContractID(0)
// Negotiate the contract to the host.
err = host.IPAddress.Call("NegotiateContract", func(conn net.Conn) error {
// send contract
if _, err := encoding.WriteObject(conn, transaction); err != nil {
return err
}
// read response
var response string
if err := encoding.ReadObject(conn, &response, 128); err != nil {
return err
}
if response != modules.AcceptContractResponse {
return errors.New(response)
}
// host accepted, so transmit file data
// (no prefix needed, since FileSize is included in the metadata)
_, err = conn.Write(fullFile)
return err
})
if err == nil {
break
}
fmt.Println("Problem from NegotiateContract:", err)
err = r.hostDB.FlagHost(host.IPAddress)
if err != nil {
return
}
}
// Record the file into the renter database.
fp = FilePiece{
Host: host,
Contract: fileContract,
ContractID: contractID,
}
return
}
......@@ -2,14 +2,9 @@ package renter
import (
"errors"
"fmt"
"io"
"net"
"os"
"sync"
"github.com/NebulousLabs/Sia/consensus"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)
......@@ -19,13 +14,9 @@ type FilePiece struct {
ContractID consensus.ContractID // The ID of the contract.
}
type FileEntry struct {
Pieces []FilePiece
}
type Renter struct {
state *consensus.State
files map[string]FileEntry
files map[string][]FilePiece
hostDB modules.HostDB
wallet modules.Wallet
......@@ -50,7 +41,7 @@ func New(state *consensus.State, hdb modules.HostDB, wallet modules.Wallet) (r *
state: state,
hostDB: hdb,
wallet: wallet,
files: make(map[string]FileEntry),
files: make(map[string][]FilePiece),
}
return
}
......@@ -72,55 +63,6 @@ func (r *Renter) Rename(currentName, newName string) error {
return nil
}
func (r *Renter) downloadPiece(piece FilePiece, destination string) (err error) {
return piece.Host.IPAddress.Call("RetrieveFile", func(conn net.Conn) error {
// send filehash
if _, err := encoding.WriteObject(conn, piece.ContractID); err != nil {
return err
}
// TODO: read error
// copy response into file
file, err := os.Create(destination)
if err != nil {
return err
}
_, err = io.CopyN(file, conn, int64(piece.Contract.FileSize))
file.Close()
if err != nil {
os.Remove(destination)
}
return err
})
}
// Download requests a file from the host it was stored with, and downloads it
// into the specified filename.
func (r *Renter) Download(nickname, filename string) (err error) {
entry, exists := r.files[nickname]
if !exists {
return errors.New("no file entry for file: " + nickname)
}
// We just need to get one piece, we'll keep contacting hosts until one
// doesn't return an error.
for _, piece := range entry.Pieces {
err = r.downloadPiece(piece, filename)
if err == nil {
return
} else {
fmt.Println("Renter got error:", err)
r.hostDB.FlagHost(piece.Host.IPAddress)
}
}
if err != nil {
err = errors.New("Too many hosts returned errors - could not recover the file.")
return
}
return
}
func (r *Renter) Info() (ri modules.RentInfo) {
r.mu.RLock()
defer r.mu.RUnlock()
......
package renter
import (
"errors"
"io"
"net"
"os"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)
const (
maxUploadAttempts = 10
)
// uploadPiece attempts to negotiate a contract with a host. If the negotiate
// is successful, uploadPiece will upload the file contents to the host, and
// return a FilePiece object specifying the host chosen and the details of the
// contract.
func (r *Renter) uploadPiece(up modules.UploadParams) (piece FilePiece, err error) {
// Try 'maxUploadAttempts' hosts before giving up.
for attempts := 0; attempts < maxUploadAttempts; attempts++ {
// Select a host. An error here is unrecoverable.
host, hostErr := r.hostDB.RandomHost()
if hostErr != nil {
err = errors.New("could not get a host from the HostDB: " + hostErr.Error())
return
}
// Create file contract using this host's parameters. An error here is
// unrecoverable.
t, fileContract, txnErr := r.createContractTransaction(host, up)
if txnErr != nil {
err = errors.New("unable to create contract transaction: " + txnErr.Error())
return
}
// Negotiate the contract with the host. If the negotiation is
// successful, the file will be uploaded.
err = negotiateContract(host, t, up)
if err != nil {
// If there was a problem, we need to try again with a new host.
r.hostDB.FlagHost(host.IPAddress)
continue
}
// Otherwise, we're done.
piece = FilePiece{
Host: host,
Contract: fileContract,
}
return
}
err = errors.New("no hosts accepted the file contract")
return
}
// Upload implements the modules.Renter interface. It selects a host to upload
// to, negotiates a contract with it, and uploads the file contents.
func (r *Renter) Upload(up modules.UploadParams) (err error) {
r.mu.RLock()
_, exists := r.files[up.Nickname]
r.mu.RUnlock()
if exists {
return errors.New("file with that nickname already exists")
}
pieces := make([]FilePiece, up.TotalPieces)
for i := range pieces {
// upload the piece to a host. The host is chosen by uploadPiece.
// TODO: what happens if we can't upload to all the hosts?
pieces[i], err = r.uploadPiece(up)
if err != nil {
return
}
}
r.mu.Lock()
r.files[up.Nickname] = pieces
r.mu.Unlock()
return
}
// downloadPiece attempts to retrieve a file from a host.
func (r *Renter) downloadPiece(piece FilePiece, path string) error {
return piece.Host.IPAddress.Call("RetrieveFile", func(conn net.Conn) (err error) {
// send filehash
if _, err = encoding.WriteObject(conn, piece.ContractID); err != nil {
return
}
// TODO: read error
// create file
file, err := os.Create(path)
if err != nil {
return
}
defer file.Close()
// copy response into file
_, err = io.CopyN(file, conn, int64(piece.Contract.FileSize))
if err != nil {
os.Remove(path)
return
}
return
})
}
// Download implements the modules.Renter interface. It requests a file from
// the hosts it was stored with, and downloads it into the specified filename.
func (r *Renter) Download(nickname, filename string) error {
r.mu.RLock()
pieces, exists := r.files[nickname]
r.mu.RUnlock()
if !exists {
return errors.New("no record of file: " + nickname)
}
// We only need one piece, so iterate through the hosts until a download
// succeeds.
// TODO: smarter ordering here? i.e. prioritize known fast hosts?
for _, piece := range pieces {
downloadErr := r.downloadPiece(piece, filename)
if downloadErr == nil {
return nil
}
// log?
r.hostDB.FlagHost(piece.Host.IPAddress)
}
return errors.New("Too many hosts returned errors - could not recover the file")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment