Commit f61c0ae0 authored by David Vorick's avatar David Vorick

migrate tpool and consensus set to demoting locks

parent eeebfe18
......@@ -6,6 +6,7 @@ all: install
# Sia.
dependencies:
go install -race std
go get -u github.com/NebulousLabs/demotemutex
go get -u github.com/NebulousLabs/ed25519
go get -u github.com/NebulousLabs/entropy-mnemonics
go get -u github.com/NebulousLabs/go-upnp
......
......@@ -133,7 +133,7 @@ func (cs *ConsensusSet) addBlockToTree(b types.Block) (revertedBlocks, appliedBl
// it will be relayed to connected peers. This function should only be called
// for new, untrusted blocks.
func (cs *ConsensusSet) AcceptBlock(b types.Block) error {
lockID := cs.mu.Lock()
cs.mu.Lock()
// Start verification inside of a bolt View tx.
err := cs.db.View(func(tx *bolt.Tx) error {
......@@ -152,7 +152,7 @@ func (cs *ConsensusSet) AcceptBlock(b types.Block) error {
return nil
})
if err != nil {
cs.mu.Unlock(lockID)
cs.mu.Unlock()
return err
}
......@@ -162,11 +162,11 @@ func (cs *ConsensusSet) AcceptBlock(b types.Block) error {
// the longest fork.
revertedBlocks, appliedBlocks, err := cs.addBlockToTree(b)
if err != nil {
cs.mu.Unlock(lockID)
cs.mu.Unlock()
return err
}
cs.mu.Demote()
defer cs.mu.RUnlock(lockID)
defer cs.mu.DemotedUnlock()
if len(appliedBlocks) > 0 {
cs.updateSubscribers(revertedBlocks, appliedBlocks)
}
......
......@@ -252,8 +252,6 @@ func TestExtremeFutureTimestampHandling(t *testing.T) {
// block still has not been added to the consensus set (prove that the
// block was correctly discarded).
time.Sleep(time.Second * time.Duration(3+types.ExtremeFutureThreshold))
lockID := cst.cs.mu.RLock()
defer cst.cs.mu.RUnlock(lockID)
_, err = cst.cs.dbGetBlockMap(solvedBlock.ID())
if err != errNilItem {
t.Error("extreme future block made it into the consensus set after waiting")
......@@ -315,9 +313,7 @@ func TestFutureTimestampHandling(t *testing.T) {
// Check that after waiting until the block is no longer too far in the
// future, the block gets added to the consensus set.
time.Sleep(time.Second * 3) // 3 seconds, as the block was originally 2 seconds too far into the future.
lockID := cst.cs.mu.RLock()
_, err = cst.cs.dbGetBlockMap(solvedBlock.ID())
cst.cs.mu.RUnlock(lockID)
if err == errNilItem {
t.Fatalf("future block was not added to the consensus set after waiting the appropriate amount of time")
}
......
......@@ -9,11 +9,11 @@ package consensus
import (
"errors"
"github.com/NebulousLabs/demotemutex"
"github.com/boltdb/bolt"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/persist"
"github.com/NebulousLabs/Sia/sync"
"github.com/NebulousLabs/Sia/types"
)
......@@ -54,7 +54,7 @@ type ConsensusSet struct {
checkingConsistency bool
persistDir string
mu *sync.RWMutex
mu demotemutex.DemoteMutex
}
// New returns a new ConsensusSet, containing at least the genesis block. If
......@@ -89,7 +89,7 @@ func New(gateway modules.Gateway, persistDir string) (*ConsensusSet, error) {
dosBlocks: make(map[types.BlockID]struct{}),
persistDir: persistDir,
mu: sync.New(modules.SafeMutexDelay, 1),
mu: demotemutex.DemoteMutex{},
}
// Create the diffs for the genesis siafund outputs.
......@@ -133,8 +133,8 @@ func (cs *ConsensusSet) ChildTarget(id types.BlockID) (target types.Target, exis
// Close safely closes the block database.
func (cs *ConsensusSet) Close() error {
lockID := cs.mu.Lock()
defer cs.mu.Unlock(lockID)
cs.mu.Lock()
defer cs.mu.Unlock()
return cs.db.Close()
}
......
......@@ -119,8 +119,9 @@ func (cs *ConsensusSet) updateSubscribers(revertedBlocks []*processedBlock, appl
// ConsensusChange(5) will return the 6th consensus change that was issued to
// subscribers. ConsensusChanges can be assumed to be consecutive.
func (cs *ConsensusSet) ConsensusChange(i int) (cc modules.ConsensusChange, err error) {
id := cs.mu.RLock()
defer cs.mu.RUnlock(id)
cs.mu.RLock()
defer cs.mu.RUnlock()
err = cs.db.View(func(tx *bolt.Tx) error {
cc, err = cs.computeConsensusChange(tx, i)
return err
......@@ -134,7 +135,9 @@ func (cs *ConsensusSet) ConsensusChange(i int) (cc modules.ConsensusChange, err
// ConsensusSetSubscribe accepts a new subscriber who will receive a call to
// ProcessConsensusChange every time there is a change in the consensus set.
func (cs *ConsensusSet) ConsensusSetSubscribe(subscriber modules.ConsensusSetSubscriber) {
id := cs.mu.Lock()
cs.mu.Lock()
defer cs.mu.Unlock()
cs.subscribers = append(cs.subscribers, subscriber)
err := cs.db.View(func(tx *bolt.Tx) error {
for i := range cs.changeLog {
......@@ -149,5 +152,4 @@ func (cs *ConsensusSet) ConsensusSetSubscribe(subscriber modules.ConsensusSetSub
if build.DEBUG && err != nil {
panic(err)
}
cs.mu.Unlock(id)
}
......@@ -125,7 +125,7 @@ func (cs *ConsensusSet) sendBlocks(conn modules.PeerConn) error {
found := false
var start types.BlockHeight
var csHeight types.BlockHeight
lockID := cs.mu.RLock()
cs.mu.RLock()
err = cs.db.View(func(tx *bolt.Tx) error {
csHeight = blockHeight(tx)
for _, id := range knownBlocks {
......@@ -149,7 +149,7 @@ func (cs *ConsensusSet) sendBlocks(conn modules.PeerConn) error {
}
return nil
})
cs.mu.RUnlock(lockID)
cs.mu.RUnlock()
if err != nil {
return err
}
......@@ -171,7 +171,7 @@ func (cs *ConsensusSet) sendBlocks(conn modules.PeerConn) error {
for moreAvailable {
// Get the set of blocks to send.
var blocks []types.Block
lockID = cs.mu.RLock()
cs.mu.RLock()
cs.db.View(func(tx *bolt.Tx) error {
height := blockHeight(tx)
for i := start; i <= height && i < start+MaxCatchUpBlocks; i++ {
......@@ -189,7 +189,7 @@ func (cs *ConsensusSet) sendBlocks(conn modules.PeerConn) error {
start += MaxCatchUpBlocks
return nil
})
cs.mu.RUnlock(lockID)
cs.mu.RUnlock()
if err != nil {
return err
}
......
......@@ -184,7 +184,7 @@ func TestBlockHistory(t *testing.T) {
})
// validate history
lockID := cst.cs.mu.Lock()
cst.cs.mu.Lock()
// first 10 IDs are linear
for i := types.BlockHeight(0); i < 10; i++ {
id, err := cst.cs.dbGetPath(cst.cs.dbBlockHeight() - i)
......@@ -215,7 +215,7 @@ func TestBlockHistory(t *testing.T) {
t.Errorf("Wrong ID in history: expected %v, got %v", genesisID, history[31])
}
cst.cs.mu.Unlock(lockID)
cst.cs.mu.Unlock()
// remaining IDs should be empty
var emptyID types.BlockID
......
......@@ -14,10 +14,10 @@ var (
func init() {
if build.Release == "dev" {
SafeMutexDelay = 25 * time.Second
SafeMutexDelay = 40 * time.Second
} else if build.Release == "standard" {
SafeMutexDelay = 60 * time.Second
} else if build.Release == "testing" {
SafeMutexDelay = 10 * time.Second
SafeMutexDelay = 20 * time.Second
}
}
......@@ -278,8 +278,8 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
// transactions. If the transaction is accepted, it will be relayed to
// connected peers.
func (tp *TransactionPool) AcceptTransactionSet(ts []types.Transaction) error {
id := tp.mu.Lock()
defer tp.mu.Unlock(id)
tp.mu.Lock()
defer tp.mu.Unlock()
err := tp.acceptTransactionSet(ts)
if err != nil {
......
......@@ -35,7 +35,7 @@ func (tp *TransactionPool) updateSubscribersConsensus(cc modules.ConsensusChange
// Subscribers will receive all consensus set changes as well as transaction
// pool changes, and should not subscribe to both.
func (tp *TransactionPool) TransactionPoolSubscribe(subscriber modules.TransactionPoolSubscriber) {
lockID := tp.mu.Lock()
tp.mu.Lock()
tp.subscribers = append(tp.subscribers, subscriber)
for i := 0; i <= tp.consensusChangeIndex; i++ {
cc, err := tp.consensusSet.ConsensusChange(i)
......@@ -47,9 +47,9 @@ func (tp *TransactionPool) TransactionPoolSubscribe(subscriber modules.Transacti
// Release the lock between iterations to smooth out performance a bit
// - tpool does not need to hold the lock for 15,000 consensus change
// objects.
tp.mu.Unlock(lockID)
tp.mu.Unlock()
runtime.Gosched()
lockID = tp.mu.Lock()
tp.mu.Lock()
}
// Send the new subscriber the transaction pool set.
......@@ -62,5 +62,5 @@ func (tp *TransactionPool) TransactionPoolSubscribe(subscriber modules.Transacti
cc = cc.Append(tSetDiff)
}
subscriber.ReceiveUpdatedUnconfirmedTransactions(txns, cc)
tp.mu.Unlock(lockID)
tp.mu.Unlock()
}
......@@ -3,9 +3,10 @@ package transactionpool
import (
"errors"
"github.com/NebulousLabs/demotemutex"
"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/sync"
"github.com/NebulousLabs/Sia/types"
)
......@@ -58,7 +59,7 @@ type (
consensusChangeIndex int
subscribers []modules.TransactionPoolSubscriber
mu *sync.RWMutex
mu demotemutex.DemoteMutex
}
)
......@@ -86,7 +87,7 @@ func New(cs modules.ConsensusSet, g modules.Gateway) (*TransactionPool, error) {
// change will then have an index of '0'.
consensusChangeIndex: -1,
mu: sync.New(modules.SafeMutexDelay, 5),
mu: demotemutex.DemoteMutex{},
}
// Register RPCs
......
......@@ -16,7 +16,7 @@ func (tp *TransactionPool) purge() {
// ProcessConsensusChange gets called to inform the transaction pool of changes
// to the consensus set.
func (tp *TransactionPool) ProcessConsensusChange(cc modules.ConsensusChange) {
lockID := tp.mu.Lock()
tp.mu.Lock()
// TODO: Right now, transactions that were reverted to not get saved and
// retried, because some transactions such as storage proofs might be
......@@ -66,12 +66,12 @@ func (tp *TransactionPool) ProcessConsensusChange(cc modules.ConsensusChange) {
tp.mu.Demote()
tp.updateSubscribersConsensus(cc)
tp.updateSubscribersTransactions()
tp.mu.RUnlock(lockID)
tp.mu.DemotedUnlock()
}
// PurgeTransactionPool deletes all transactions from the transaction pool.
func (tp *TransactionPool) PurgeTransactionPool() {
lockID := tp.mu.Lock()
tp.mu.Lock()
tp.purge()
tp.mu.Unlock(lockID)
tp.mu.Unlock()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment