Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • willsalmon/buildstream
  • CumHoleZH/buildstream
  • tchaik/buildstream
  • DCotyPortfolio/buildstream
  • jesusoctavioas/buildstream
  • patrickmmartin/buildstream
  • franred/buildstream
  • tintou/buildstream
  • alatiera/buildstream
  • martinblanchard/buildstream
  • neverdie22042524/buildstream
  • Mattlk13/buildstream
  • PServers/buildstream
  • phamnghia610909/buildstream
  • chiaratolentino/buildstream
  • eysz7-x-x/buildstream
  • kerrick1/buildstream
  • matthew-yates/buildstream
  • twofeathers/buildstream
  • mhadjimichael/buildstream
  • pointswaves/buildstream
  • Mr.JackWilson/buildstream
  • Tw3akG33k/buildstream
  • AlexFazakas/buildstream
  • eruidfkiy/buildstream
  • clamotion2/buildstream
  • nanonyme/buildstream
  • wickyjaaa/buildstream
  • nmanchev/buildstream
  • bojorquez.ja/buildstream
  • mostynb/buildstream
  • highpit74/buildstream
  • Demo112/buildstream
  • ba2014sheer/buildstream
  • tonimadrino/buildstream
  • usuario2o/buildstream
  • Angelika123456/buildstream
  • neo355/buildstream
  • corentin-ferlay/buildstream
  • coldtom/buildstream
  • wifitvbox81/buildstream
  • 358253885/buildstream
  • seanborg/buildstream
  • SotK/buildstream
  • DouglasWinship/buildstream
  • karansthr97/buildstream
  • louib/buildstream
  • bwh-ct/buildstream
  • robjh/buildstream
  • we88c0de/buildstream
  • zhengxian5555/buildstream
