Commit ced66128 authored by Matthew Sevey

Merge branch 'worker-cache-nonblocking' into 'master'

change worker cache update to be non-blocking

See merge request !4549
parents 9acd244a 1fdd22a1
Pipeline #152540925 failed after 30 minutes and 55 seconds
- fixed an issue where workers would briefly freeze after a new block appeared
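
For context, the core of the change in the diff below is a single-flight guard around the cache update: a compare-and-swap on a uint64 flag ensures only one update runs at a time, and the update itself is launched on its own goroutine so the work loop never blocks while the consensus set is busy with a new block or a reorg. A minimal standalone sketch of that pattern (the type, field, and method names here are illustrative, not the exact identifiers used in the diff):

	package main

	import (
		"sync/atomic"
		"unsafe"
	)

	// workerCache and worker are stand-ins for the real renter types.
	type workerCache struct {
		blockHeight uint64 // e.g. a cached block height
	}

	type worker struct {
		cacheUpdating uint64         // 1 while a cache refresh is in flight
		cachePtr      unsafe.Pointer // *workerCache, published atomically
	}

	// buildCache stands in for the expensive part of the update, which may
	// block on consensus locks while a new block or a reorg is processed.
	func (w *worker) buildCache() *workerCache { return &workerCache{} }

	// tryUpdateCache starts at most one concurrent cache refresh and returns
	// immediately, so the caller's work loop never blocks on the refresh.
	func (w *worker) tryUpdateCache() {
		// Atomically claim the "update in progress" slot; bail out if taken.
		if !atomic.CompareAndSwapUint64(&w.cacheUpdating, 0, 1) {
			return
		}
		go func() {
			newCache := w.buildCache()
			atomic.StorePointer(&w.cachePtr, unsafe.Pointer(newCache))
			atomic.StoreUint64(&w.cacheUpdating, 0) // release the slot
		}()
	}

	func main() {
		w := &worker{}
		w.tryUpdateCache() // returns right away; the refresh runs in the background
	}
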
@@ -28,6 +28,7 @@ require (
gitlab.com/NebulousLabs/threadgroup v0.0.0-20200527092543-afa01960408c
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20190814160017-69f300e9bcb8
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b // indirect
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f // indirect
)
......
@@ -173,6 +173,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191105034135-c7e5f84aec59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191107222254-f4817d981bb6/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 h1:nVJ3guKA9qdkEQ3TUdXI9QSINo2CUPM/cySEvw2w8I0=
@@ -186,6 +187,9 @@ golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -201,6 +205,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -221,6 +226,9 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
......
@@ -59,6 +59,7 @@ type (
// atomicCache contains a pointer to the latest cache in the worker.
// Atomics are used to minimize lock contention on the worker object.
atomicCache unsafe.Pointer // points to a workerCache object
atomicCacheUpdating uint64 // ensures only one cache update happens at a time
atomicPriceTable unsafe.Pointer // points to a workerPriceTable object
atomicPriceTableUpdateRunning uint64 // used for a sanity check
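
For reference, the read side of this atomic cache pointer is lock-free as well. The diff below only shows the store side (atomic.StorePointer); the staticCache accessor mentioned further down presumably amounts to a cast around atomic.LoadPointer, along these lines (a sketch, not the exact Sia code):

	// staticCache returns the most recently published cache. The pointer is
	// written with atomic.StorePointer, so a matching atomic.LoadPointer is
	// enough to read it safely without taking w.mu. It can be nil before the
	// first update has completed.
	func (w *worker) staticCache() *workerCache {
		return (*workerCache)(atomic.LoadPointer(&w.atomicCache))
	}
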
@@ -110,6 +111,39 @@ type (
}
)
// managedKill will kill the worker.
func (w *worker) managedKill() {
w.mu.Lock()
defer w.mu.Unlock()
select {
case <-w.killChan:
return
default:
close(w.killChan)
}
}
// staticKilled is a convenience function to determine if a worker has been
// killed or not.
func (w *worker) staticKilled() bool {
select {
case <-w.killChan:
return true
default:
return false
}
}
// staticWake will wake the worker from sleeping. This should be called any time
// that a job is queued or a job completes.
func (w *worker) staticWake() {
select {
case w.wakeChan <- struct{}{}:
default:
}
}
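
The wake and kill channels are consumed by the worker's main loop, which is not shown in this diff; the sleep step presumably boils down to a select over those channels plus the renter's stop signal (assuming the thread group exposes a stop channel), along these lines:

	// Block until a job is queued, the worker is killed, or the renter shuts
	// down. staticWake's non-blocking send above pairs with this receive.
	select {
	case <-w.wakeChan:
		// fall through and look for new work
	case <-w.killChan:
		return
	case <-w.renter.tg.StopChan():
		return
	}
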
// status returns the status of the worker.
func (w *worker) status() modules.WorkerStatus {
downloadOnCoolDown := w.onDownloadCooldown()
@@ -204,28 +238,9 @@ func (r *Renter) newWorker(hostPubKey types.SiaPublicKey) (*worker, error) {
w.initJobUploadSnapshotQueue()
// Get the worker cache set up before returning the worker. This prevents a
// race condition in some tests.
if !w.staticTryUpdateCache() {
return nil, errors.New("unable to build cache for worker")
w.managedUpdateCache()
if w.staticCache() == nil {
return nil, errors.New("unable to build a cache for the worker")
}
return w, nil
}
// staticKilled is a convenience function to determine if a worker has been
// killed or not.
func (w *worker) staticKilled() bool {
select {
case <-w.killChan:
return true
default:
return false
}
}
// staticWake will wake the worker from sleeping. This should be called any time
// that a job is queued or a job completes.
func (w *worker) staticWake() {
select {
case w.wakeChan <- struct{}{}:
default:
}
}
@@ -35,24 +35,40 @@ type (
}
)
// staticUpdatedCache performs the actual worker cache update.
func (w *worker) staticUpdatedCache() *workerCache {
// managedUpdateCache performs the actual worker cache update. The function is
// managed because it calls exported functions on the hostdb and on the
// consensus set.
//
// NOTE: The concurrency around the atomicCacheUpdating value is a little bit
// annoying. You can't just use `defer atomic.StoreUint64()` because you need to
// update the value before calling tg.AfterFunc at the end of the function.
func (w *worker) managedUpdateCache() {
// Check if there is already a cache update in progress. If not, atomically
// signal that a cache update is in progress.
if !atomic.CompareAndSwapUint64(&w.atomicCacheUpdating, 0, 1) {
return
}
// Grab the host to check the version.
host, ok, err := w.renter.hostDB.Host(w.staticHostPubKey)
if !ok || err != nil {
w.renter.log.Printf("Worker %v could not update the cache, hostdb found host %v, with error: %v", w.staticHostPubKeyStr, ok, err)
return nil
w.renter.log.Printf("Worker %v could not update the cache, hostdb found host %v, with error: %v, worker being killed", w.staticHostPubKeyStr, ok, err)
w.managedKill()
atomic.StoreUint64(&w.atomicCacheUpdating, 0)
return
}
// Grab the renter contract from the host contractor.
renterContract, exists := w.renter.hostContractor.ContractByPublicKey(w.staticHostPubKey)
if !exists {
w.renter.log.Printf("Worker %v could not update the cache, host not found in contractor", w.staticHostPubKeyStr)
return nil
w.renter.log.Printf("Worker %v could not update the cache, host not found in contractor, worker being killed", w.staticHostPubKeyStr)
w.managedKill()
atomic.StoreUint64(&w.atomicCacheUpdating, 0)
return
}
// Create the cache object.
return &workerCache{
newCache := &workerCache{
staticBlockHeight: w.renter.cs.Height(),
staticContractID: renterContract.ID,
staticContractUtility: renterContract.Utility,
@@ -61,34 +77,37 @@ func (w *worker) staticUpdatedCache() *workerCache {
staticLastUpdate: time.Now(),
}
// Atomically store the cache object in the worker.
ptr := unsafe.Pointer(newCache)
atomic.StorePointer(&w.atomicCache, ptr)
// Wake the worker when the cache needs to be updated again. Note that we
// need to signal the cache update is complete before waking the worker,
// just in case a bizarre race condition means that the worker wakes
// immediately, then sees that an update is in progress, then fails to
// update its cache.
atomic.StoreUint64(&w.atomicCacheUpdating, 0)
w.renter.tg.AfterFunc(workerCacheUpdateFrequency, func() {
w.staticWake()
})
}
// staticTryUpdateCache will perform a cache update on the worker.
//
// 'false' will be returned if the cache cannot be updated, signaling that the
// worker should exit.
func (w *worker) staticTryUpdateCache() bool {
// Check if an update is necessary. If not, return success.
func (w *worker) staticTryUpdateCache() {
// Check if an update is necessary.
cache := w.staticCache()
if cache != nil && time.Since(cache.staticLastUpdate) < workerCacheUpdateFrequency {
return true
}
// Get the new cache.
newCache := w.staticUpdatedCache()
if newCache == nil {
return false
return
}
// Wake the worker when the cache needs to be updated again.
w.renter.tg.AfterFunc(workerCacheUpdateFrequency, func() {
w.staticWake()
})
// Atomically store the cache object in the worker.
ptr := unsafe.Pointer(newCache)
atomic.StorePointer(&w.atomicCache, ptr)
return true
// Get the new cache in a goroutine. This is because the cache update grabs
// a lock on the consensus object, which can sometimes take a while if there
// are new blocks being processed or a reorg being processed.
w.renter.tg.Launch(w.managedUpdateCache)
}
// staticCache returns the current worker cache object.
......
@@ -292,12 +292,7 @@ func (w *worker) threadedWorkLoop() {
if !w.managedBlockUntilReady() {
return
}
// Update the cache for the worker if needed.
if !w.staticTryUpdateCache() {
w.renter.log.Printf("worker %v is being killed because the cache could not be updated", w.staticHostPubKeyStr)
return
}
w.staticTryUpdateCache()
// Attempt to launch a serial job. If there is already a job running,
// this will no-op. If no job is running, a goroutine will be spun up
......
@@ -93,16 +93,10 @@ func (wp *workerPool) callUpdate() {
wp.workers[id] = w
// Start the work loop in a separate goroutine
go func() {
// We have to call tg.Add inside of the goroutine because we are
// holding the workerpool's mutex lock and it's not permitted to
// call tg.Add while holding a lock.
if err := wp.renter.tg.Add(); err != nil {
return
}
defer wp.renter.tg.Done()
w.threadedWorkLoop()
}()
err = wp.renter.tg.Launch(w.threadedWorkLoop)
if err != nil {
return
}
}
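
The new tg.Launch helper presumably wraps the Add/Done bookkeeping that the removed goroutine did by hand, roughly along these lines (a sketch of the assumed semantics, not the threadgroup package's actual code):

	// Launch registers fn with the thread group and runs it on its own
	// goroutine, returning an error instead of running fn if the group is
	// already stopping. (Written as if inside the threadgroup package,
	// using its existing Add/Done.)
	func (tg *ThreadGroup) Launch(fn func()) error {
		if err := tg.Add(); err != nil {
			return err
		}
		go func() {
			defer tg.Done()
			fn()
		}()
		return nil
	}
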
// Remove a worker for any worker that is not in the set of new contracts.
@@ -117,7 +111,9 @@ func (wp *workerPool) callUpdate() {
_, exists := contractMap[id]
if !exists {
delete(wp.workers, id)
close(worker.killChan)
// Kill the worker in a goroutine. This avoids locking issues, as
// wp.mu is currently locked.
go worker.managedKill()
}
}
}
@@ -167,7 +163,9 @@ func (r *Renter) newWorkerPool() *workerPool {
wp.renter.tg.OnStop(func() error {
wp.mu.RLock()
for _, w := range wp.workers {
close(w.killChan)
// Kill the worker in a goroutine. This avoids locking issues, as
// wp.mu is currently read locked.
go w.managedKill()
}
wp.mu.RUnlock()
return nil
......