Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • willsalmon/buildstream
  • CumHoleZH/buildstream
  • tchaik/buildstream
  • DCotyPortfolio/buildstream
  • jesusoctavioas/buildstream
  • patrickmmartin/buildstream
  • franred/buildstream
  • tintou/buildstream
  • alatiera/buildstream
  • martinblanchard/buildstream
  • neverdie22042524/buildstream
  • Mattlk13/buildstream
  • PServers/buildstream
  • phamnghia610909/buildstream
  • chiaratolentino/buildstream
  • eysz7-x-x/buildstream
  • kerrick1/buildstream
  • matthew-yates/buildstream
  • twofeathers/buildstream
  • mhadjimichael/buildstream
  • pointswaves/buildstream
  • Mr.JackWilson/buildstream
  • Tw3akG33k/buildstream
  • AlexFazakas/buildstream
  • eruidfkiy/buildstream
  • clamotion2/buildstream
  • nanonyme/buildstream
  • wickyjaaa/buildstream
  • nmanchev/buildstream
  • bojorquez.ja/buildstream
  • mostynb/buildstream
  • highpit74/buildstream
  • Demo112/buildstream
  • ba2014sheer/buildstream
  • tonimadrino/buildstream
  • usuario2o/buildstream
  • Angelika123456/buildstream
  • neo355/buildstream
  • corentin-ferlay/buildstream
  • coldtom/buildstream
  • wifitvbox81/buildstream
  • 358253885/buildstream
  • seanborg/buildstream
  • SotK/buildstream
  • DouglasWinship/buildstream
  • karansthr97/buildstream
  • louib/buildstream
  • bwh-ct/buildstream
  • robjh/buildstream
  • we88c0de/buildstream
  • zhengxian5555/buildstream
51 results
Show changes
Commits on Source (11)
......@@ -46,6 +46,30 @@ class ArtifactCacheSpec(CASRemoteSpec):
pass
# ArtifactCacheUsage
#
# A simple object to report the current artifact cache
# usage details.
#
# Note that this uses the user configured cache quota
# rather than the internal quota with protective headroom
# removed, to provide a more sensible value to display to
# the user.
#
# Args:
#    artifacts (ArtifactCache): The artifact cache to get the status of
#
class ArtifactCacheUsage():

    def __init__(self, artifacts):
        context = artifacts.context
        self.quota_config = context.config_cache_quota       # Configured quota
        self.quota_size = artifacts._cache_quota_original    # Resolved cache quota in bytes
        self.used_size = artifacts.get_cache_size()          # Size used by artifacts in bytes
        self.used_percent = None                             # Percentage of the quota used

        # Only compute a percentage when a quota has actually been
        # resolved; an unresolved (None) or zero quota would otherwise
        # raise TypeError / ZeroDivisionError here.
        if self.quota_size:
            self.used_percent = int(self.used_size * 100 / self.quota_size)