51 results
Show changes
Commits on Source (43)
Showing
with 474 additions and 221 deletions
......@@ -1259,14 +1259,9 @@ into the ``setup.py``, as such, whenever the frontend command line
interface changes, the static man pages should be regenerated and
committed with that.
To do this, first ensure you have ``click_man`` installed, possibly
with::
To do this, run the following from the the toplevel directory of BuildStream::
pip3 install --user click_man
Then, in the toplevel directory of buildstream, run the following::
python3 setup.py --command-packages=click_man.commands man_pages
tox -e man
And commit the result, ensuring that you have added anything in
the ``man/`` subdirectory, which will be automatically included
......
......@@ -20,7 +20,7 @@ buildstream 1.3.1
an element's sources and generated build scripts you can do the command
`bst source-checkout --include-build-scripts --tar foo.bst some-file.tar`
o BREAKING CHANGE: `bst track` and `bst fetch` commands are now osbolete.
o BREAKING CHANGE: `bst track` and `bst fetch` commands are now obsolete.
Their functionality is provided by `bst source track` and
`bst source fetch` respectively.
......
......@@ -64,7 +64,10 @@ class ArtifactCache():
self._required_elements = set() # The elements required for this session
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk
self._cache_lower_threshold = None # The target cache size for a cleanup
self._remotes_setup = False # Check to prevent double-setup of remotes
# Per-project list of _CASRemote instances.
......@@ -216,11 +219,33 @@ class ArtifactCache():
#
# Clean the artifact cache as much as possible.
#
# Args:
# progress (callable): A callback to call when a ref is removed
#
# Returns:
# (int): The size of the cache after having cleaned up
#
def clean(self):
def clean(self, progress=None):
artifacts = self.list_artifacts()
context = self.context
# Some accumulative statistics
removed_ref_count = 0
space_saved = 0
# Start off with an announcement with as much info as possible
volume_size, volume_avail = self.cas._get_cache_volume_size()
self._message(MessageType.STATUS, "Starting cache cleanup",
detail=("Elements required by the current build plan: {}\n" +
"User specified quota: {} ({})\n" +
"Cache usage: {}\n" +
"Cache volume: {} total, {} available")
.format(len(self._required_elements),
context.config_cache_quota,
utils._pretty_size(self._cache_quota_original, dec_places=2),
utils._pretty_size(self.cas.get_cache_size(), dec_places=2),
utils._pretty_size(volume_size, dec_places=2),
utils._pretty_size(volume_avail, dec_places=2)))
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
......@@ -235,9 +260,10 @@ class ArtifactCache():
])
# Do a real computation of the cache size once, just in case
self.compute_cache_size()
usage = self.cas.compute_cache_size()
self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage))
while self.get_cache_size() >= self._cache_lower_threshold:
while self.cas.get_cache_size() >= self._cache_lower_threshold:
try:
to_remove = artifacts.pop(0)
except IndexError:
......@@ -245,13 +271,20 @@ class ArtifactCache():
# can't remove them, we have to abort the build.
#
# FIXME: Asking the user what to do may be neater
#
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
detail = ("There is not enough space to complete the build.\n"
"Please increase the cache-quota in {}."
.format(self.context.config_origin or default_conf))
if self.has_quota_exceeded():
detail = ("Aborted after removing {} refs and saving {} disk space.\n"
"The remaining {} in the cache is required by the {} elements in your build plan\n\n"
"There is not enough space to complete the build.\n"
"Please increase the cache-quota in {} and/or make more disk space."
.format(removed_ref_count,
utils._pretty_size(space_saved, dec_places=2),
utils._pretty_size(self.cas.get_cache_size(), dec_places=2),
len(self._required_elements),
(context.config_origin or default_conf)))
if self.full():
raise ArtifactError("Cache too full. Aborting.",
detail=detail,
reason="cache-too-full")
......@@ -264,24 +297,34 @@ class ArtifactCache():
# Remove the actual artifact, if it's not required.
size = self.remove(to_remove)
# Remove the size from the removed size
self.set_cache_size(self._cache_size - size)
removed_ref_count += 1
space_saved += size
# This should be O(1) if implemented correctly
return self.get_cache_size()
self._message(MessageType.STATUS,
"Freed {: <7} {}".format(
utils._pretty_size(size, dec_places=2),
to_remove))
# compute_cache_size()
#
# Computes the real artifact cache size by calling
# the abstract calculate_cache_size() method.
#
# Returns:
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
self._cache_size = self.cas.calculate_cache_size()
# Remove the size from the removed size
self.cas.set_cache_size(self.cas._cache_size - size)
return self._cache_size
# User callback
#
# Currently this process is fairly slow, but we should
# think about throttling this progress() callback if this
# becomes too intense.
if progress:
progress()
# Informational message about the side effects of the cleanup
self._message(MessageType.INFO, "Cleanup completed",
detail=("Removed {} refs and saving {} disk space.\n" +
"Cache usage is now: {}")
.format(removed_ref_count,
utils._pretty_size(space_saved, dec_places=2),
utils._pretty_size(self.cas.get_cache_size(), dec_places=2)))
return self.cas.get_cache_size()
# add_artifact_size()
#
......@@ -292,61 +335,33 @@ class ArtifactCache():
# artifact_size (int): The size to add.
#
def add_artifact_size(self, artifact_size):
cache_size = self.get_cache_size()
cache_size = self.cas.get_cache_size()
cache_size += artifact_size
self.set_cache_size(cache_size)
self.cas.set_cache_size(cache_size)
# get_cache_size()
# full()
#
# Fetches the cached size of the cache, this is sometimes
# an estimate and periodically adjusted to the real size
# when a cache size calculation job runs.
#
# When it is an estimate, the value is either correct, or
# it is greater than the actual cache size.
# Checks if the artifact cache is full, either
# because the user configured quota has been exceeded
# or because the underlying disk is almost full.
#
# Returns:
# (int) An approximation of the artifact cache size.
# (bool): True if the artifact cache is full
#
def get_cache_size(self):
# If we don't currently have an estimate, figure out the real cache size.
if self._cache_size is None:
stored_size = self._read_cache_size()
if stored_size is not None:
self._cache_size = stored_size
else:
self.compute_cache_size()
return self._cache_size
def full(self):
# set_cache_size()
#
# Forcefully set the overall cache size.
#
# This is used to update the size in the main process after
# having calculated in a cleanup or a cache size calculation job.
#
# Args:
# cache_size (int): The size to set.
#
def set_cache_size(self, cache_size):
if self.get_cache_size() > self._cache_quota:
return True
assert cache_size is not None
_, volume_avail = self._get_cache_volume_size()
if volume_avail < self._cache_quota_headroom:
return True
self._cache_size = cache_size
self._write_cache_size(self._cache_size)
return False
# has_quota_exceeded()
#
# Checks if the current artifact cache size exceeds the quota.
#
# Returns:
# (bool): True of the quota is exceeded
#
def has_quota_exceeded(self):
return self.get_cache_size() > self._cache_quota
return self.cas.get_cache_size() > self._cache_quota
# preflight():
#
......@@ -459,8 +474,7 @@ class ArtifactCache():
# `ArtifactCache.get_artifact_fullname`)
#
# Returns:
# (int|None) The amount of space pruned from the repository in
# Bytes, or None if defer_prune is True
# (int): The amount of space recovered in the cache, in bytes
#
def remove(self, ref):
......@@ -793,44 +807,6 @@ class ArtifactCache():
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
# _write_cache_size()
#
# Writes the given size of the artifact to the cache's size file
#
# Args:
# size (int): The size of the artifact cache to record
#
def _write_cache_size(self, size):
assert isinstance(size, int)
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
with utils.save_file_atomic(size_file_path, "w") as f:
f.write(str(size))
# _read_cache_size()
#
# Reads and returns the size of the artifact cache that's stored in the
# cache's size file
#
# Returns:
# (int): The size of the artifact cache, as recorded in the file
#
def _read_cache_size(self):
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
if not os.path.exists(size_file_path):
return None
with open(size_file_path, "r") as f:
size = f.read()
try:
num_size = int(size)
except ValueError as e:
raise ArtifactError("Size '{}' parsed from '{}' was not an integer".format(
size, size_file_path)) from e
return num_size
# _calculate_cache_quota()
#
# Calculates and sets the cache quota and lower threshold based on the
......@@ -844,24 +820,20 @@ class ArtifactCache():
# is taken from the user requested cache_quota.
#
if 'BST_TEST_SUITE' in os.environ:
headroom = 0
self._cache_quota_headroom = 0
else:
headroom = 2e9
artifactdir_volume = self.context.artifactdir
while not os.path.exists(artifactdir_volume):
artifactdir_volume = os.path.dirname(artifactdir_volume)
self._cache_quota_headroom = 2e9
try:
cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
cache_quota = utils._parse_size(self.context.config_cache_quota,
self.context.artifactdir)
except utils.UtilError as e:
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}\nPlease specify the value in bytes or as a % of full disk space.\n"
"\nValid values are, for example: 800M 10G 1T 50%\n"
.format(str(e))) from e
available_space, total_size = self._get_volume_space_info_for(artifactdir_volume)
cache_size = self.get_cache_size()
total_size, available_space = self.cas._get_cache_volume_size()
cache_size = self.cas.get_cache_size()
# Ensure system has enough storage for the cache_quota
#
......@@ -871,27 +843,39 @@ class ArtifactCache():
#
if cache_quota is None: # Infinity, set to max system storage
cache_quota = cache_size + available_space
if cache_quota < headroom: # Check minimum
if cache_quota < self._cache_quota_headroom: # Check minimum
raise LoadError(LoadErrorReason.INVALID_DATA,
"Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
"BuildStream requires a minimum cache quota of 2G.")
elif cache_quota > cache_size + available_space: # Check maximum
if '%' in self.context.config_cache_quota:
available = (available_space / total_size) * 100
available = '{}% of total disk space'.format(round(available, 1))
else:
available = utils._pretty_size(available_space)
elif cache_quota > total_size:
# A quota greater than the total disk size is certianly an error
raise ArtifactError("Your system does not have enough available " +
"space to support the cache quota specified.",
detail=("You have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has {available_size} available.")
"has {total_size} total disk space.")
.format(
quota=self.context.config_cache_quota,
local_cache_path=self.context.artifactdir,
available_size=available),
total_size=utils._pretty_size(total_size)),
reason='insufficient-storage-for-quota')
elif cache_quota > cache_size + available_space:
# The quota does not fit in the available space, this is a warning
if '%' in self.context.config_cache_quota:
available = (available_space / total_size) * 100
available = '{}% of total disk space'.format(round(available, 1))
else:
available = utils._pretty_size(available_space)
self._message(MessageType.WARN,
"Your system does not have enough available " +
"space to support the cache quota specified.",
detail=("You have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has {available_size} available.")
.format(quota=self.context.config_cache_quota,
local_cache_path=self.context.artifactdir,
available_size=available))
# Place a slight headroom (2e9 (2GB) on the cache_quota) into
# cache_quota to try and avoid exceptions.
......@@ -900,23 +884,10 @@ class ArtifactCache():
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
self._cache_quota = cache_quota - headroom
self._cache_quota_original = cache_quota
self._cache_quota = cache_quota - self._cache_quota_headroom
self._cache_lower_threshold = self._cache_quota / 2
# _get_volume_space_info_for
#
# Get the available space and total space for the given volume
#
# Args:
# volume: volume for which to get the size
#
# Returns:
# A tuple containing first the availabe number of bytes on the requested
# volume, then the total number of bytes of the volume.
def _get_volume_space_info_for(self, volume):
stat = os.statvfs(volume)
return stat.f_bsize * stat.f_bavail, stat.f_bsize * stat.f_blocks
# _configured_remote_artifact_cache_specs():
#
......
......@@ -17,5 +17,5 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
from .cascache import CASCache
from .cascache import CASCache, CASCacheUsage
from .casremote import CASRemote, CASRemoteSpec
......@@ -36,20 +36,61 @@ from .._exceptions import CASCacheError
from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
CACHE_SIZE_FILE = "cache_size"
# CASCacheUsage
#
# A simple object to report the current CAS cache usage details.
#
# Note that this uses the user configured cache quota
# rather than the internal quota with protective headroom
# removed, to provide a more sensible value to display to
# the user.
#
# Args:
# cas (CASCache): The CAS cache to get the status of
#
class CASCacheUsage():
def __init__(self, cas):
self.quota_config = cas._config_cache_quota # Configured quota
self.quota_size = cas._cache_quota_original # Resolved cache quota in bytes
self.used_size = cas.get_cache_size() # Size used by artifacts in bytes
self.used_percent = 0 # Percentage of the quota used
if self.quota_size is not None:
self.used_percent = int(self.used_size * 100 / self.quota_size)
# Formattable into a human readable string
#
def __str__(self):
return "{} / {} ({}%)" \
.format(utils._pretty_size(self.used_size, dec_places=1),
self.quota_config,
self.used_percent)
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
# path (str): The root directory for the CAS repository
# cache_quota (int): User configured cache quota
#
class CASCache():
def __init__(self, path):
def __init__(self, path, cache_quota):
self.casdir = os.path.join(path, 'cas')
self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
self._config_cache_quota = cache_quota
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
# preflight():
#
# Preflight check.
......@@ -587,6 +628,65 @@ class CASCache():
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
# compute_cache_size()
#
# Computes the real artifact cache size by calling
# the abstract calculate_cache_size() method.
#
# Returns:
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
old_cache_size = self._cache_size
new_cache_size = self.calculate_cache_size()
if old_cache_size != new_cache_size:
self._cache_size = new_cache_size
return self._cache_size
# get_cache_size()
#
# Fetches the cached size of the cache, this is sometimes
# an estimate and periodically adjusted to the real size
# when a cache size calculation job runs.
#
# When it is an estimate, the value is either correct, or
# it is greater than the actual cache size.
#
# Returns:
# (int) An approximation of the artifact cache size, in bytes.
#
def get_cache_size(self):
# If we don't currently have an estimate, figure out the real cache size.
if self._cache_size is None:
stored_size = self._read_cache_size()
if stored_size is not None:
self._cache_size = stored_size
else:
self._cache_size = self.compute_cache_size()
# self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage))
return self._cache_size
# set_cache_size()
#
# Forcefully set the overall cache size.
#
# This is used to update the size in the main process after
# having calculated in a cleanup or a cache size calculation job.
#
# Args:
# cache_size (int): The size to set.
#
def set_cache_size(self, cache_size):
assert cache_size is not None
self._cache_size = cache_size
self._write_cache_size(self._cache_size)
################################################
# Local Private Methods #
################################################
......@@ -1015,6 +1115,60 @@ class CASCache():
# Send final batch
batch.send()
# _read_cache_size()
#
# Reads and returns the size of the artifact cache that's stored in the
# cache's size file
#
# Returns:
# (int): The size of the artifact cache, as recorded in the file
#
def _read_cache_size(self):
size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
if not os.path.exists(size_file_path):
return None
with open(size_file_path, "r") as f:
size = f.read()
try:
num_size = int(size)
except ValueError as e:
raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format(
size, size_file_path)) from e
return num_size
# _write_cache_size()
#
# Writes the given size of the artifact to the cache's size file
#
# Args:
# size (int): The size of the artifact cache to record
#
def _write_cache_size(self, size):
assert isinstance(size, int)
size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
with utils.save_file_atomic(size_file_path, "w") as f:
f.write(str(size))
# _get_cache_volume_size()
#
# Get the available space and total space for the volume on
# which the artifact cache is located.
#
# Returns:
# (int): The total number of bytes on the volume
# (int): The number of available bytes on the volume
#
# NOTE: We use this stub to allow the test cases
# to override what an artifact cache thinks
# about it's disk size and available bytes.
#
def _get_cache_volume_size(self):
return utils._get_volume_size(self.casdir)
def _grouper(iterable, n):
while True:
......
......@@ -61,7 +61,7 @@ class ArtifactTooLargeException(Exception):
def create_server(repo, *, enable_push,
max_head_size=int(10e9),
min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
cas = CASCache(os.path.abspath(repo), max_head_size)
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
......@@ -324,7 +324,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
blob_response.digest.size_bytes = digest.size_bytes
if len(blob_request.data) != digest.size_bytes:
blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
blob_response.status.code = code_pb2.FAILED_PRECONDITION
continue
try:
......@@ -335,10 +335,10 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
out.flush()
server_digest = self.cas.add_object(path=out.name)
if server_digest.hash != digest.hash:
blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
blob_response.status.code = code_pb2.FAILED_PRECONDITION
except ArtifactTooLargeException:
blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
return response
......
......@@ -31,7 +31,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache
from ._cas import CASCache
from ._cas import CASCache, CASCacheUsage
from ._workspaces import Workspaces, WorkspaceProjectCache
from .plugin import _plugin_lookup
from .sandbox import SandboxRemote
......@@ -58,12 +58,21 @@ class Context():
# Filename indicating which configuration file was used, or None for the defaults
self.config_origin = None
# The directory under which other directories are based
self.rootcachedir = None
# The directory where various sources are stored
self.sourcedir = None
# The directory where build sandboxes will be created
self.builddir = None
# The directory for CAS
self.casdir = None
# The directory for temporary files
self.tmpdir = None
# Default root location for workspaces
self.workspacedir = None
......@@ -188,13 +197,30 @@ class Context():
user_config = _yaml.load(config)
_yaml.composite(defaults, user_config)
# Give deprecation warnings
if defaults.get('builddir'):
print("builddir is deprecated, use rootcachedir")
else:
defaults['builddir'] = os.path.join(defaults['rootcachedir'], 'build')
if defaults.get('artifactdir'):
print("artifactdir is deprecated, use rootcachedir")
else:
defaults['artifactdir'] = os.path.join(defaults['rootcachedir'], 'artifacts')
_yaml.node_validate(defaults, [
'sourcedir', 'builddir', 'artifactdir', 'logdir',
'rootcachedir', 'sourcedir', 'builddir', 'artifactdir', 'logdir',
'scheduler', 'artifacts', 'logging', 'projects',
'cache', 'prompt', 'workspacedir', 'remote-execution'
'cache', 'prompt', 'workspacedir', 'remote-execution',
])
for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir', 'workspacedir']:
# add directories not set by users
defaults['tmpdir'] = os.path.join(defaults['rootcachedir'], 'tmp')
defaults['casdir'] = os.path.join(defaults['rootcachedir'], 'cas')
for directory in ['rootcachedir', 'sourcedir', 'builddir',
'artifactdir', 'logdir', 'workspacedir', 'casdir',
'tmpdir']:
# Allow the ~ tilde expansion and any environment variables in
# path specification in the config files.
#
......@@ -289,6 +315,16 @@ class Context():
return self._artifactcache
# get_cache_usage()
#
# Fetches the current usage of the artifact cache
#
# Returns:
# (CASCacheUsage): The current status
#
def get_cache_usage(self):
return CASCacheUsage(self.get_cascache())
# add_project():
#
# Add a project to the context.
......@@ -654,7 +690,7 @@ class Context():
def get_cascache(self):
if self._cascache is None:
self._cascache = CASCache(self.artifactdir)
self._cascache = CASCache(self.rootcachedir, self.config_cache_quota)
return self._cascache
......
......@@ -194,11 +194,6 @@ class App():
except BstError as e:
self._error_exit(e, "Error instantiating platform")
try:
self.context.artifactcache.preflight()
except BstError as e:
self._error_exit(e, "Error instantiating artifact cache")
# Create the logger right before setting the message handler
self.logger = LogLine(self.context,
self._content_profile,
......@@ -211,6 +206,13 @@ class App():
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
# Preflight the artifact cache after initializing logging,
# this can cause messages to be emitted.
try:
self.context.artifactcache.preflight()
except BstError as e:
self._error_exit(e, "Error instantiating artifact cache")
#
# Load the Project
#
......
......@@ -11,7 +11,6 @@ from .._exceptions import BstError, LoadError, AppError
from .._versions import BST_FORMAT_VERSION
from .complete import main_bashcomplete, complete_path, CompleteUnhandled
##################################################################
# Override of click's main entry point #
##################################################################
......@@ -526,7 +525,7 @@ def shell(app, element, sysroot, mount, isolate, build_, cli_buildtree, command)
else:
scope = Scope.RUN
use_buildtree = False
use_buildtree = None
with app.initialized():
if not element:
......@@ -534,7 +533,8 @@ def shell(app, element, sysroot, mount, isolate, build_, cli_buildtree, command)
if not element:
raise AppError('Missing argument "ELEMENT".')
dependencies = app.stream.load_selection((element,), selection=PipelineSelection.NONE)
dependencies = app.stream.load_selection((element,), selection=PipelineSelection.NONE,
use_artifact_config=True)
element = dependencies[0]
prompt = app.shell_prompt(element)
mounts = [
......@@ -543,20 +543,31 @@ def shell(app, element, sysroot, mount, isolate, build_, cli_buildtree, command)
]
cached = element._cached_buildtree()
if cli_buildtree == "always":
if cached:
use_buildtree = True
else:
raise AppError("No buildtree is cached but the use buildtree option was specified")
elif cli_buildtree == "never":
pass
elif cli_buildtree == "try":
use_buildtree = cached
if cli_buildtree in ("always", "try"):
use_buildtree = cli_buildtree
if not cached and use_buildtree == "always":
click.echo("WARNING: buildtree is not cached locally, will attempt to pull from available remotes",
err=True)
else:
if app.interactive and cached:
use_buildtree = bool(click.confirm('Do you want to use the cached buildtree?'))
# If the value has defaulted to ask and in non interactive mode, don't consider the buildtree, this
# being the default behaviour of the command
if app.interactive and cli_buildtree == "ask":
if cached and bool(click.confirm('Do you want to use the cached buildtree?')):
use_buildtree = "always"
elif not cached:
try:
choice = click.prompt("Do you want to pull & use a cached buildtree?",
type=click.Choice(['try', 'always', 'never']),
err=True, show_choices=True)
except click.Abort:
click.echo('Aborting', err=True)
sys.exit(-1)
if choice != "never":
use_buildtree = choice
if use_buildtree and not element._cached_success():
click.echo("Warning: using a buildtree from a failed build.")
click.echo("WARNING: using a buildtree from a failed build.", err=True)
try:
exitcode = app.stream.shell(element, scope, prompt,
......
......@@ -353,13 +353,17 @@ class _StatusHeader():
def render(self, line_length, elapsed):
project = self._context.get_toplevel_project()
line_length = max(line_length, 80)
size = 0
text = ''
#
# Line 1: Session time, project name, session / total elements
#
# ========= 00:00:00 project-name (143/387) =========
#
session = str(len(self._stream.session_elements))
total = str(len(self._stream.total_elements))
# Format and calculate size for target and overall time code
size = 0
text = ''
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
size += 8 # Size of time code
size += len(project.name) + 1
......@@ -372,6 +376,12 @@ class _StatusHeader():
self._format_profile.fmt(')')
line1 = self._centered(text, size, line_length, '=')
#
# Line 2: Dynamic list of queue status reports
#
# (Fetched:0 117 0)→ (Built:4 0 0)
#
size = 0
text = ''
......@@ -389,10 +399,28 @@ class _StatusHeader():
line2 = self._centered(text, size, line_length, ' ')
size = 24
text = self._format_profile.fmt("~~~~~ ") + \
self._content_profile.fmt('Active Tasks') + \
self._format_profile.fmt(" ~~~~~")
#
# Line 3: Cache usage percentage report
#
# ~~~~~~ cache: 69% ~~~~~~
#
usage = self._context.get_cache_usage()
usage_percent = '{}%'.format(usage.used_percent)
size = 21
size += len(usage_percent)
if usage.used_percent >= 95:
formatted_usage_percent = self._error_profile.fmt(usage_percent)
elif usage.used_percent >= 80:
formatted_usage_percent = self._content_profile.fmt(usage_percent)
else:
formatted_usage_percent = self._success_profile.fmt(usage_percent)
text = self._format_profile.fmt("~~~~~~ ") + \
self._content_profile.fmt('cache') + \
self._format_profile.fmt(': ') + \
formatted_usage_percent + \
self._format_profile.fmt(' ~~~~~~')
line3 = self._centered(text, size, line_length, ' ')
return line1 + '\n' + line2 + '\n' + line3
......
......@@ -452,6 +452,7 @@ class LogLine(Widget):
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
values["Targets"] = ", ".join([t.name for t in stream.targets])
values["Cache Usage"] = "{}".format(context.get_cache_usage())
text += self._format_values(values)
# User configurations
......
......@@ -605,7 +605,7 @@ class _GitSourceBase(Source):
detail = "The ref provided for the element does not exist locally " + \
"in the provided track branch / tag '{}'.\n".format(self.tracking) + \
"You may wish to track the element to update the ref from '{}' ".format(self.tracking) + \
"with `bst track`,\n" + \
"with `bst source track`,\n" + \
"or examine the upstream at '{}' for the specific ref.".format(self.mirror.url)
self.warn("{}: expected ref '{}' was not found in given track '{}' for staged repository: '{}'\n"
......
......@@ -557,7 +557,7 @@ class Loader():
ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind()))
source._fetch(sources[0:idx])
else:
detail = "Try fetching the project with `bst fetch {}`".format(filename)
detail = "Try fetching the project with `bst source fetch {}`".format(filename)
raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED,
"Subproject fetch needed for junction: {}".format(filename),
detail=detail)
......@@ -565,7 +565,7 @@ class Loader():
# Handle the case where a subproject has no ref
#
elif source.get_consistency() == Consistency.INCONSISTENT:
detail = "Try tracking the junction element with `bst track {}`".format(filename)
detail = "Try tracking the junction element with `bst source track {}`".format(filename)
raise LoadError(LoadErrorReason.SUBPROJECT_INCONSISTENT,
"Subproject has no ref for junction: {}".format(filename),
detail=detail)
......
......@@ -373,7 +373,7 @@ class Pipeline():
if source._get_consistency() == Consistency.INCONSISTENT:
detail += " {} is missing ref\n".format(source)
detail += '\n'
detail += "Try tracking these elements first with `bst track`\n"
detail += "Try tracking these elements first with `bst source track`\n"
raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
......@@ -406,7 +406,7 @@ class Pipeline():
if source._get_consistency() != Consistency.CACHED:
detail += " {}\n".format(source)
detail += '\n'
detail += "Try fetching these elements first with `bst fetch`,\n" + \
detail += "Try fetching these elements first with `bst source fetch`,\n" + \
"or run this command with `--fetch` option\n"
raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
......
#
# Copyright (C) 2017 Codethink Limited
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
......@@ -16,6 +17,7 @@
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# James Ennis <james.ennis@codethink.co.uk>
import cProfile
import pstats
......@@ -46,6 +48,8 @@ class Topics():
LOAD_CONTEXT = 'load-context'
LOAD_PROJECT = 'load-project'
LOAD_PIPELINE = 'load-pipeline'
LOAD_SELECTION = 'load-selection'
SCHEDULER = 'scheduler'
SHOW = 'show'
ARTIFACT_RECEIVE = 'artifact-receive'
ALL = 'all'
......
......@@ -25,14 +25,14 @@ class CacheSizeJob(Job):
self._complete_cb = complete_cb
context = self._scheduler.context
self._artifacts = context.artifactcache
self._cas = context.get_cascache()
def child_process(self):
return self._artifacts.compute_cache_size()
return self._cas.compute_cache_size()
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
self._cas.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
......
......@@ -25,14 +25,27 @@ class CleanupJob(Job):
self._complete_cb = complete_cb
context = self._scheduler.context
self._cas = context.get_cascache()
self._artifacts = context.artifactcache
def child_process(self):
return self._artifacts.clean()
def progress():
self.send_message('update-cache-size',
self._cas.get_cache_size())
return self._artifacts.clean(progress)
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
if message_type == 'update-cache-size':
self._cas.set_cache_size(message)
return True
return False
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
self._cas.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
......@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
class Envelope():
class _Envelope():
def __init__(self, message_type, message):
self._message_type = message_type
self._message = message
self.message_type = message_type
self.message = message
# Process class that doesn't call waitpid on its own.
......@@ -117,6 +117,8 @@ class Job():
self._logfile = logfile
self._task_id = None
print("job init")
# spawn()
#
# Spawns the job.
......@@ -275,10 +277,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
# send_message()
#
# To be called from inside Job.child_process() implementations
# to send messages to the main process during processing.
#
# These messages will be processed by the class's Job.handle_message()
# implementation.
#
def send_message(self, message_type, message):
self._queue.put(_Envelope(message_type, message))
#######################################################
# Abstract Methods #
#######################################################
# handle_message()
#
# Handle a custom message. This will be called in the main process in
# response to any messages sent to the main proces using the
# Job.send_message() API from inside a Job.child_process() implementation
#
# Args:
# message_type (str): A string to identify the message type
# message (any): A simple serializable object
#
# Returns:
# (bool): Should return a truthy value if message_type is handled.
#
def handle_message(self, message_type, message):
return False
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
......@@ -354,7 +383,6 @@ class Job():
# queue (multiprocessing.Queue): The message queue for IPC
#
def _child_action(self, queue):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
os.setsid()
......@@ -416,7 +444,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
self._queue.put(Envelope('child_data', self.child_process_data()))
self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
......@@ -442,7 +470,7 @@ class Job():
else:
# No exception occurred in the action
self._queue.put(Envelope('child_data', self.child_process_data()))
self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
......@@ -469,7 +497,7 @@ class Job():
domain = e.domain
reason = e.reason
envelope = Envelope('error', {
envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
......@@ -487,7 +515,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
envelope = Envelope('result', result)
envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
......@@ -524,7 +552,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
self._queue.put(Envelope('message', message))
self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
......@@ -588,24 +616,28 @@ class Job():
if not self._listening:
return
if envelope._message_type == 'message':
if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
self._scheduler.context.message(envelope._message)
elif envelope._message_type == 'error':
self._scheduler.context.message(envelope.message)
elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
set_last_task_error(envelope._message['domain'],
envelope._message['reason'])
elif envelope._message_type == 'result':
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
elif envelope.message_type == 'result':
assert self._result is None
self._result = envelope._message
elif envelope._message_type == 'child_data':
self._result = envelope.message
elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
self.child_data = envelope._message
else:
raise Exception()
self.child_data = envelope.message
# Try Job subclass specific messages now
elif not self.handle_message(envelope.message_type,
envelope.message):
assert 0, "Unhandled message type '{}': {}" \
.format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
......
......@@ -100,7 +100,7 @@ class BuildQueue(Queue):
# If the estimated size outgrows the quota, ask the scheduler
# to queue a job to actually check the real cache size.
#
if artifacts.has_quota_exceeded():
if artifacts.full():
self._scheduler.check_cache_size()
def done(self, job, element, result, status):
......
......@@ -29,6 +29,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, profile_start, profile_end
# A decent return code for Scheduler.run()
......@@ -154,11 +155,16 @@ class Scheduler():
# Check if we need to start with some cache maintenance
self._check_cache_management()
# Start the profiler
profile_start(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues))
# Run the queues
self._sched()
self.loop.run_forever()
self.loop.close()
profile_end(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues))
# Stop handling unix signals
self._disconnect_signals()
......@@ -297,7 +303,7 @@ class Scheduler():
# starts while we are checking the cache.
#
artifacts = self.context.artifactcache
if artifacts.has_quota_exceeded():
if artifacts.full():
self._sched_cache_size_job(exclusive=True)
# _spawn_job()
......@@ -308,10 +314,10 @@ class Scheduler():
# job (Job): The job to spawn
#
def _spawn_job(self, job):
job.spawn()
self._active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
job.spawn()
# Callback for the cache size job
def _cache_size_job_complete(self, status, cache_size):
......@@ -332,7 +338,7 @@ class Scheduler():
context = self.context
artifacts = context.artifactcache
if artifacts.has_quota_exceeded():
if artifacts.full():
self._cleanup_scheduled = True
# Callback for the cleanup job
......