Commit 9acd244a authored by Christopher Schinnerl's avatar Christopher Schinnerl

Merge branch 'pt-duration' into 'master'

Make Expiry on the PriceTable a duration

Closes #4201

See merge request !4512
parents 14224558 ff408e3d
Pipeline #152509189 failed with stages
in 29 minutes and 27 seconds
......@@ -64,7 +64,6 @@ package host
// TODO: update_test.go has commented out tests.
import (
"container/heap"
"errors"
"fmt"
"net"
......@@ -111,7 +110,7 @@ var (
rpcPriceGuaranteePeriod = build.Select(build.Var{
Standard: 10 * time.Minute,
Dev: 5 * time.Minute,
Testing: 15 * time.Second,
Testing: 1 * time.Minute,
}).(time.Duration)
// pruneExpiredRPCPriceTableFrequency is the frequency at which the host
......@@ -209,7 +208,7 @@ type Host struct {
// 'guaranteed' map.
type hostPrices struct {
current modules.RPCPriceTable
guaranteed map[modules.UniqueID]*modules.RPCPriceTable
guaranteed map[modules.UniqueID]*hostRPCPriceTable
staticMinHeap priceTableHeap
mu sync.RWMutex
}
......@@ -222,7 +221,7 @@ func (hp *hostPrices) managedCurrent() modules.RPCPriceTable {
}
// managedGet returns the price table with given uid
func (hp *hostPrices) managedGet(uid modules.UniqueID) (pt *modules.RPCPriceTable, found bool) {
func (hp *hostPrices) managedGet(uid modules.UniqueID) (pt *hostRPCPriceTable, found bool) {
hp.mu.RLock()
defer hp.mu.RUnlock()
pt, found = hp.guaranteed[uid]
......@@ -240,7 +239,7 @@ func (hp *hostPrices) managedSetCurrent(pt modules.RPCPriceTable) {
// managedTrack adds the given price table to the 'guaranteed' map, that holds
// all of the price tables the host has recently guaranteed to renters. It will
// also add it to the heap which facilates efficient pruning of that map.
func (hp *hostPrices) managedTrack(pt *modules.RPCPriceTable) {
func (hp *hostPrices) managedTrack(pt *hostRPCPriceTable) {
hp.mu.Lock()
hp.guaranteed[pt.UID] = pt
hp.mu.Unlock()
......@@ -277,61 +276,6 @@ type lockedObligation struct {
n uint
}
// priceTableHeap is a helper type that contains a min heap of rpc price tables,
// sorted on their expiry. The heap is guarded by its own mutex and allows for
// peeking at the min expiry.
type priceTableHeap struct {
heap rpcPriceTableHeap
mu sync.Mutex
}
// PopExpired returns the UIDs for all rpc price tables that have expired
func (pth *priceTableHeap) PopExpired() (expired []modules.UniqueID) {
pth.mu.Lock()
defer pth.mu.Unlock()
now := time.Now().Unix()
for {
if pth.heap.Len() == 0 {
return
}
pt := heap.Pop(&pth.heap)
if now < pt.(*modules.RPCPriceTable).Expiry {
heap.Push(&pth.heap, pt)
break
}
expired = append(expired, pt.(*modules.RPCPriceTable).UID)
}
return
}
// Push will add a price table to the heap.
func (pth *priceTableHeap) Push(pt *modules.RPCPriceTable) {
pth.mu.Lock()
defer pth.mu.Unlock()
heap.Push(&pth.heap, pt)
}
// rpcPriceTableHeap is a min heap of rpc price tables
type rpcPriceTableHeap []*modules.RPCPriceTable
// Implementation of heap.Interface for rpcPriceTableHeap.
func (pth rpcPriceTableHeap) Len() int { return len(pth) }
func (pth rpcPriceTableHeap) Less(i, j int) bool { return pth[i].Expiry < pth[j].Expiry }
func (pth rpcPriceTableHeap) Swap(i, j int) { pth[i], pth[j] = pth[j], pth[i] }
func (pth *rpcPriceTableHeap) Push(x interface{}) {
pt := x.(*modules.RPCPriceTable)
*pth = append(*pth, pt)
}
func (pth *rpcPriceTableHeap) Pop() interface{} {
old := *pth
n := len(old)
pt := old[n-1]
*pth = old[0 : n-1]
return pt
}
// checkUnlockHash will check that the host has an unlock hash. If the host
// does not have an unlock hash, an attempt will be made to get an unlock hash
// from the wallet. That may fail due to the wallet being locked, in which case
......@@ -376,11 +320,9 @@ func (h *Host) managedInternalSettings() modules.HostInternalSettings {
// managedUpdatePriceTable will recalculate the RPC costs and update the host's
// price table accordingly.
func (h *Host) managedUpdatePriceTable() {
// create a new RPC price table and set the expiry
// create a new RPC price table
es := h.managedExternalSettings()
priceTable := modules.RPCPriceTable{
Expiry: time.Now().Add(rpcPriceGuaranteePeriod).Unix(),
// TODO: hardcoded cost should be updated to use a better value.
AccountBalanceCost: types.NewCurrency64(1),
FundAccountCost: types.NewCurrency64(1),
......@@ -459,9 +401,9 @@ func newHost(dependencies modules.Dependencies, smDeps modules.Dependencies, cs
dependencies: dependencies,
lockedStorageObligations: make(map[types.FileContractID]*lockedObligation),
staticPriceTables: &hostPrices{
guaranteed: make(map[modules.UniqueID]*modules.RPCPriceTable),
guaranteed: make(map[modules.UniqueID]*hostRPCPriceTable),
staticMinHeap: priceTableHeap{
heap: make([]*modules.RPCPriceTable, 0),
heap: make([]*hostRPCPriceTable, 0),
},
},
persistDir: persistDir,
......
......@@ -28,6 +28,14 @@ import (
"gitlab.com/NebulousLabs/Sia/types"
)
const (
// priceTableExpiryBuffer defines a buffer period that ensures the price
// table is valid for at least as long as the buffer period when we consider
// it valid. This ensures a call to `managedFetchPriceTable` does not return
// a price table that expires the next second.
priceTableExpiryBuffer = 15 * time.Second
)
// A hostTester is the helper object for host testing, including helper modules
// and methods for controlling synchronization.
type (
......@@ -260,7 +268,9 @@ type renterHostPair struct {
staticRenterMux *siamux.SiaMux
staticHT *hostTester
pt *modules.RPCPriceTable
pt *modules.RPCPriceTable
ptExpiry time.Time // keep track of when the price table is set to expire
mu sync.Mutex
}
......@@ -475,20 +485,16 @@ func (p *renterHostPair) managedExecuteProgram(epr modules.RPCExecuteProgramRequ
// managedFetchPriceTable returns the latest price table, if that price table is
// expired it will fetch a new one from the host.
func (p *renterHostPair) managedFetchPriceTable() (*modules.RPCPriceTable, error) {
// fetch a new pricetable if it's about to expire, rather than the second it
// expires. This ensures calls performed immediately after
// `managedFetchPriceTable` is called are set to succeed.
var expiryBuffer int64 = 3
p.mu.Lock()
expired := time.Now().Add(priceTableExpiryBuffer).After(p.ptExpiry)
p.mu.Unlock()
pt := p.managedPriceTable()
if pt.Expiry <= time.Now().Unix()+expiryBuffer {
err := p.managedUpdatePriceTable(true)
if err != nil {
if expired {
if err := p.managedUpdatePriceTable(true); err != nil {
return nil, err
}
return p.managedPriceTable(), nil
}
return pt, nil
return p.managedPriceTable(), nil
}
// managedFundEphemeralAccount will deposit the given amount in the pair's
......@@ -761,6 +767,7 @@ func (p *renterHostPair) managedUpdatePriceTable(payByFC bool) error {
// update the price table
p.mu.Lock()
p.pt = &pt
p.ptExpiry = time.Now().Add(pt.Validity)
p.mu.Unlock()
return nil
......@@ -792,9 +799,6 @@ func TestHostInitialization(t *testing.T) {
if reflect.DeepEqual(ht.host.staticPriceTables.current, modules.RPCPriceTable{}) {
t.Fatal("RPC price table wasn't initialized")
}
if ht.host.staticPriceTables.current.Expiry == 0 {
t.Fatal("RPC price table was not properly initialised")
}
}
// TestHostMultiClose checks that the host returns an error if Close is called
......
......@@ -11,7 +11,8 @@ import (
// newTestWriteStorePriceTable returns a custom price table for the cost tests.
func newTestWriteStorePriceTable() *modules.RPCPriceTable {
pt := &modules.RPCPriceTable{}
pt.Expiry = time.Now().Add(time.Minute).Unix()
pt.Validity = time.Minute
pt.WriteBaseCost = types.ZeroCurrency
pt.WriteLengthCost = types.ZeroCurrency
pt.WriteStoreCost = modules.DefaultStoragePrice
......
......@@ -125,7 +125,8 @@ func (so *TestStorageObligation) Update(sectorRoots []crypto.Hash, sectorsRemove
// for every operation/rpc.
func newTestPriceTable() *modules.RPCPriceTable {
return &modules.RPCPriceTable{
Expiry: time.Now().Add(time.Minute).Unix(),
Validity: time.Minute,
UpdatePriceTableCost: types.NewCurrency64(1),
InitBaseCost: types.NewCurrency64(1),
MemoryTimeCost: types.NewCurrency64(1),
......
......@@ -341,28 +341,6 @@ func (h *Host) threadedHandleConn(conn net.Conn) {
}
}
// staticReadPriceTableID receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table
func (h *Host) staticReadPriceTableID(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
var found bool
pt, found := h.staticPriceTables.managedGet(uid)
if !found {
return nil, ErrPriceTableNotFound
}
// make sure the table isn't expired.
if pt.Expiry < time.Now().Unix() {
return nil, ErrPriceTableExpired
}
return pt, nil
}
// threadedHandleStream handles incoming SiaMux streams.
func (h *Host) threadedHandleStream(stream siamux.Stream) {
// close the stream when the method terminates
......@@ -424,36 +402,6 @@ func (h *Host) threadedHandleStream(stream siamux.Stream) {
}
}
// staticGetPriceTable receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table.
//
// NOTE: the price table UID is sent on every RPC call, this to ensure both
// renter and host use the same pricing. If we were to keep the price table
// related to the incoming stream (and thus its mux) as state, we would
// introduce race conditions where host and renter use different price tables
// within the context of a single RPC request. Having the renter specify it on
// every request avoids the possiblity of these race conditions.
func (h *Host) staticGetPriceTable(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
pt, exists := h.staticPriceTables.managedGet(uid)
if !exists {
return nil, errors.New("Price table not found, it might be expired")
}
// check if it's still valid or if it has expired
if pt.Expiry < time.Now().Unix() {
return nil, errors.New("Price table expired")
}
return pt, nil
}
// threadedListen listens for incoming RPCs and spawns an appropriate handler for each.
func (h *Host) threadedListen(closeChan chan struct{}) {
defer close(closeChan)
......
package host
import (
"container/heap"
"encoding/json"
"fmt"
"sync"
"time"
"gitlab.com/NebulousLabs/Sia/modules"
......@@ -14,13 +16,82 @@ import (
var (
// ErrPriceTableNotFound is returned when the price table for a certain UID
// can not be found in the tracked price tables
ErrPriceTableNotFound = errors.New("Price table not found, it might be expired")
ErrPriceTableNotFound = errors.New("Price table not found")
// ErrPriceTableExpired is returned when the specified price table has
// expired
ErrPriceTableExpired = errors.New("Price table requested is expired")
)
type (
// priceTableHeap is a helper type that contains a min heap of rpc price
// tables, sorted on their expiry. The heap is guarded by its own mutex and
// allows for peeking at the min expiry.
priceTableHeap struct {
heap rpcPriceTableHeap
mu sync.Mutex
}
// rpcPriceTableHeap is a min heap of rpc price tables
rpcPriceTableHeap []*hostRPCPriceTable
// hostRPCPriceTable is a helper struct that wraps a price table alongside
// its creation timestamp. We need this, in combination with the price
// table's validity to figure out when to consider the price table to be
// expired.
hostRPCPriceTable struct {
modules.RPCPriceTable
creation time.Time
}
)
// Expiry returns the time at which the price table is considered to be expired
func (hpt *hostRPCPriceTable) Expiry() time.Time {
return hpt.creation.Add(hpt.Validity)
}
// PopExpired returns the UIDs for all rpc price tables that have expired
func (pth *priceTableHeap) PopExpired() (expired []modules.UniqueID) {
pth.mu.Lock()
defer pth.mu.Unlock()
now := time.Now()
for pth.heap.Len() > 0 {
pt := heap.Pop(&pth.heap).(*hostRPCPriceTable)
if now.Before(pt.Expiry()) {
heap.Push(&pth.heap, pt)
break
}
expired = append(expired, pt.UID)
}
return
}
// Push will add a price table to the heap.
func (pth *priceTableHeap) Push(pt *hostRPCPriceTable) {
pth.mu.Lock()
defer pth.mu.Unlock()
heap.Push(&pth.heap, pt)
}
// Implementation of heap.Interface for rpcPriceTableHeap.
func (pth rpcPriceTableHeap) Len() int { return len(pth) }
func (pth rpcPriceTableHeap) Less(i, j int) bool {
return pth[i].Expiry().Before(pth[j].Expiry())
}
func (pth rpcPriceTableHeap) Swap(i, j int) { pth[i], pth[j] = pth[j], pth[i] }
func (pth *rpcPriceTableHeap) Push(x interface{}) {
pt := x.(*hostRPCPriceTable)
*pth = append(*pth, pt)
}
func (pth *rpcPriceTableHeap) Pop() interface{} {
old := *pth
n := len(old)
pt := old[n-1]
*pth = old[0 : n-1]
return pt
}
// managedRPCUpdatePriceTable returns a copy of the host's current rpc price
// table. These prices are valid for the duration of the
// rpcPriceGuaranteePeriod, which is defined by the price table's Expiry
......@@ -28,9 +99,9 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
// copy the host's price table and give it a random UID
pt := h.staticPriceTables.managedCurrent()
fastrand.Read(pt.UID[:])
// update the epxiry to ensure prices are guaranteed for the duration of the
// rpcPriceGuaranteePeriod
pt.Expiry = time.Now().Add(rpcPriceGuaranteePeriod).Unix()
// set the validity to signal how long these prices are guaranteed for
pt.Validity = rpcPriceGuaranteePeriod
// set the host's current blockheight, this allows the renter to create
// valid withdrawal messages in case it is not synced yet
......@@ -69,7 +140,7 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
// after payment has been received, track the price table in the host's list
// of price tables and signal the renter we consider the price table valid
h.staticPriceTables.managedTrack(&pt)
h.staticPriceTables.managedTrack(&hostRPCPriceTable{pt, time.Now()})
var tracked modules.RPCTrackedPriceTableResponse
if err = modules.RPCWrite(stream, tracked); err != nil {
return errors.AddContext(err, "Failed to signal renter we tracked the price table")
......@@ -83,3 +154,27 @@ func (h *Host) managedRPCUpdatePriceTable(stream siamux.Stream) error {
}
return nil
}
// staticReadPriceTableID receives a stream and reads the price table's UID from
// it, if it's a known UID we return the price table
func (h *Host) staticReadPriceTableID(stream siamux.Stream) (*modules.RPCPriceTable, error) {
// read the price table uid
var uid modules.UniqueID
err := modules.RPCRead(stream, &uid)
if err != nil {
return nil, errors.AddContext(err, "Failed to read price table UID")
}
// check if we know the uid, if we do return it
var found bool
pt, found := h.staticPriceTables.managedGet(uid)
if !found {
return nil, ErrPriceTableNotFound
}
// make sure the table isn't expired.
if time.Now().After(pt.Expiry()) {
return nil, ErrPriceTableExpired
}
return &pt.RPCPriceTable, nil
}
......@@ -19,7 +19,7 @@ import (
// TestPriceTableMarshaling tests a PriceTable can be marshaled and unmarshaled
func TestPriceTableMarshaling(t *testing.T) {
pt := modules.RPCPriceTable{
Expiry: time.Now().Add(rpcPriceGuaranteePeriod).Unix(),
Validity: rpcPriceGuaranteePeriod,
HostBlockHeight: types.BlockHeight(fastrand.Intn(1e3)),
UpdatePriceTableCost: types.SiacoinPrecision,
InitBaseCost: types.SiacoinPrecision.Mul64(1e2),
......@@ -52,30 +52,44 @@ func TestPriceTableMinHeap(t *testing.T) {
t.Parallel()
now := time.Now()
pth := priceTableHeap{heap: make([]*modules.RPCPriceTable, 0)}
pth := priceTableHeap{heap: make([]*hostRPCPriceTable, 0)}
// add 4 price tables (out of order) that expire somewhere in the future
pt1 := modules.RPCPriceTable{Expiry: now.Add(9 * time.Minute).Unix()}
pt2 := modules.RPCPriceTable{Expiry: now.Add(-3 * time.Minute).Unix()}
pt3 := modules.RPCPriceTable{Expiry: now.Add(-6 * time.Minute).Unix()}
pt4 := modules.RPCPriceTable{Expiry: now.Add(-1 * time.Minute).Unix()}
pt1 := hostRPCPriceTable{
modules.RPCPriceTable{Validity: rpcPriceGuaranteePeriod},
now.Add(-rpcPriceGuaranteePeriod),
}
pth.Push(&pt1)
pt2 := hostRPCPriceTable{
modules.RPCPriceTable{Validity: rpcPriceGuaranteePeriod},
now,
}
pth.Push(&pt2)
pt3 := hostRPCPriceTable{
modules.RPCPriceTable{Validity: rpcPriceGuaranteePeriod},
now.Add(-3 * rpcPriceGuaranteePeriod),
}
pth.Push(&pt3)
pt4 := hostRPCPriceTable{
modules.RPCPriceTable{Validity: rpcPriceGuaranteePeriod},
now.Add(-2 * rpcPriceGuaranteePeriod),
}
pth.Push(&pt4)
// verify it considers 3 to be expired if we pass it a threshold 7' from now
// verify it expires 3 of them
expired := pth.PopExpired()
if len(expired) != 3 {
t.Fatalf("Expected 3 price tables to be expired, yet managedExpired returned %d price tables", len(expired))
t.Fatalf("Unexpected amount of price tables expired, expected %v, received %d", 3, len(expired))
}
// verify 'pop' returns the last remaining price table
pth.mu.Lock()
expectedPt1 := heap.Pop(&pth.heap)
expectedPt2 := heap.Pop(&pth.heap)
pth.mu.Unlock()
if expectedPt1 != &pt1 {
t.Fatal("Expected the last price table to be equal to pt1, which is the price table with the highest expiry")
if expectedPt2 != &pt2 {
t.Fatal("Expected the last price table to be equal to pt2, which is the price table with the highest expiry")
}
}
......@@ -111,8 +125,7 @@ func TestPruneExpiredPriceTables(t *testing.T) {
t.Fatal("Expected the testing price table to be tracked but isn't")
}
// sleep for the duration of the expiry frequency, seeing as that is greater
// than the price guarantee period, it is the worst case
// retry until the price table expired and got pruned
err = build.Retry(10, pruneExpiredRPCPriceTableFrequency, func() error {
_, exists := ht.host.staticPriceTables.managedGet(pt.UID)
if exists {
......@@ -208,9 +221,9 @@ func TestUpdatePriceTableRPC(t *testing.T) {
if !tracked {
t.Fatalf("Expected price table with.UID %v to be tracked after successful update", pt.UID)
}
// ensure its expiry is in the future
if pt.Expiry <= time.Now().Unix() {
t.Fatal("Expected price table expiry to be in the future")
// ensure its validity is positive and different from zero
if pt.Validity.Seconds() <= 0 {
t.Fatal("Expected price table validity to be positive and non zero")
}
// ensure it contains the host's block height
......
......@@ -10,19 +10,6 @@ import (
"gitlab.com/NebulousLabs/Sia/modules"
)
// updateTimeInterval defines the amount of time after which we'll update the
// host's prices. This is a temporary variable and will be replaced when we add
// a duration to the host's price table. For now it's just half of the
// rpcPriceGuaranteePeriod set on the host
//
// TODO: Need to switch to setting the price table update based on the host
// timeout instead.
var updateTimeInterval = build.Select(build.Var{
Standard: 5 * time.Minute,
Dev: 3 * time.Minute,
Testing: 7 * time.Second,
}).(time.Duration)
type (
// workerPriceTable contains a price table and some information related to
// retrieving the next update.
......@@ -30,6 +17,9 @@ type (
// The actual price table.
staticPriceTable modules.RPCPriceTable
// The time at which the price table expires.
staticExpiryTime time.Time
// The next time that the worker should try to update the price table.
staticUpdateTime time.Time
......@@ -83,7 +73,7 @@ func (w *worker) staticSetPriceTable(pt *workerPriceTable) {
// before the current time, and the price table expiry defaults to the zero
// time.
func (wpt *workerPriceTable) staticValid() bool {
return wpt.staticPriceTable.Expiry > time.Now().Unix()
return time.Now().Before(wpt.staticExpiryTime)
}
// managedUpdatePriceTable performs the UpdatePriceTableRPC on the host.
......@@ -128,6 +118,7 @@ func (w *worker) staticUpdatePriceTable() {
// table, need to make a new one.
pt := &workerPriceTable{
staticPriceTable: currentPT.staticPriceTable,
staticExpiryTime: currentPT.staticExpiryTime,
staticUpdateTime: cooldownUntil(currentPT.staticConsecutiveFailures),
staticConsecutiveFailures: currentPT.staticConsecutiveFailures + 1,
staticRecentErr: err,
......@@ -193,12 +184,21 @@ func (w *worker) staticUpdatePriceTable() {
return
}
// Calculate the expiry time and set the update time to be half of the
// expiry window to ensure we update the PT before it expires
now := time.Now()
expiryTime := now.Add(pt.Validity)
expiryHalfTimeInS := (expiryTime.Unix() - now.Unix()) / 2
expiryHalfTime := time.Duration(expiryHalfTimeInS) * time.Second
newUpdateTime := time.Now().Add(expiryHalfTime)
// Update the price table. We preserve the recent error even though there
// has not been an error for debugging purposes, if there has been an error
// previously the devs like to be able to see what it was.
wpt := &workerPriceTable{
staticPriceTable: pt,
staticUpdateTime: time.Now().Add(updateTimeInterval),
staticExpiryTime: expiryTime,
staticUpdateTime: newUpdateTime,
staticConsecutiveFailures: 0,
staticRecentErr: currentPT.staticRecentErr,
}
......
......@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"io"
"time"
"gitlab.com/NebulousLabs/Sia/crypto"
"gitlab.com/NebulousLabs/Sia/encoding"
......@@ -17,9 +18,9 @@ type RPCPriceTable struct {
// UID is a specifier that uniquely identifies this price table
UID UniqueID `json:"uid"`
// Expiry is a unix timestamp that specifies the time until which the
// MDMCostTable is valid.
Expiry int64 `json:"expiry"`
// Validity is a duration that specifies how long the host guarantees these
// prices for and are thus considered valid.
Validity time.Duration `json:"validity"`
// HostBlockHeight is the block height of the host. This allows the renter
// to create valid withdrawal messages in case it is not synced yet.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment