Skip to content
Snippets Groups Projects
workerloop.go 14.06 KiB
package renter

import (
	"sync/atomic"
	"time"

	"gitlab.com/NebulousLabs/Sia/build"

	"gitlab.com/NebulousLabs/errors"
)

type (
	// workerLoopState tracks the state of the worker loop.
	workerLoopState struct {
		// Variables to count the number of jobs running. Note that these
		// variables can only be incremented in the primary work loop of the
		// worker, because there are blocking conditions within the primary work
		// loop that need to know only one thread is running at a time, and
		// safety is derived from knowing that no new threads are launching
		// while we are waiting for all existing threads to finish.
		//
		// These values can be decremented in a goroutine.
		atomicAsyncJobsRunning uint64
		atomicSerialJobRunning uint64

		// atomicSuspectRevisionMismatch indicates that the worker encountered
		// some error where it believes that it needs to resync its contract
		// with the host.
		atomicSuspectRevisionMismatch uint64

		// Variables to track the total amount of async data outstanding. This
		// indicates the total amount of data that we expect to use from async
		// jobs that we have submitted for the worker.
		atomicReadDataOutstanding  uint64
		atomicWriteDataOutstanding uint64

		// The read data limit and the write data limit define how much work is
		// allowed to be outstanding before new jobs will be blocked from being
		// launched async.
		atomicReadDataLimit  uint64
		atomicWriteDataLimit uint64
	}
)

// staticSerialJobRunning indicates whether a serial job is currently running
// for the worker.
func (wls *workerLoopState) staticSerialJobRunning() bool {
	return atomic.LoadUint64(&wls.atomicSerialJobRunning) == 1
}

// externLaunchSerialJob will launch a serial job for the worker, ensuring that
// exclusivity is handled correctly.
//
// The 'extern' indicates that this function is only allowed to be called from
// 'threadedWorkLoop', and it is expected that only one instance of
// 'threadedWorkLoop' is ever created per-worker.
func (w *worker) externLaunchSerialJob(job func()) {
	// Mark that there is now a job running. Only one job may be running at a
	// time.
	ok := atomic.CompareAndSwapUint64(&w.staticLoopState.atomicSerialJobRunning, 0, 1)
	if !ok {
		// There already is a job running. This is not allowed.
		w.renter.log.Critical("running a job when another job is already running")
	}

	fn := func() {
		// Execute the job in a goroutine.
		job()
		// After the job has executed, update to indicate that no serial job
		// is running.
		atomic.StoreUint64(&w.staticLoopState.atomicSerialJobRunning, 0)
		// After updating to indicate that no serial job is running, wake the
		// worker to check for a new serial job.
		w.staticWake()
	}
	err := w.renter.tg.Launch(fn)
	if err != nil {
		// Renter has closed, job will not be executed.
		atomic.StoreUint64(&w.staticLoopState.atomicSerialJobRunning, 0)
		return
	}
}

// externTryLaunchSerialJob will attempt to launch a serial job on the worker.
// Only one serial job is allowed to be running at a time (each serial job
// requires exclusive access to the worker's contract). If there is already a
// serial job running, nothing will happen.
//
// The 'extern' indicates that this function is only allowed to be called from
// 'threadedWorkLoop', and it is expected that only one instance of
// 'threadedWorkLoop' is ever created per-worker.
func (w *worker) externTryLaunchSerialJob() {
	// If there is already a serial job running, that job has exclusivity, do
	// nothing.
	if w.staticLoopState.staticSerialJobRunning() {
		return
	}

	// Perform a disrupt for testing. See the implementation in
	// workerloop_test.go for more info.
	if w.renter.deps.Disrupt("TestJobSerialExecution") {
		return
	}

	// Check every potential serial job that the worker may be required to
	// perform. This scheduling allows a flood of jobs earlier in the list to
	// starve out jobs later in the list. At some point we will probably
	// revisit this to try and address the starvation issue.
	if w.managedNeedsToUpdatePriceTable() {
		w.externLaunchSerialJob(w.staticUpdatePriceTable)
		return
	}
	if w.managedNeedsToRefillAccount() {
		w.externLaunchSerialJob(w.managedRefillAccount)
		return
	}
	job := w.staticJobDownloadSnapshotQueue.callNext()
	if job != nil {
		w.externLaunchSerialJob(job.callExecute)
		return
	}
	job = w.staticJobUploadSnapshotQueue.callNext()
	if job != nil {
		w.externLaunchSerialJob(job.callExecute)
		return
	}
	if w.managedHasDownloadJob() {
		w.externLaunchSerialJob(w.managedPerformDownloadChunkJob)
		return
	}
	if w.managedHasUploadJob() {
		w.externLaunchSerialJob(w.managedPerformUploadChunkJob)
		return
	}
}

// externLaunchAsyncJob accepts a function to retrieve a job and then uses that
// to retrieve a job and launch it. The bandwidth consumption will be updated as
// the job starts and finishes.
func (w *worker) externLaunchAsyncJob(job workerJob) bool {
	// Add the resource requirements to the worker loop state. Also add this
	// thread to the number of jobs running.
	uploadBandwidth, downloadBandwidth := job.callExpectedBandwidth()
	atomic.AddUint64(&w.staticLoopState.atomicReadDataOutstanding, downloadBandwidth)
	atomic.AddUint64(&w.staticLoopState.atomicWriteDataOutstanding, uploadBandwidth)
	atomic.AddUint64(&w.staticLoopState.atomicAsyncJobsRunning, 1)
	fn := func() {
		job.callExecute()
		// Subtract the outstanding data now that the job is complete. Atomic
		// subtraction works by adding and using some bit tricks.
		atomic.AddUint64(&w.staticLoopState.atomicReadDataOutstanding, -downloadBandwidth)
		atomic.AddUint64(&w.staticLoopState.atomicWriteDataOutstanding, -uploadBandwidth)
		atomic.AddUint64(&w.staticLoopState.atomicAsyncJobsRunning, ^uint64(0)) // subtract 1
		// Wake the worker to run any additional async jobs that may have been
		// blocked / ignored because there was not enough bandwidth available.
		w.staticWake()
	}
	err := w.renter.tg.Launch(fn)
	if err != nil {
		// Renter has closed, but we want to represent that the work was
		// processed anyway - returning true indicates that the worker should
		// continue processing jobs.
		atomic.AddUint64(&w.staticLoopState.atomicReadDataOutstanding, -downloadBandwidth)
		atomic.AddUint64(&w.staticLoopState.atomicWriteDataOutstanding, -uploadBandwidth)
		atomic.AddUint64(&w.staticLoopState.atomicAsyncJobsRunning, ^uint64(0)) // subtract 1
		return true
	}
	return true
}

// externTryLaunchAsyncJob will look at the async jobs which are in the worker
// queue and attempt to launch any that are ready. The job launcher will fail if
// the price table is out of date or if the worker account is empty.
//
// The job launcher will also fail if the worker has too much work in jobs
// already queued. Every time a job is launched, a bandwidth estimate is made.
// The worker will not allow more than a certain amount of bandwidth to be
// queued at once to prevent jobs from being spread too thin and sharing too
// much bandwidth.
func (w *worker) externTryLaunchAsyncJob() bool {
	// Verify that the worker has not reached its limits for doing multiple
	// jobs at once.
	readLimit := atomic.LoadUint64(&w.staticLoopState.atomicReadDataLimit)
	writeLimit := atomic.LoadUint64(&w.staticLoopState.atomicWriteDataLimit)
	readOutstanding := atomic.LoadUint64(&w.staticLoopState.atomicReadDataOutstanding)
	writeOutstanding := atomic.LoadUint64(&w.staticLoopState.atomicWriteDataOutstanding)
	if readOutstanding > readLimit || writeOutstanding > writeLimit {
		// Worker does not need to discard jobs, it is making progress, it's
		// just not launching any new jobs until its current jobs finish up.
		return false
	}

	// Perform a disrupt for testing. This is some code that ensures async job
	// launches are controlled correctly. The disrupt operates on a mock worker,
	// so it needs to happen after the ratelimit checks but before the cache,
	// price table, and account checks.
	if w.renter.deps.Disrupt("TestAsyncJobLaunches") {
		return true
	}

	// Hosts that do not support the async protocol cannot do async jobs.
	cache := w.staticCache()
	if build.VersionCmp(cache.staticHostVersion, minAsyncVersion) < 0 {
		w.managedDiscardAsyncJobs(errors.New("host version does not support async jobs"))
		return false
	}

	// A valid price table is required to perform async tasks.
	if !w.staticPriceTable().staticValid() {
		w.managedDiscardAsyncJobs(errors.New("price table with host is no longer valid"))
		return false
	}

	// RHP3 must not be on cooldown to perform async tasks.
	if w.managedOnMaintenanceCooldown() {
		w.managedDiscardAsyncJobs(errors.New("the worker account is on cooldown"))
		return false
	}

	// Check every potential async job that can be launched.
	job := w.staticJobHasSectorQueue.callNext()
	if job != nil {
		w.externLaunchAsyncJob(job)
		return true
	}
	// Check if registry jobs are supported.
	if build.VersionCmp(cache.staticHostVersion, minRegistryVersion) >= 0 {
		job = w.staticJobUpdateRegistryQueue.callNext()
		if job != nil {
			w.externLaunchAsyncJob(job)
			return true
		}
		job = w.staticJobReadRegistryQueue.callNext()
		if job != nil {
			w.externLaunchAsyncJob(job)
			return true
		}
	}
	job = w.staticJobReadQueue.callNext()
	if job != nil {
		w.externLaunchAsyncJob(job)
		return true
	}
	return false
}