# An ArtifactCache manages artifacts.
#
# Args:
......@@ -64,6 +88,7 @@ class ArtifactCache():
self._required_elements = set() # The elements required for this session
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
self._remotes_setup = False # Check to prevent double-setup of remotes
......@@ -307,7 +332,7 @@ class ArtifactCache():
# it is greater than the actual cache size.
#
# Returns:
# (int) An approximation of the artifact cache size.
# (int) An approximation of the artifact cache size, in bytes.
#
def get_cache_size(self):
......@@ -848,19 +873,16 @@ class ArtifactCache():
else:
headroom = 2e9
artifactdir_volume = self.context.artifactdir
while not os.path.exists(artifactdir_volume):
artifactdir_volume = os.path.dirname(artifactdir_volume)
try:
cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
cache_quota = utils._parse_size(self.context.config_cache_quota,
self.context.artifactdir)
except utils.UtilError as e:
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}\nPlease specify the value in bytes or as a % of full disk space.\n"
"\nValid values are, for example: 800M 10G 1T 50%\n"
.format(str(e))) from e
available_space, total_size = self._get_volume_space_info_for(artifactdir_volume)
total_size, available_space = self._get_cache_volume_size()
cache_size = self.get_cache_size()
# Ensure system has enough storage for the cache_quota
......@@ -882,16 +904,16 @@ class ArtifactCache():
else:
available = utils._pretty_size(available_space)
raise LoadError(LoadErrorReason.INVALID_DATA,
("Your system does not have enough available " +
"space to support the cache quota specified.\n" +
"\nYou have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has: {available_size} available.")
.format(
quota=self.context.config_cache_quota,
local_cache_path=self.context.artifactdir,
available_size=available))
raise ArtifactError("Your system does not have enough available " +
"space to support the cache quota specified.",
detail=("You have specified a quota of {quota} total disk space.\n" +
"The filesystem containing {local_cache_path} only " +
"has {available_size} available.")
.format(
quota=self.context.config_cache_quota,
local_cache_path=self.context.artifactdir,
available_size=available),
reason='insufficient-storage-for-quota')
# Place a slight headroom (2e9 (2GB) on the cache_quota) into
# cache_quota to try and avoid exceptions.
......@@ -900,22 +922,25 @@ class ArtifactCache():
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
self._cache_quota_original = cache_quota
self._cache_quota = cache_quota - headroom
self._cache_lower_threshold = self._cache_quota / 2
# _get_cache_volume_size()
#
# Get the available space and total space for the volume on
# which the artifact cache is located.
#
# Returns:
#    (int): The total number of bytes on the volume
#    (int): The number of available bytes on the volume
#
# NOTE: We use this stub to allow the test cases
#       to override what an artifact cache thinks
#       about its disk size and available bytes.
#
def _get_cache_volume_size(self):
    return utils._get_volume_size(self.context.artifactdir)
# _configured_remote_artifact_cache_specs():
......
......@@ -30,7 +30,7 @@ from . import _yaml
from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache
from ._artifactcache import ArtifactCache, ArtifactCacheUsage
from ._cas import CASCache
from ._workspaces import Workspaces, WorkspaceProjectCache, WORKSPACE_PROJECT_FILE
from .plugin import _plugin_lookup
......@@ -289,6 +289,16 @@ class Context():
return self._artifactcache
# get_artifact_cache_usage()
#
# Fetches the current usage of the artifact cache
#
# Returns:
#    (ArtifactCacheUsage): The current status
#
def get_artifact_cache_usage(self):
    artifacts = self.artifactcache
    return ArtifactCacheUsage(artifacts)
