Commit 686250b9 authored by Matthew Sevey's avatar Matthew Sevey

Merge branch 'master' into ea-fund-missedhostpayout

parents 44b42c78 ced66128
Pipeline #152554745 failed with stages
in 6 minutes and 25 seconds
......@@ -80,7 +80,8 @@ pkgs = ./build \
release-pkgs = ./cmd/siac ./cmd/siad
# lockcheckpkgs are the packages that are checked for locking violations.
lockcheckpkgs = ./modules/renter/hostdb
lockcheckpkgs = ./modules/host/mdm \
./modules/renter/hostdb
# run determines which tests run when running any variation of 'make test'.
run = .
......
- fixed issue where workers would freeze for a bit after a new block appeared
......@@ -410,7 +410,16 @@ Streams a series of consensus changes, starting from the provided change ID.
### Path Parameters
### REQUIRED
**id** | string
The consensus change ID to subscribe from.
The consensus change ID to subscribe from. There are two sentinel values:
to subscribe from the genesis block use:
```
0000000000000000000000000000000000000000000000000000000000000000
```
To skip all existing blocks and subscribe only to subsequent changes, use:
```
0100000000000000000000000000000000000000000000000000000000000000
```
In addition, each consensus change contains its own ID.
### Response
......
......@@ -24,10 +24,11 @@ require (
gitlab.com/NebulousLabs/merkletree v0.0.0-20200118113624-07fbf710afc4
gitlab.com/NebulousLabs/monitor v0.0.0-20191205095550-2b0fd3e1012a
gitlab.com/NebulousLabs/ratelimit v0.0.0-20191111145210-66b93e150b27
gitlab.com/NebulousLabs/siamux v0.0.0-20200518120401-20f1d5034f03
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200518123758-b458460120c6
gitlab.com/NebulousLabs/siamux v0.0.0-20200529161118-72d7f2cfd76d
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200527092543-afa01960408c
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20190814160017-69f300e9bcb8
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b // indirect
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f // indirect
)
......
......@@ -155,12 +155,12 @@ gitlab.com/NebulousLabs/monitor v0.0.0-20191205095550-2b0fd3e1012a/go.mod h1:QxX
gitlab.com/NebulousLabs/ratelimit v0.0.0-20180716154200-1308156c2eaf/go.mod h1:vowDA1cdvtWW678ugB7L/yKT2pCN37aH6zYp9NF5Isc=
gitlab.com/NebulousLabs/ratelimit v0.0.0-20191111145210-66b93e150b27 h1:G8v2awvHcrvulXibvNpzLx3RlTuX0hB+0AZdl18pl50=
gitlab.com/NebulousLabs/ratelimit v0.0.0-20191111145210-66b93e150b27/go.mod h1:hvNy5sMP9gGrNQ7kNgb+vWuiPptqTk4W45bQbbT/vmg=
gitlab.com/NebulousLabs/siamux v0.0.0-20200518120401-20f1d5034f03 h1:+n3skulaiACA3z4wXFCrkYaP2JZvmPYWC4XRqe5hN4c=
gitlab.com/NebulousLabs/siamux v0.0.0-20200518120401-20f1d5034f03/go.mod h1:E7SGW22LVJ3sqIRKcLq/scpqp2BYjwGtUW3qlCXHC3A=
gitlab.com/NebulousLabs/siamux v0.0.0-20200529161118-72d7f2cfd76d h1:X6tD6Bv5JAkt+zBqi4Kn/z3Ukn5dTM832bfn/FhYldY=
gitlab.com/NebulousLabs/siamux v0.0.0-20200529161118-72d7f2cfd76d/go.mod h1:jDvnB27qgEs/XX57+v19hkriIUlUid8Qr/k7Kcy4TJ0=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20180716154133-88a11db9e46c h1:psW9YBmnyKKCddPncr7mwJCx6n7FzlIs1EWIiSo7fyQ=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20180716154133-88a11db9e46c/go.mod h1:w05nvlkvHlk3Vfc7mcU29Toic1X0BcYUnKoTHS0ea2Y=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200518123758-b458460120c6 h1:Z+YR+b4s+QAknrp8CBwBwcBqKblk5QPJ4yf0G592jYo=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200518123758-b458460120c6/go.mod h1:av52iTyGuPtGU+GMcqfGtZu2vxhIjPgrxvIwVYelEvs=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200527092543-afa01960408c h1:MW61ufjI9Psod1dCYDPhVGUC8YHqfqAmmoU5LzI7bpo=
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200527092543-afa01960408c/go.mod h1:av52iTyGuPtGU+GMcqfGtZu2vxhIjPgrxvIwVYelEvs=
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20190703190009-cb822c37bc94/go.mod h1:Lhpa9AcbWcYKcc4amZsOHqJdQglnkWrGuUI68XC7U2Q=
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20190814160017-69f300e9bcb8 h1:u74TgFUPYl9G1rYLUKmavwJoF8Li/qJLMSiU5ntafKA=
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20190814160017-69f300e9bcb8/go.mod h1:Lhpa9AcbWcYKcc4amZsOHqJdQglnkWrGuUI68XC7U2Q=
......@@ -173,6 +173,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191105034135-c7e5f84aec59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191107222254-f4817d981bb6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 h1:nVJ3guKA9qdkEQ3TUdXI9QSINo2CUPM/cySEvw2w8I0=
......@@ -186,6 +187,9 @@ golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......@@ -201,6 +205,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......@@ -221,6 +226,9 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
......
......@@ -64,7 +64,6 @@ package host
// TODO: update_test.go has commented out tests.
import (
"container/heap"
"errors"
"fmt"
"net"
......@@ -111,7 +110,7 @@ var (
rpcPriceGuaranteePeriod = build.Select(build.Var{
Standard: 10 * time.Minute,
Dev: 5 * time.Minute,
Testing: 15 * time.Second,
Testing: 1 * time.Minute,
}).(time.Duration)
// pruneExpiredRPCPriceTableFrequency is the frequency at which the host
......@@ -209,7 +208,7 @@ type Host struct {
// 'guaranteed' map.
type hostPrices struct {
current modules.RPCPriceTable
guaranteed map[modules.UniqueID]*modules.RPCPriceTable
guaranteed map[modules.UniqueID]*hostRPCPriceTable
staticMinHeap priceTableHeap
mu sync.RWMutex
}
......@@ -222,7 +221,7 @@ func (hp *hostPrices) managedCurrent() modules.RPCPriceTable {
}
// managedGet returns the price table with given uid
func (hp *hostPrices) managedGet(uid modules.UniqueID) (pt *modules.RPCPriceTable, found bool) {
func (hp *hostPrices) managedGet(uid modules.UniqueID) (pt *hostRPCPriceTable, found bool) {
hp.mu.RLock()
defer hp.mu.RUnlock()
pt, found = hp.guaranteed[uid]
......@@ -240,7 +239,7 @@ func (hp *hostPrices) managedSetCurrent(pt modules.RPCPriceTable) {
// managedTrack adds the given price table to the 'guaranteed' map, that holds
// all of the price tables the host has recently guaranteed to renters. It will
// also add it to the heap which facilates efficient pruning of that map.
func (hp *hostPrices) managedTrack(pt *modules.RPCPriceTable) {
func (hp *hostPrices) managedTrack(pt *hostRPCPriceTable) {
hp.mu.Lock()
hp.guaranteed[pt.UID] = pt
hp.mu.Unlock()
......@@ -277,61 +276,6 @@ type lockedObligation struct {
n uint
}
// priceTableHeap is a helper type that contains a min heap of rpc price tables,
// sorted on their expiry. The heap is guarded by its own mutex and allows for
// peeking at the min expiry.
type priceTableHeap struct {
heap rpcPriceTableHeap
mu sync.Mutex
}
// PopExpired returns the UIDs for all rpc price tables that have expired
func (pth *priceTableHeap) PopExpired() (expired []modules.UniqueID) {
pth.mu.Lock()
defer pth.mu.Unlock()
now := time.Now().Unix()
for {
if pth.heap.Len() == 0 {
return
}
pt := heap.Pop(&pth.heap)
if now < pt.(*modules.RPCPriceTable).Expiry {
heap.Push(&pth.heap, pt)
break
}
expired = append(expired, pt.(*modules.RPCPriceTable).UID)
}
return
}
// Push will add a price table to the heap.
func (pth *priceTableHeap) Push(pt *modules.RPCPriceTable) {
pth.mu.Lock()
defer pth.mu.Unlock()
heap.Push(&pth.heap, pt)
}
// rpcPriceTableHeap is a min heap of rpc price tables
type rpcPriceTableHeap []*modules.RPCPriceTable
// Implementation of heap.Interface for rpcPriceTableHeap.
func (pth rpcPriceTableHeap) Len() int { return len(pth) }
func (pth rpcPriceTableHeap) Less(i, j int) bool { return pth[i].Expiry < pth[j].Expiry }
func (pth rpcPriceTableHeap) Swap(i, j int) { pth[i], pth[j] = pth[j], pth[i] }
func (pth *rpcPriceTableHeap) Push(x interface{}) {
pt := x.(*modules.RPCPriceTable)
*pth = append(*pth, pt)
}
func (pth *rpcPriceTableHeap) Pop() interface{} {
old := *pth
n := len(old)
pt := old[n-1]
*pth = old[0 : n-1]
return pt
}
// checkUnlockHash will check that the host has an unlock hash. If the host
// does not have an unlock hash, an attempt will be made to get an unlock hash
// from the wallet. That may fail due to the wallet being locked, in which case
......@@ -376,11 +320,9 @@ func (h *Host) managedInternalSettings() modules.HostInternalSettings {
// managedUpdatePriceTable will recalculate the RPC costs and update the host's
// price table accordingly.
func (h *Host) managedUpdatePriceTable() {
// create a new RPC price table and set the expiry
// create a new RPC price table
es := h.managedExternalSettings()
priceTable := modules.RPCPriceTable{
Expiry: time.Now().Add(rpcPriceGuaranteePeriod).Unix(),
// TODO: hardcoded cost should be updated to use a better value.
AccountBalanceCost: types.NewCurrency64(1),
FundAccountCost: types.NewCurrency64(1),
......@@ -459,9 +401,9 @@ func newHost(dependencies modules.Dependencies, smDeps modules.Dependencies, cs
dependencies: dependencies,
lockedStorageObligations: make(map[types.FileContractID]*lockedObligation),
staticPriceTables: &hostPrices{
guaranteed: make(map[modules.UniqueID]*modules.RPCPriceTable),
guaranteed: make(map[modules.UniqueID]*hostRPCPriceTable),
staticMinHeap: priceTableHeap{
heap: make([]*modules.RPCPriceTable, 0),
heap: make([]*hostRPCPriceTable, 0),
},
},
persistDir: persistDir,
......
......@@ -28,6 +28,14 @@ import (
"gitlab.com/NebulousLabs/Sia/types"
)
const (
// priceTableExpiryBuffer defines a buffer period that ensures the price
// table is valid for at least as long as the buffer period when we consider
// it valid. This ensures a call to `managedFetchPriceTable` does not return
// a price table that expires the next second.
priceTableExpiryBuffer = 15 * time.Second
)
// A hostTester is the helper object for host testing, including helper modules
// and methods for controlling synchronization.
type (
......@@ -260,7 +268,9 @@ type renterHostPair struct {
staticRenterMux *siamux.SiaMux
staticHT *hostTester
pt *modules.RPCPriceTable
pt *modules.RPCPriceTable
ptExpiry time.Time // keep track of when the price table is set to expire
mu sync.Mutex
}
......@@ -434,8 +444,8 @@ func (p *renterHostPair) managedExecuteProgram(epr modules.RPCExecuteProgramRequ
}
// Read the cancellation token.
ct := make([]byte, modules.MDMCancellationTokenLen)
_, err = io.ReadFull(stream, ct)
var ct modules.MDMCancellationToken
err = modules.RPCRead(stream, &ct)
if err != nil {
return nil, limit, err
}
......@@ -475,20 +485,16 @@ func (p *renterHostPair) managedExecuteProgram(epr modules.RPCExecuteProgramRequ
// managedFetchPriceTable returns the latest price table, if that price table is
// expired it will fetch a new one from the host.
func (p *renterHostPair) managedFetchPriceTable() (*modules.RPCPriceTable, error) {
// fetch a new pricetable if it's about to expire, rather than the second it
// expires. This ensures calls performed immediately after
// `managedFetchPriceTable` is called are set to succeed.
var expiryBuffer int64 = 3
p.mu.Lock()
expired := time.Now().Add(priceTableExpiryBuffer).After(p.ptExpiry)
p.mu.Unlock()
pt := p.managedPriceTable()
if pt.Expiry <= time.Now().Unix()+expiryBuffer {
err := p.managedUpdatePriceTable(true)
if err != nil {
if expired {
if err := p.managedUpdatePriceTable(true); err != nil {
return nil, err
}
return p.managedPriceTable(), nil
}
return pt, nil
return p.managedPriceTable(), nil
}
// managedFundEphemeralAccount will deposit the given amount in the pair's
......@@ -761,6 +767,7 @@ func (p *renterHostPair) managedUpdatePriceTable(payByFC bool) error {
// update the price table
p.mu.Lock()
p.pt = &pt
p.ptExpiry = time.Now().Add(pt.Validity)
p.mu.Unlock()
return nil
......@@ -792,9 +799,6 @@ func TestHostInitialization(t *testing.T) {
if reflect.DeepEqual(ht.host.staticPriceTables.current, modules.RPCPriceTable{}) {
t.Fatal("RPC price table wasn't initialized")
}
if ht.host.staticPriceTables.current.Expiry == 0 {
t.Fatal("RPC price table was not properly initialised")
}
}
// TestHostMultiClose checks that the host returns an error if Close is called
......
......@@ -11,7 +11,8 @@ import (
// newTestWriteStorePriceTable returns a custom price table for the cost tests.
func newTestWriteStorePriceTable() *modules.RPCPriceTable {
pt := &modules.RPCPriceTable{}
pt.Expiry = time.Now().Add(time.Minute).Unix()
pt.Validity = time.Minute
pt.WriteBaseCost = types.ZeroCurrency
pt.WriteLengthCost = types.ZeroCurrency
pt.WriteStoreCost = modules.DefaultStoragePrice
......
......@@ -54,6 +54,8 @@ func newTestStorageObligation(locked bool) *TestStorageObligation {
// BlockHeight returns an incremented blockheight every time it's called.
func (h *TestHost) BlockHeight() types.BlockHeight {
h.mu.Lock()
defer h.mu.Unlock()
h.blockHeight++
return h.blockHeight
}
......@@ -61,7 +63,9 @@ func (h *TestHost) BlockHeight() types.BlockHeight {
// HasSector indicates whether the host stores a sector with a given root or
// not.
func (h *TestHost) HasSector(sectorRoot crypto.Hash) bool {
h.mu.Lock()
_, exists := h.sectors[sectorRoot]
h.mu.Unlock()
return exists
}
......@@ -121,7 +125,8 @@ func (so *TestStorageObligation) Update(sectorRoots []crypto.Hash, sectorsRemove
// for every operation/rpc.
func newTestPriceTable() *modules.RPCPriceTable {
return &modules.RPCPriceTable{
Expiry: time.Now().Add(time.Minute).Unix(),
Validity: time.Minute,
UpdatePriceTableCost: types.NewCurrency64(1),
InitBaseCost: types.NewCurrency64(1),
MemoryTimeCost: types.NewCurrency64(1),
......
......@@ -81,24 +81,29 @@ func (pd *programData) threadedFetchData() {
}
}
for remainingData > 0 {
pd.mu.Lock()
select {
case <-pd.cancel:
pd.mu.Unlock()
quit(errors.New("stop called"))
return
default:
}
pd.mu.Unlock()
// Adjust the length of the packet according to the remaining data.
d := packet[:]
if remainingData <= int64(cap(d)) {
d = d[:remainingData]
}
pd.mu.Lock()
n, err := pd.r.Read(d)
pd.mu.Unlock()
if err != nil {
quit(err)
return
}
pd.mu.Lock()
remainingData -= int64(n)
pd.mu.Lock()
pd.data = append(pd.data, packet[:n]...)
// Sort the request and unlock the ones that are ready to be unlocked.
......@@ -195,7 +200,9 @@ func (pd *programData) Len() uint64 {
// Close will stop the background thread and wait for it to return.
func (pd *programData) Close() error {
pd.mu.Lock()
close(pd.cancel)
pd.mu.Unlock()
pd.wg.Wait()
return nil
}
......@@ -341,28 +341,6 @@ func (h *Host) threadedHandleConn(conn net.Conn) {
}
}
// staticReadPriceTableID receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table
func (h *Host) staticReadPriceTableID(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
var found bool
pt, found := h.staticPriceTables.managedGet(uid)
if !found {
return nil, ErrPriceTableNotFound
}
// make sure the table isn't expired.
if pt.Expiry < time.Now().Unix() {
return nil, ErrPriceTableExpired
}
return pt, nil
}
// threadedHandleStream handles incoming SiaMux streams.
func (h *Host) threadedHandleStream(stream siamux.Stream) {
// close the stream when the method terminates
......@@ -424,36 +402,6 @@ func (h *Host) threadedHandleStream(stream siamux.Stream) {
}
}
// staticGetPriceTable receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table.
//
// NOTE: the price table UID is sent on every RPC call, this to ensure both
// renter and host use the same pricing. If we were to keep the price table
// related to the incoming stream (and thus its mux) as state, we would
// introduce race conditions where host and renter use different price tables
// within the context of a single RPC request. Having the renter specify it on
// every request avoids the possiblity of these race conditions.
func (h *Host) staticGetPriceTable(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
pt, exists := h.staticPriceTables.managedGet(uid)
if !exists {
return nil, errors.New("Price table not found, it might be expired")
}
// check if it's still valid or if it has expired
if pt.Expiry < time.Now().Unix() {
return nil, errors.New("Price table expired")
}
return pt, nil
}
// threadedListen listens for incoming RPCs and spawns an appropriate handler for each.
func (h *Host) threadedListen(closeChan chan struct{}) {
defer close(closeChan)
......
......@@ -105,12 +105,8 @@ func (h *Host) managedRPCExecuteProgram(stream siamux.Stream) error {
}
// Return 16 bytes of data as a placeholder for a future cancellation token.
// NOTE: We write this to a buffer to save one call to `Write`. In the
// future we might reconsider this once we actually implement cancellation
// since this means the token is only returned after the first instruction
// is done executing.
buffer := bytes.NewBuffer(nil)
_, err = buffer.Write(make([]byte, modules.MDMCancellationTokenLen))
var ct modules.MDMCancellationToken
err = modules.RPCWrite(stream, ct)
if err != nil {
return errors.AddContext(err, "Failed to write cancellation token")
}
......@@ -155,6 +151,9 @@ func (h *Host) managedRPCExecuteProgram(stream siamux.Stream) error {
// Remember that the execution wasn't successful.
executionFailed = output.Error != nil
// Create a buffer
buffer := bytes.NewBuffer(nil)
// Send the response to the peer.
err = modules.RPCWrite(buffer, resp)
if err != nil {
......
......@@ -151,7 +151,7 @@ func TestExecuteReadSectorProgram(t *testing.T) {
// this particular program on the "renter" side. This way we can test that
// the bandwidth measured by the renter is large enough to be accepted by
// the host.
expectedDownload := uint64(5840) // download
expectedDownload := uint64(7300) // download
expectedUpload := uint64(10220) // upload
downloadCost := pt.DownloadBandwidthCost.Mul64(expectedDownload)
uploadCost := pt.UploadBandwidthCost.Mul64(expectedUpload)
......@@ -439,7 +439,7 @@ func TestExecuteHasSectorProgram(t *testing.T) {
// this particular program on the "renter" side. This way we can test that
// the bandwidth measured by the renter is large enough to be accepted by
// the host.
expectedDownload := uint64(2920) // download
expectedDownload := uint64(4380) // download
expectedUpload := uint64(10220) // upload
downloadCost := pt.DownloadBandwidthCost.Mul64(expectedDownload)
uploadCost := pt.UploadBandwidthCost.Mul64(expectedUpload)
......
package host
import (
"container/heap"
"encoding/json"
"sync"
"time"
"gitlab.com/NebulousLabs/Sia/modules"
......@@ -13,13 +15,82 @@ import (
var (
// ErrPriceTableNotFound is returned when the price table for a certain UID
// can not be found in the tracked price tables
ErrPriceTableNotFound = errors.New("Price table not found, it might be expired")
ErrPriceTableNotFound = errors.New("Price table not found")
// ErrPriceTableExpired is returned when the specified price table has
// expired
ErrPriceTableExpired = errors.New("Price table requested is expired")
)
type (
// priceTableHeap is a helper type that contains a min heap of rpc price
// tables, sorted on their expiry. The heap is guarded by its own mutex and
// allows for peeking at the min expiry.
priceTableHeap struct {
heap rpcPriceTableHeap
mu sync.Mutex
}
// rpcPriceTableHeap is a min heap of rpc price tables
rpcPriceTableHeap []*hostRPCPriceTable
// hostRPCPriceTable is a helper struct that wraps a price table alongside
// its creation timestamp. We need this, in combination with the price
// table's validity to figure out when to consider the price table to be
// expired.
hostRPCPriceTable struct {
modules.RPCPriceTable
creation time.Time
}
)
// Expiry returns the time at which the price table is considered to be expired
func (hpt *hostRPCPriceTable) Expiry() time.Time {
return hpt.creation.Add(hpt.Validity)
}
// PopExpired returns the UIDs for all rpc price tables that have expired
func (pth *priceTableHeap) PopExpired() (expired []modules.UniqueID) {
pth.mu.Lock()
defer pth.mu.Unlock()
now := time.Now()
for pth.heap.Len() > 0 {
pt := heap.Pop(&pth.heap).(*hostRPCPriceTable)
if now.Before(pt.Expiry()) {
heap.Push(&pth.heap, pt)
break
}
expired = append(expired, pt.UID)
}
return
}
// Push will add a price table to the heap.
func (pth *priceTableHeap) Push(pt *hostRPCPriceTable) {
pth.mu.Lock()
defer pth.mu.Unlock()
heap.Push(&pth.heap, pt)
}
// Implementation of heap.Interface for rpcPriceTableHeap.
func (pth rpcPriceTableHeap) Len() int { return len(pth) }
func (pth rpcPriceTableHeap) Less(i, j int) bool {
return pth[i].Expiry().Before(pth[j].Expiry())
}
func (pth rpcPriceTableHeap) Swap(i, j int) { pth[i], pth[j] = pth[j], pth[i] }
func (pth *rpcPriceTableHeap) Push(x interface{}) {
pt := x.(*hostRPCPriceTable)
*pth = append(*pth, pt)
}
func (pth *rpcPriceTableHeap) Pop() interface{} {
old := *pth
n := len(old)
pt := old[n-1]
*pth = old[0 : n-1]
return pt
}
// managedRPCUpdatePriceTable returns a copy of the host's current rpc price
// table. These prices are valid for the duration of the
// rpcPriceGuaranteePeriod, which is defined by the price table's Expiry
......@@ -27,9 +98,9 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
// copy the host's price table and give it a random UID
pt := h.staticPriceTables.managedCurrent()
fastrand.Read(pt.UID[:])
// update the epxiry to ensure prices are guaranteed for the duration of the
// rpcPriceGuaranteePeriod
pt.Expiry = time.Now().Add(rpcPriceGuaranteePeriod).Unix()
// set the validity to signal how long these prices are guaranteed for
pt.Validity = rpcPriceGuaranteePeriod
// set the host's current blockheight, this allows the renter to create
// valid withdrawal messages in case it is not synced yet
......@@ -64,7 +135,7 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
// after payment has been received, track the price table in the host's list
// of price tables and signal the renter we consider the price table valid
h.staticPriceTables.managedTrack(&pt)
h.staticPriceTables.managedTrack(&hostRPCPriceTable{pt, time.Now()})
var tracked modules.RPCTrackedPriceTableResponse
if err = modules.RPCWrite(stream, tracked); err != nil {
return errors.AddContext(err, "Failed to signal renter we tracked the price table")
......@@ -78,3 +149,27 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
}
return nil
}
// staticReadPriceTableID receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table
func (h *Host) staticReadPriceTableID(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
var found bool
pt, found := h.staticPriceTables.managedGet(uid)
if !found {
return nil, ErrPriceTableNotFound
}
// make sure the table isn't expired.
if time.Now().After(pt.Expiry()) {
return nil, ErrPriceTableExpired
}
return &pt.RPCPriceTable, nil
}
......@@ -19,7 +19,7 @@ import (
// TestPriceTableMarshaling tests a PriceTable can be marshaled and unmarshaled
func TestPriceTableMarshaling(t *testing.T) {
pt := modules.RPCPriceTable{
Expiry: time.Now().Add(rpcPriceGuaranteePeriod).Unix(),
Validity: rpcPriceGuaranteePeriod,
HostBlockHeight: types.BlockHeight(fastrand.Intn(1e3)),
UpdatePriceTableCost: types.SiacoinPrecision,
InitBaseCost: types.SiacoinPrecision.Mul64(1e2),
......@@ -52,30 +52,44 @@ func TestPriceTableMinHeap(t *testing.T) {
t.Parallel()
now := time.Now()
pth := priceTableHeap{heap: make([]*modules.RPCPriceTable, 0)}
pth := priceTableHeap{heap: make([]*hostRPCPriceTable, 0)}
// add 4 price tables (out of order) that expire somewhere in the future
pt1 := modules.RPCPriceTable{Expiry: now.Add(9 * time.Minute).Unix()}