// managedBlockUntilReady will block until the worker has internet connectivity.
// 'false' will be returned if a kill signal is received or if the renter is
// shut down before internet connectivity is restored. 'true' will be returned
// if internet connectivity is successfully restored.
func (w *worker) managedBlockUntilReady() bool {
	// Check internet connectivity. If the worker does not have internet
	// connectivity, block until connectivity is restored.
	for !w.renter.g.Online() {
		select {
		case <-w.renter.tg.StopChan():
			return false
		case <-w.killChan:
			return false
		case <-time.After(offlineCheckFrequency):
		}
	}
	return true
}

// managedDiscardAsyncJobs will drop all of the worker's async jobs because the
// worker has not met sufficient conditions to retain async jobs.
func (w *worker) managedDiscardAsyncJobs(err error) {
	w.staticJobHasSectorQueue.callDiscardAll(err)
	w.staticJobUpdateRegistryQueue.callDiscardAll(err)
	w.staticJobReadQueue.callDiscardAll(err)
}

// threadedWorkLoop is a perpetual loop run by the worker that accepts new jobs
// and performs them. Work is divided into two types of work, serial work and
// async work. Serial work requires exclusive access to the worker's contract,
// meaning that only one of these tasks can be performed at a time.  Async work
// can be performed with high parallelism.
func (w *worker) threadedWorkLoop() {
	// Perform a disrupt for testing.
	if w.renter.deps.Disrupt("DisableWorkerLoop") {
		return
	}

	// Upon shutdown, release all jobs.
	defer w.managedKillUploading()
	defer w.managedKillDownloading()
	defer w.staticJobHasSectorQueue.callKill()
	defer w.staticJobUpdateRegistryQueue.callKill()
	defer w.staticJobReadQueue.callKill()
	defer w.staticJobDownloadSnapshotQueue.callKill()
	defer w.staticJobUploadSnapshotQueue.callKill()

	if build.VersionCmp(w.staticCache().staticHostVersion, minAsyncVersion) >= 0 {
		// Ensure the renter's revision number of the underlying file contract
		// is in sync with the host's revision number. This check must happen at
		// the top as consecutive checks make use of the file contract for
		// payment.
		w.externTryFixRevisionMismatch()

		// The worker cannot execute any async tasks unless the price table of
		// the host is known, the balance of the worker account is known, and
		// the account has sufficient funds in it. This update is done as a
		// blocking update to ensure nothing else runs until the price table is
		// available.
		w.staticUpdatePriceTable()

		// Perform a balance check on the host and sync it to his version if
		// necessary. This avoids running into MaxBalanceExceeded errors upon
		// refill after an unclean shutdown.
		if w.staticPriceTable().staticValid() {
			w.externSyncAccountBalanceToHost()
		}

		// This update is done as a blocking update to ensure nothing else runs
		// until the account has filled.
		if w.managedNeedsToRefillAccount() {
			w.managedRefillAccount()
		}
	}

	// The worker will continuously perform jobs in a loop.
	for {
		// There are certain conditions under which the worker should either
		// block or exit. This function will block until those conditions are
		// met, returning 'true' when the worker can proceed and 'false' if the
		// worker should exit.
		if !w.managedBlockUntilReady() {
			return
		}

		// Try and fix a revision number mismatch if the flag is set. This will
		// be the case if other processes errored out with an error indicating a
		// mismatch.
		if w.staticSuspectRevisionMismatch() {
			w.externTryFixRevisionMismatch()
		}

		// Update the worker cache object, note that we do this after trying to
		// sync the revision as that might influence the contract, which is used
		// to build the cache object.
		w.staticTryUpdateCache()

		// If the worker needs to sync the account balance, perform a sync
		// operation. This should be attempted before launching any jobs.
		if w.managedNeedsToSyncAccountBalanceToHost() {
			w.externSyncAccountBalanceToHost()
		}

		// Attempt to launch a serial job. If there is already a job running,
		// this will no-op. If no job is running, a goroutine will be spun up
		// to run a job, this call is non-blocking.
		w.externTryLaunchSerialJob()

		// Attempt to launch an async job. If the async job launches
		// successfully, skip the blocking phase and attempt to launch another
		// async job.
		//
		// The worker will only allow a handful of async jobs to be running at
		// once, to protect the total usage of the network connection. The
		// worker wants to avoid a situation where 1,000 jobs each requiring a
		// large amount of bandwidth are all running simultaneously. If the
		// jobs are tiny in terms of resource footprints, the worker will allow
		// more of them to be running at once.
		if w.externTryLaunchAsyncJob() {
			continue
		}

		// Block until:
		//    + New work has been submitted
		//    + The worker is killed
		//    + The renter is stopped
		select {
		case <-w.wakeChan:
			continue
		case <-w.killChan:
			return
		case <-w.renter.tg.StopChan():
			return
		}
	}
}