# add_project():
#
# Add a project to the context.
......
......@@ -33,6 +33,7 @@ from .. import __version__ as bst_version
from .._exceptions import ImplError
from .._message import MessageType
from ..plugin import _plugin_lookup
from .. import utils
# These messages are printed a bit differently
......@@ -178,26 +179,22 @@ class ElementName(Widget):
def __init__(self, context, content_profile, format_profile):
super(ElementName, self).__init__(context, content_profile, format_profile)
# Pre initialization format string, before we know the length of
# element names in the pipeline
self._fmt_string = '{: <30}'
def render(self, message):
action_name = message.action_name
element_id = message.task_id or message.unique_id
if element_id is None:
return ""
plugin = _plugin_lookup(element_id)
name = plugin._get_full_name()
if element_id is not None:
plugin = _plugin_lookup(element_id)
name = plugin._get_full_name()
name = '{: <30}'.format(name)
else:
name = 'core activity'
name = '{: <30}'.format(name)
# Sneak the action name in with the element name
action_name = message.action_name
if not action_name:
action_name = "Main"
return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \
self.format_profile.fmt(':') + \
self.content_profile.fmt(self._fmt_string.format(name))
self.format_profile.fmt(':') + self.content_profile.fmt(name)
# A widget for displaying the primary message text
......@@ -219,9 +216,12 @@ class CacheKey(Widget):
def render(self, message):
element_id = message.task_id or message.unique_id
if element_id is None or not self._key_length:
if not self._key_length:
return ""
if element_id is None:
return ' ' * self._key_length
missing = False
key = ' ' * self._key_length
plugin = _plugin_lookup(element_id)
......@@ -450,12 +450,16 @@ class LogLine(Widget):
self._resolved_keys = {element: element._get_cache_key() for element in stream.session_elements}
# Main invocation context
usage = context.get_artifact_cache_usage()
text += '\n'
text += self.content_profile.fmt("BuildStream Version {}\n".format(bst_version), bold=True)
values = OrderedDict()
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
values["Targets"] = ", ".join([t.name for t in stream.targets])
values["Cache Usage"] = "{} / {} ({}%)".format(
utils._pretty_size(usage.used_size, dec_places=1),
usage.quota_config, usage.used_percent)
text += self._format_values(values)
# User configurations
......
......@@ -163,4 +163,4 @@ class Resources():
# unregister_exclusive_interest()
#
# Drop any exclusive interest which *source* holds on
# the given resources.
#
# Args:
#    resources (list): The resource types to release interest in
#    source (str): The identifier of the interested party
#
def unregister_exclusive_interest(self, resources, source):
    for resource in resources:
        # Use discard() rather than remove(): unregistering an
        # interest which was never registered is not an error.
        self._exclusive_resources[resource].discard(source)
......@@ -40,8 +40,8 @@ class SchedStatus():
# Some action names for the internal jobs we launch
#
_ACTION_NAME_CLEANUP = 'cleanup'
_ACTION_NAME_CACHE_SIZE = 'cache_size'
_ACTION_NAME_CLEANUP = 'clean'
_ACTION_NAME_CACHE_SIZE = 'size'
# Scheduler()
......@@ -151,6 +151,9 @@ class Scheduler():
# Handle unix signals while running
self._connect_signals()
# Check if we need to start with some cache maintenance
self._check_cache_management()
# Run the queues
self._sched()
self.loop.run_forever()
......@@ -272,6 +275,31 @@ class Scheduler():
# Local Private Methods #
#######################################################
# _check_cache_management()
#
# Run an initial check if we need to lock the cache
# resource and check the size and possibly launch
# a cleanup.
#
# Sessions which do not add to the cache are not affected.
#
def _check_cache_management(self):
# Only trigger the check for a scheduler run which has
# queues which require the CACHE resource.
if not any(q for q in self.queues
if ResourceType.CACHE in q.resources):
return
# If the estimated size outgrows the quota, queue a job to
# actually check the real cache size initially, this one
# should have exclusive access to the cache to ensure nothing
# starts while we are checking the cache.
#
artifacts = self.context.artifactcache
if artifacts.has_quota_exceeded():
self._sched_cache_size_job(exclusive=True)
# _spawn_job()
#
# Spawns a job
......@@ -292,6 +320,11 @@ class Scheduler():
self._cache_size_running = None
self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
# Unregister the exclusive interest if there was any
self.resources.unregister_exclusive_interest(
[ResourceType.CACHE], 'cache-size'
)
# Schedule a cleanup job if we've hit the threshold
if status != JobStatus.OK:
return
......@@ -344,11 +377,35 @@ class Scheduler():
# Runs a cache size job if one is scheduled to run now and
# sufficient resources are available.
#
def _sched_cache_size_job(self):
# Args:
# exclusive (bool): Run a cache size job immediately and
# hold the ResourceType.CACHE resource
# exclusively (used at startup).
#
def _sched_cache_size_job(self, *, exclusive=False):
# The exclusive argument is not intended (or safe) for arbitrary use.
if exclusive:
assert not self._cache_size_scheduled
assert not self._cache_size_running
assert not self._active_jobs
self._cache_size_scheduled = True
if self._cache_size_scheduled and not self._cache_size_running:
if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
# Handle the exclusive launch
exclusive_resources = set()
if exclusive:
exclusive_resources.add(ResourceType.CACHE)
self.resources.register_exclusive_interest(
exclusive_resources, 'cache-size'
)
# Reserve the resources (with the possible exclusive cache resource)
if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
exclusive_resources):
# Update state and launch
self._cache_size_scheduled = False
self._cache_size_running = \
CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
......
......@@ -633,6 +633,27 @@ def _get_dir_size(path):
return get_size(path)
# _get_volume_size():
#
# Gets the overall usage and total size of a mounted filesystem in bytes.
#
# Args:
# path (str): The path to check
#
# Returns:
# (int): The total number of bytes on the volume
# (int): The number of available bytes on the volume
#
def _get_volume_size(path):
try:
stat_ = os.statvfs(path)
except OSError as e:
raise UtilError("Failed to retrieve stats on volume for path '{}': {}"
.format(path, e)) from e
return stat_.f_bsize * stat_.f_blocks, stat_.f_bsize * stat_.f_bavail
# _parse_size():
#
# Convert a string representing data size to a number of
......@@ -667,8 +688,7 @@ def _parse_size(size, volume):
if num > 100:
raise UtilError("{}% is not a valid percentage value.".format(num))
stat_ = os.statvfs(volume)
disk_size = stat_.f_blocks * stat_.f_bsize
disk_size, _ = _get_volume_size(volume)
return disk_size * (num / 100)
......
import os
import pytest
from unittest import mock
from buildstream import _yaml
from buildstream._artifactcache import CACHE_SIZE_FILE
from buildstream._exceptions import ErrorDomain
from tests.testutils import cli, create_element_size
......@@ -60,3 +62,29 @@ def test_cache_size_write(cli, tmpdir):
with open(sizefile, "r") as f:
size_data = f.read()
size = int(size_data)
def test_quota_over_1024T(cli, tmpdir):
    KiB = 1024
    MiB = KiB * 1024
    GiB = MiB * 1024
    TiB = GiB * 1024

    # Configure a quota too large for the available disk space
    cli.configure({
        'cache': {
            'quota': 2048 * TiB
        }
    })

    project = tmpdir.join("main")
    os.makedirs(str(project))
    _yaml.dump({'name': 'main'}, str(project.join("project.conf")))

    # Pretend the cache volume is only 1025TiB large, all of it free
    volume_space_patch = mock.patch(
        "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
        autospec=True,
        return_value=(1025 * TiB, 1025 * TiB)
    )

    with volume_space_patch:
        result = cli.run(project, args=["build", "file.bst"])

    result.assert_main_error(ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota')
......@@ -18,6 +18,7 @@
#
import os
import re
from unittest import mock
import pytest
......@@ -304,20 +305,28 @@ def test_never_delete_required_track(cli, datafiles, tmpdir):
# Ensure that only valid cache quotas make it through the loading
# process.
@pytest.mark.parametrize("quota,success", [
("1", True),
("1K", True),
("50%", True),
("infinity", True),
("0", True),
("-1", False),
("pony", False),
("7K", False),
("70%", False),
("200%", False)
#
# This test virtualizes the condition to assume a storage volume
# has 10K total disk space, and 6K of it is already in use (not
# including any space used by the artifact cache).
#
@pytest.mark.parametrize("quota,err_domain,err_reason", [
# Valid configurations
("1", 'success', None),
("1K", 'success', None),
("50%", 'success', None),
("infinity", 'success', None),
("0", 'success', None),
# Invalid configurations
("-1", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
("pony", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
("200%", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
# Not enough space for these caches
("7K", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota'),
("70%", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota')
])
@pytest.mark.datafiles(DATA_DIR)
def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, err_domain, err_reason):
project = os.path.join(datafiles.dirname, datafiles.basename)
os.makedirs(os.path.join(project, 'elements'))
......@@ -342,9 +351,9 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
total_space = 10000
volume_space_patch = mock.patch(
"buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
"buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
autospec=True,
return_value=(free_space, total_space),
return_value=(total_space, free_space),
)
cache_size_patch = mock.patch(
......@@ -356,10 +365,10 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
with volume_space_patch, cache_size_patch:
res = cli.run(project=project, args=['workspace', 'list'])
if success:
if err_domain == 'success':
res.assert_success()
else:
res.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
res.assert_main_error(err_domain, err_reason)
@pytest.mark.datafiles(DATA_DIR)
......@@ -409,3 +418,65 @@ def test_extract_expiry(cli, datafiles, tmpdir):
assert os.path.isdir(refsdirtarget2)
assert not os.path.exists(refsdirtarget)
# Ensures that when launching BuildStream with a full artifact cache,
# the cache size and cleanup jobs are run before any other jobs.
#
@pytest.mark.datafiles(DATA_DIR)
def test_cleanup_first(cli, datafiles, tmpdir):
    project = os.path.join(datafiles.dirname, datafiles.basename)
    element_path = 'elements'

    cli.configure({
        'cache': {
            'quota': 10000000,
        }
    })

    # Create an element that uses almost the entire cache (an empty
    # ostree cache starts at about ~10KiB, so we need a bit of a
    # buffer)
    create_element_size('target.bst', project, element_path, [], 8000000)
    res = cli.run(project=project, args=['build', 'target.bst'])
    res.assert_success()

    assert cli.get_element_state(project, 'target.bst') == 'cached'

    # Now configure with a smaller quota, create a situation
    # where the cache must be cleaned up before building anything else.
    #
    # Fix the fetchers and builders just to ensure a predictable
    # sequence of events (although it does not affect this test)
    cli.configure({
        'cache': {
            'quota': 5000000,
        },
        'scheduler': {
            'fetchers': 1,
            'builders': 1
        }
    })

    # Our cache is now more than full; BuildStream must clean up
    # before it can build target2.bst
    create_element_size('target2.bst', project, element_path, [], 4000000)
    res = cli.run(project=project, args=['build', 'target2.bst'])
    res.assert_success()

    # Find all of the activity (like push, pull, fetch) lines
    results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)

    # Don't bother checking the order of 'fetch', it is allowed to start
    # before or after the initial cache size job, runs in parallel, and does
    # not require ResourceType.CACHE.
    results.remove('fetch')

    # Assert the expected sequence of events
    assert results == ['size', 'clean', 'build']

    # Check that the correct element remains in the cache
    assert cli.get_element_state(project, 'target.bst') != 'cached'
    assert cli.get_element_state(project, 'target2.bst') == 'cached'
import os
from unittest import mock
from buildstream import _yaml
from ..testutils.runcli import cli
# Binary size units, in bytes
KiB = 1024
MiB = KiB * KiB
GiB = MiB * KiB
TiB = GiB * KiB
def test_parse_size_over_1024T(cli, tmpdir):
    cli.configure({
        'cache': {
            'quota': 2048 * TiB
        }
    })

    project = tmpdir.join("main")
    os.makedirs(str(project))
    _yaml.dump({'name': 'main'}, str(project.join("project.conf")))

    # Patch the volume size stub under its current name; the old
    # `_get_volume_space_info_for` was renamed to `_get_cache_volume_size`
    # (now returning (total, available)) — mock.patch on the stale name
    # would raise AttributeError. The symmetric return value makes the
    # argument-order swap irrelevant here.
    volume_space_patch = mock.patch(
        "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
        autospec=True,
        return_value=(1025 * TiB, 1025 * TiB)
    )

    with volume_space_patch:
        result = cli.run(project, args=["build", "file.bst"])
        failure_msg = 'Your system does not have enough available space to support the cache quota specified.'
        assert failure_msg in result.stderr