
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (48)
Showing with 203 additions and 119 deletions
@@ -10,6 +10,12 @@ Major feature additions should be proposed on the
 before being considered for inclusion, we strongly recommend proposing
 in advance of commencing work.

+If you are experiencing an issue with BuildStream or would like to submit a small patch/feature, then
+you can open issue `here <https://gitlab.com/BuildStream/buildstream/issues/new?issue%5Bassignee_id%5D=&issue%5Bmilestone_id%5D=>`
+
+For policies on how to submit and issue and how to use our project labels, we recommend that you read the policies guide
+`here <https://gitlab.com/BuildStream/nosoftware/alignment/blob/master/BuildStream_policies.md>`
+
 New features must be well documented and tested either in our main
 test suite if possible, or otherwise in the integration tests.
......
 # Basic toplevel package includes
 include BuildStream.doap
 include COPYING
-include HACKING.rst
+include CONTRIBUTING.rst
 include MAINTAINERS
 include NEWS
 include README.rst
......
@@ -51,7 +51,7 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl
         url = _yaml.node_get(spec_node, str, 'url')
         push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
         if not url:
-            provenance = _yaml.node_get_provenance(spec_node)
+            provenance = _yaml.node_get_provenance(spec_node, 'url')
             raise LoadError(LoadErrorReason.INVALID_DATA,
                             "{}: empty artifact cache URL".format(provenance))
@@ -67,6 +67,16 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl
         if client_cert and basedir:
             client_cert = os.path.join(basedir, client_cert)

+        if client_key and not client_cert:
+            provenance = _yaml.node_get_provenance(spec_node, 'client-key')
+            raise LoadError(LoadErrorReason.INVALID_DATA,
+                            "{}: 'client-key' was specified without 'client-cert'".format(provenance))
+
+        if client_cert and not client_key:
+            provenance = _yaml.node_get_provenance(spec_node, 'client-cert')
+            raise LoadError(LoadErrorReason.INVALID_DATA,
+                            "{}: 'client-cert' was specified without 'client-key'".format(provenance))
+
         return ArtifactCacheSpec(url, push, server_cert, client_key, client_cert)
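A minimal standalone sketch of the pairing rule this hunk introduces: a TLS client key and client certificate must be configured together or not at all. The dict layout and error type below are illustrative, not BuildStream's real spec-parsing API.

# Hypothetical, simplified version of the client-key/client-cert check
def validate_cache_spec(spec):
    client_key = spec.get('client-key')
    client_cert = spec.get('client-cert')
    if client_key and not client_cert:
        raise ValueError("'client-key' was specified without 'client-cert'")
    if client_cert and not client_key:
        raise ValueError("'client-cert' was specified without 'client-key'")
    return spec

# Both halves of the credential are given together, so this passes:
validate_cache_spec({'url': 'https://cache.example.com:11002',
                     'client-key': 'client.key',
                     'client-cert': 'client.crt'})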
@@ -87,10 +97,11 @@ class ArtifactCache():
         self.global_remote_specs = []
         self.project_remote_specs = {}

-        self._required_artifacts = set()       # The artifacts required for this session
+        self._required_elements = set()        # The elements required for this session
         self._cache_size = None                # The current cache size, sometimes it's an estimate
         self._cache_quota = None               # The cache quota
         self._cache_lower_threshold = None     # The target cache size for a cleanup
+        self._remotes_setup = False            # Check to prevent double-setup of remotes

         os.makedirs(self.extractdir, exist_ok=True)
         os.makedirs(self.tmpdir, exist_ok=True)
@@ -143,6 +154,10 @@ class ArtifactCache():
     #
     def setup_remotes(self, *, use_config=False, remote_url=None):
+
+        # Ensure we do not double-initialise since this can be expensive
+        assert(not self._remotes_setup)
+        self._remotes_setup = True
+
         # Initialize remote artifact caches. We allow the commandline to override
         # the user config in some cases (for example `bst push --remote=...`).
         has_remote_caches = False
@@ -189,33 +204,40 @@ class ArtifactCache():
                                 (str(provenance)))

         return cache_specs

-    # append_required_artifacts():
+    # mark_required_elements():
     #
-    # Append to the list of elements whose artifacts are required for
-    # the current run. Artifacts whose elements are in this list will
-    # be locked by the artifact cache and not touched for the duration
-    # of the current pipeline.
+    # Mark elements whose artifacts are required for the current run.
+    #
+    # Artifacts whose elements are in this list will be locked by the artifact
+    # cache and not touched for the duration of the current pipeline.
     #
     # Args:
     #     elements (iterable): A set of elements to mark as required
     #
-    def append_required_artifacts(self, elements):
-        # We lock both strong and weak keys - deleting one but not the
-        # other won't save space in most cases anyway, but would be a
-        # user inconvenience.
+    def mark_required_elements(self, elements):
+
+        # We risk calling this function with a generator, so we
+        # better consume it first.
+        #
+        elements = list(elements)
+
+        # Mark the elements as required. We cannot know that we know the
+        # cache keys yet, so we only check that later when deleting.
+        #
+        self._required_elements.update(elements)
+
+        # For the cache keys which were resolved so far, we bump
+        # the atime of them.
+        #
+        # This is just in case we have concurrent instances of
+        # BuildStream running with the same artifact cache, it will
+        # reduce the likelyhood of one instance deleting artifacts
+        # which are required by the other.
         for element in elements:
             strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
             weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)

             for key in (strong_key, weak_key):
-                if key and key not in self._required_artifacts:
-                    self._required_artifacts.add(key)
-
-                    # We also update the usage times of any artifacts
-                    # we will be using, which helps preventing a
-                    # buildstream process that runs in parallel with
-                    # this one from removing artifacts in-use.
+                if key:
                     try:
                         self.update_atime(key)
                     except ArtifactError:
@@ -231,6 +253,18 @@ class ArtifactCache():
     def clean(self):
         artifacts = self.list_artifacts()

+        # Build a set of the cache keys which are required
+        # based on the required elements at cleanup time
+        #
+        # We lock both strong and weak keys - deleting one but not the
+        # other won't save space, but would be a user inconvenience.
+        required_artifacts = set()
+        for element in self._required_elements:
+            required_artifacts.update([
+                element._get_cache_key(strength=_KeyStrength.STRONG),
+                element._get_cache_key(strength=_KeyStrength.WEAK)
+            ])
+
         # Do a real computation of the cache size once, just in case
         self.compute_cache_size()
@@ -248,7 +282,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))

-            if self.get_quota_exceeded():
+            if self.has_quota_exceeded():
                 raise ArtifactError("Cache too full. Aborting.",
                                     detail=detail,
                                     reason="cache-too-full")
@@ -256,7 +290,7 @@ class ArtifactCache():
                 break

             key = to_remove.rpartition('/')[2]
-            if key not in self._required_artifacts:
+            if key not in required_artifacts:

                 # Remove the actual artifact, if it's not required.
                 size = self.remove(to_remove)
@@ -335,14 +369,14 @@ class ArtifactCache():
         self._cache_size = cache_size
         self._write_cache_size(self._cache_size)

-    # get_quota_exceeded()
+    # has_quota_exceeded()
     #
     # Checks if the current artifact cache size exceeds the quota.
     #
     # Returns:
     #     (bool): True of the quota is exceeded
     #
-    def get_quota_exceeded(self):
+    def has_quota_exceeded(self):
         return self.get_cache_size() > self._cache_quota

     ################################################
......
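A self-contained sketch of the flow these hunks introduce: elements are marked as required up front (possibly from a generator, hence the list() call), and the set of protected cache keys is only resolved when clean() actually runs, since some keys may not be known at marking time. MiniCache and MiniElement are hypothetical stand-ins, not the real ArtifactCache and Element classes.

from collections import namedtuple

MiniElement = namedtuple('MiniElement', ['name', 'cache_key'])

class MiniCache:
    def __init__(self, artifacts):
        self._artifacts = dict(artifacts)   # cache key -> artifact size
        self._required_elements = set()

    def mark_required_elements(self, elements):
        # Consume first: a generator could otherwise only be iterated once
        self._required_elements.update(list(elements))

    def clean(self):
        # Snapshot the required keys at cleanup time, not at marking time
        required = {e.cache_key for e in self._required_elements if e.cache_key}
        freed = 0
        for key in list(self._artifacts):
            if key not in required:
                freed += self._artifacts.pop(key)
        return freed

cache = MiniCache({'aaa': 100, 'bbb': 200})
cache.mark_required_elements(e for e in [MiniElement('base.bst', 'aaa')])
assert cache.clean() == 200 and 'aaa' in cache._artifacts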
@@ -117,7 +117,7 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]

-        tree = self._create_tree(content)
+        tree = self._commit_directory(content)

         for ref in refs:
             self.set_ref(ref, tree)
@@ -665,7 +665,21 @@ class CASCache(ArtifactCache):
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)

-    def _create_tree(self, path, *, digest=None):
+    # _commit_directory():
+    #
+    # Adds local directory to content addressable store.
+    #
+    # Adds files, symbolic links and recursively other directories in
+    # a local directory to the content addressable store.
+    #
+    # Args:
+    #     path (str): Path to the directory to add.
+    #     dir_digest (Digest): An optional Digest object to use.
+    #
+    # Returns:
+    #     (Digest): Digest object for the directory added.
+    #
+    def _commit_directory(self, path, *, dir_digest=None):
         directory = remote_execution_pb2.Directory()

         for name in sorted(os.listdir(path)):
@@ -674,7 +688,7 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                self._commit_directory(full_path, dir_digest=dirnode.digest)
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
@@ -684,10 +698,14 @@ class CASCache(ArtifactCache):
                 symlinknode = directory.symlinks.add()
                 symlinknode.name = name
                 symlinknode.target = os.readlink(full_path)
+            elif stat.S_ISSOCK(mode):
+                # The process serving the socket can't be cached anyway
+                pass
             else:
                 raise ArtifactError("Unsupported file type for {}".format(full_path))

-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        return self.add_object(digest=dir_digest,
+                               buffer=directory.SerializeToString())

     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
@@ -830,14 +848,26 @@ class CASCache(ArtifactCache):
             assert digest.size_bytes == os.fstat(stream.fileno()).st_size

-    def _fetch_directory(self, remote, tree):
-        objpath = self.objpath(tree)
+    # _fetch_directory():
+    #
+    # Fetches remote directory and adds it to content addressable store.
+    #
+    # Fetches files, symbolic links and recursively other directories in
+    # the remote directory and adds them to the content addressable
+    # store.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     dir_digest (Digest): Digest object for the directory to fetch.
+    #
+    def _fetch_directory(self, remote, dir_digest):
+        objpath = self.objpath(dir_digest)
         if os.path.exists(objpath):
             # already in local cache
             return

         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, tree, out)
+            self._fetch_blob(remote, dir_digest, out)

             directory = remote_execution_pb2.Directory()
@@ -845,7 +875,7 @@ class CASCache(ArtifactCache):
                 directory.ParseFromString(f.read())

             for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
+                fileobjpath = self.objpath(filenode.digest)
                 if os.path.exists(fileobjpath):
                     # already in local cache
                     continue
@@ -859,10 +889,11 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 self._fetch_directory(remote, dirnode.digest)

-            # place directory blob only in final location when we've downloaded
-            # all referenced blobs to avoid dangling references in the repository
+            # Place directory blob only in final location when we've
+            # downloaded all referenced blobs to avoid dangling
+            # references in the repository.
             digest = self.add_object(path=out.name)
-            assert digest.hash == tree.hash
+            assert digest.hash == dir_digest.hash

     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
......
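For readers unfamiliar with the CAS layout, here is a simplified, self-contained sketch of the recursive directory walk that _commit_directory() performs, using plain SHA-256 digests instead of the remote_execution_pb2.Directory protos that the real code serializes into the object store. The function name and hashing scheme are illustrative only.

import hashlib
import os
import stat

def commit_directory(path):
    entries = []
    for name in sorted(os.listdir(path)):
        full_path = os.path.join(path, name)
        mode = os.lstat(full_path).st_mode
        if stat.S_ISDIR(mode):
            # Recurse: a directory's digest is built from its children's digests
            entries.append(('dir', name, commit_directory(full_path)))
        elif stat.S_ISREG(mode):
            with open(full_path, 'rb') as f:
                entries.append(('file', name, hashlib.sha256(f.read()).hexdigest()))
        elif stat.S_ISLNK(mode):
            entries.append(('symlink', name, os.readlink(full_path)))
        elif stat.S_ISSOCK(mode):
            pass  # sockets cannot usefully be cached, skip them
        else:
            raise ValueError("Unsupported file type for {}".format(full_path))
    # The digest depends only on the sorted entries, so identical trees always
    # hash to the same value, Merkle-tree style.
    return hashlib.sha256(repr(entries).encode('utf-8')).hexdigest()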
@@ -119,7 +119,6 @@ class Context():
         self._log_handle = None
         self._log_filename = None
         self.config_cache_quota = 'infinity'
-        self.artifactdir_volume = None

     # load()
     #
......
 #
+# Copyright (c) 2014 by Armin Ronacher.
 # Copyright (C) 2016 Codethink Limited
 #
 # This program is free software; you can redistribute it and/or
@@ -14,8 +15,22 @@
 # You should have received a copy of the GNU Lesser General Public
 # License along with this library. If not, see <http://www.gnu.org/licenses/>.
 #
-# This module was forked from the python click library.
+# This module was forked from the python click library, Included
+# original copyright notice from the Click library and following disclaimer
+# as per their LICENSE requirements.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
 import collections
 import copy
 import os
......
@@ -42,9 +42,10 @@ from .mount import Mount
 #
 class SafeHardlinks(Mount):

-    def __init__(self, directory, tempdir):
+    def __init__(self, directory, tempdir, fuse_mount_options={}):
         self.directory = directory
         self.tempdir = tempdir
+        super().__init__(fuse_mount_options=fuse_mount_options)

     def create_operations(self):
         return SafeHardlinkOps(self.directory, self.tempdir)
@@ -121,7 +122,7 @@ class SafeHardlinkOps(Operations):
         st = os.lstat(full_path)
         return dict((key, getattr(st, key)) for key in (
             'st_atime', 'st_ctime', 'st_gid', 'st_mode',
-            'st_mtime', 'st_nlink', 'st_size', 'st_uid'))
+            'st_mtime', 'st_nlink', 'st_size', 'st_uid', 'st_rdev'))

     def readdir(self, path, fh):
         full_path = self._full_path(path)
......
@@ -87,6 +87,9 @@ class Mount():
     # User Facing API #
     ################################################

+    def __init__(self, fuse_mount_options={}):
+        self._fuse_mount_options = fuse_mount_options
+
     # mount():
     #
     # User facing API for mounting a fuse subclass implementation
@@ -184,7 +187,8 @@ class Mount():
         # Run fuse in foreground in this child process, internally libfuse
         # will handle SIGTERM and gracefully exit it's own little main loop.
         #
-        FUSE(self.__operations, self.__mountpoint, nothreads=True, foreground=True, nonempty=True)
+        FUSE(self.__operations, self.__mountpoint, nothreads=True, foreground=True, nonempty=True,
+             **self._fuse_mount_options)

         # Explicit 0 exit code, if the operations crashed for some reason, the exit
         # code will not be 0, and we want to know about it.
......
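A sketch of how the new fuse_mount_options plumbing fits together: a caller-supplied options dict travels from the Mount constructor down to the libfuse wrapper as extra keyword arguments. FakeFUSE below stands in for fuse.FUSE, and the class is a reduced illustration rather than the real Mount implementation.

class FakeFUSE:
    def __init__(self, operations, mountpoint, **options):
        # A real FUSE() call would mount here; we only show what it receives
        print("mounting", mountpoint, "with options", options)

class Mount:
    def __init__(self, fuse_mount_options=None):
        self._fuse_mount_options = fuse_mount_options or {}

    def mount(self, operations, mountpoint):
        FakeFUSE(operations, mountpoint,
                 nothreads=True, foreground=True, nonempty=True,
                 **self._fuse_mount_options)

# SandboxChroot-style usage: ask for device nodes inside the mount
Mount(fuse_mount_options={'dev': True}).mount(object(), '/tmp/sysroot')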
@@ -385,7 +385,10 @@ class Project():
             self._project_conf = _yaml.load(projectfile)
         except LoadError as e:
             # Raise a more specific error here
-            raise LoadError(LoadErrorReason.MISSING_PROJECT_CONF, str(e))
+            if e.reason == LoadErrorReason.MISSING_FILE:
+                raise LoadError(LoadErrorReason.MISSING_PROJECT_CONF, str(e)) from e
+            else:
+                raise

         pre_config_node = _yaml.node_copy(self._default_config_node)
         _yaml.composite(pre_config_node, self._project_conf)
@@ -598,7 +601,10 @@ class Project():
         # any conditionals specified for project option declarations,
         # or conditionally specifying the project name; will be ignored.
         #
+        # Don't forget to also resolve options in the element and source overrides.
         output.options.process_node(config)
+        output.options.process_node(output.element_overrides)
+        output.options.process_node(output.source_overrides)

         # Load base variables
         output.base_variables = _yaml.node_get(config, Mapping, 'variables')
......
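A minimal sketch of the narrowed error handling in the first hunk above: only a missing file is re-wrapped as a "missing project.conf" error, while anything else (for example invalid YAML) propagates with its original, more precise reason. The reason strings and loader parameter are illustrative stand-ins for BuildStream's LoadErrorReason values.

class LoadError(Exception):
    def __init__(self, reason, message):
        super().__init__(message)
        self.reason = reason

def load_project_conf(load, projectfile):
    try:
        return load(projectfile)
    except LoadError as e:
        if e.reason == 'missing-file':
            # Only this case really means "there is no project.conf"
            raise LoadError('missing-project-conf', str(e)) from e
        else:
            raise   # keep the original reason for parse errors etc.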
+#
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Tristan Maat <tristan.maat@codethink.co.uk>
+
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob
@@ -21,9 +21,8 @@ from ..._platform import Platform

 class CleanupJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
+    def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._complete_cb = complete_cb

         platform = Platform.get_platform()
         self._artifacts = platform.artifactcache
@@ -34,9 +33,3 @@ class CleanupJob(Job):
     def parent_complete(self, success, result):
         if success:
             self._artifacts.set_cache_size(result)
-
-        if self._complete_cb:
-            self._complete_cb()
-
-    def child_process_data(self):
-        return {}
@@ -101,7 +101,7 @@ class BuildQueue(Queue):
             # If the estimated size outgrows the quota, ask the scheduler
             # to queue a job to actually check the real cache size.
             #
-            if artifacts.get_quota_exceeded():
+            if artifacts.has_quota_exceeded():
                 self._scheduler.check_cache_size()

     def done(self, job, element, result, success):
......
@@ -351,14 +351,13 @@ class Scheduler():
         platform = Platform.get_platform()
         artifacts = platform.artifactcache

-        if not artifacts.get_quota_exceeded():
+        if not artifacts.has_quota_exceeded():
             return

         job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
-                         exclusive_resources=[ResourceType.CACHE],
-                         complete_cb=None)
+                         exclusive_resources=[ResourceType.CACHE])
         self.schedule_jobs([job])

     # _suspend_jobs()
......
@@ -938,13 +938,10 @@ class Stream():
         # Set the "required" artifacts that should not be removed
         # while this pipeline is active
         #
-        # FIXME: The set of required artifacts is only really needed
-        # for build and pull tasks.
+        # It must include all the artifacts which are required by the
+        # final product. Note that this is a superset of the build plan.
         #
-        # It must include all the artifacts which are required by the
-        # final product. Note that this is a superset of the build plan.
-        #
-        self._artifacts.append_required_artifacts((e for e in self._pipeline.dependencies(elements, Scope.ALL)))
+        self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL))

         if selection == PipelineSelection.PLAN and dynamic_plan:
             # We use a dynamic build plan, only request artifacts of top-level targets,
......
@@ -33,4 +33,4 @@ BST_FORMAT_VERSION = 16
 # or if buildstream was changed in a way which can cause
 # the same cache key to produce something that is no longer
 # the same.
-BST_CORE_ARTIFACT_VERSION = 5
+BST_CORE_ARTIFACT_VERSION = 6
@@ -200,7 +200,6 @@ class Element(Plugin):
         self.__strict_cache_key = None          # Our cached cache key for strict builds
         self.__artifacts = artifacts            # Artifact cache
         self.__consistency = Consistency.INCONSISTENT  # Cached overall consistency state
-        self.__cached = None                    # Whether we have a cached artifact
         self.__strong_cached = None             # Whether we have a cached artifact
         self.__weak_cached = None               # Whether we have a cached artifact
         self.__assemble_scheduled = False       # Element is scheduled to be assembled
@@ -1126,8 +1125,6 @@ class Element(Plugin):
         # Query caches now that the weak and strict cache keys are available
         key_for_cache_lookup = self.__strict_cache_key if context.get_strict() else self.__weak_cache_key
-        if not self.__cached:
-            self.__cached = self.__artifacts.contains(self, key_for_cache_lookup)
         if not self.__strong_cached:
             self.__strong_cached = self.__artifacts.contains(self, self.__strict_cache_key)
         if key_for_cache_lookup == self.__weak_cache_key:
@@ -1489,15 +1486,20 @@ class Element(Plugin):
             workspace.clear_running_files()
             self._get_context().get_workspaces().save_config()

-            # We also need to update the required artifacts, since
-            # workspaced dependencies do not have a fixed cache key
-            # when the build starts.
+            # This element will have already been marked as
+            # required, but we bump the atime again, in case
+            # we did not know the cache key until now.
             #
-            # This does *not* cause a race condition, because
-            # _assemble_done is called before a cleanup job may be
-            # launched.
+            # FIXME: This is not exactly correct, we should be
+            # doing this at the time which we have discovered
+            # a new cache key, this just happens to be the
+            # last place where that can happen.
             #
-            self.__artifacts.append_required_artifacts([self])
+            # Ultimately, we should be refactoring
+            # Element._update_state() such that we know
+            # when a cache key is actually discovered.
+            #
+            self.__artifacts.mark_required_elements([self])

     # _assemble():
     #
@@ -2079,7 +2081,7 @@ class Element(Plugin):
     def __is_cached(self, keystrength):
         if keystrength is None:
-            return self.__cached
+            keystrength = _KeyStrength.STRONG if self._get_context().get_strict() else _KeyStrength.WEAK

         return self.__strong_cached if keystrength == _KeyStrength.STRONG else self.__weak_cached
......
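The __is_cached() change is easy to miss: with no explicit key strength, the lookup now follows the context's strict/non-strict mode instead of a separate __cached flag. A small illustrative sketch, with stand-in names rather than the real Element internals:

from enum import Enum

class KeyStrength(Enum):
    STRONG = 1
    WEAK = 2

def is_cached(keystrength, *, strict, strong_cached, weak_cached):
    if keystrength is None:
        # Default to the strength implied by the context's strictness
        keystrength = KeyStrength.STRONG if strict else KeyStrength.WEAK
    return strong_cached if keystrength == KeyStrength.STRONG else weak_cached

# In strict mode only a strong (exact) cache key counts as cached:
assert is_cached(None, strict=True, strong_cached=False, weak_cached=True) is False
assert is_cached(None, strict=False, strong_cached=False, weak_cached=True) is True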
@@ -30,7 +30,7 @@ from .._fuse import SafeHardlinks
 # Helper data object representing a single mount point in the mount map
 #
 class Mount():
-    def __init__(self, sandbox, mount_point, safe_hardlinks):
+    def __init__(self, sandbox, mount_point, safe_hardlinks, fuse_mount_options={}):
         scratch_directory = sandbox._get_scratch_directory()
         # Getting _get_underlying_directory() here is acceptable as
         # we're part of the sandbox code. This will fail if our
@@ -39,6 +39,7 @@ class Mount():
         self.mount_point = mount_point
         self.safe_hardlinks = safe_hardlinks
+        self._fuse_mount_options = fuse_mount_options

         # FIXME: When the criteria for mounting something and it's parent
         # mount is identical, then there is no need to mount an additional
@@ -82,7 +83,7 @@ class Mount():
     @contextmanager
     def mounted(self, sandbox):
         if self.safe_hardlinks:
-            mount = SafeHardlinks(self.mount_origin, self.mount_tempdir)
+            mount = SafeHardlinks(self.mount_origin, self.mount_tempdir, self._fuse_mount_options)
             with mount.mounted(self.mount_source):
                 yield
         else:
@@ -100,12 +101,12 @@ class Mount():
 #
 class MountMap():

-    def __init__(self, sandbox, root_readonly):
+    def __init__(self, sandbox, root_readonly, fuse_mount_options={}):

         # We will be doing the mounts in the order in which they were declared.
         self.mounts = OrderedDict()

         # We want safe hardlinks on rootfs whenever root is not readonly
-        self.mounts['/'] = Mount(sandbox, '/', not root_readonly)
+        self.mounts['/'] = Mount(sandbox, '/', not root_readonly, fuse_mount_options)

         for mark in sandbox._get_marked_directories():
             directory = mark['directory']
@@ -113,7 +114,7 @@ class MountMap():
             # We want safe hardlinks for any non-root directory where
             # artifacts will be staged to
-            self.mounts[directory] = Mount(sandbox, directory, artifact)
+            self.mounts[directory] = Mount(sandbox, directory, artifact, fuse_mount_options)

     # get_mount_source()
     #
......
@@ -63,20 +63,8 @@ class SandboxBwrap(Sandbox):
         # Fallback to the sandbox default settings for
         # the cwd and env.
         #
-        if cwd is None:
-            cwd = self._get_work_directory()
-
-        if env is None:
-            env = self._get_environment()
-
-        if cwd is None:
-            cwd = '/'
-
-        # Naive getcwd implementations can break when bind-mounts to different
-        # paths on the same filesystem are present. Letting the command know
-        # what directory it is in makes it unnecessary to call the faulty
-        # getcwd.
-        env['PWD'] = cwd
+        cwd = self._get_work_directory(cwd=cwd)
+        env = self._get_environment(cwd=cwd, env=env)

         if not self._has_command(command[0], env):
             raise SandboxError("Staged artifacts do not provide command "
......
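The duplicated fallback logic removed here (and from the chroot sandbox below) presumably moves into shared helpers on the Sandbox base class; the diff only shows the call sites. A sketch of what such helpers would look like, reconstructed from the deleted code — the real signatures in sandbox.py are an assumption, not shown in this diff:

def get_work_directory(configured_cwd, cwd=None):
    # Explicit cwd wins, then the sandbox default, then '/'
    return cwd or configured_cwd or '/'

def get_environment(configured_env, *, cwd, env=None):
    environment = dict(env if env is not None else configured_env)
    # Naive getcwd implementations can break when bind-mounts to different
    # paths on the same filesystem are present, so tell the command where it is
    environment['PWD'] = cwd
    return environment

env = get_environment({'PATH': '/usr/bin'}, cwd=get_work_directory('/buildstream/build'))
assert env['PWD'] == '/buildstream/build'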
@@ -35,6 +35,9 @@ from . import Sandbox, SandboxFlags
 class SandboxChroot(Sandbox):

+    _FUSE_MOUNT_OPTIONS = {'dev': True}
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -48,21 +51,11 @@ class SandboxChroot(Sandbox):
     def run(self, command, flags, *, cwd=None, env=None):

-        # Default settings
-        if cwd is None:
-            cwd = self._get_work_directory()
-
-        if cwd is None:
-            cwd = '/'
-
-        if env is None:
-            env = self._get_environment()
-
-        # Naive getcwd implementations can break when bind-mounts to different
-        # paths on the same filesystem are present. Letting the command know
-        # what directory it is in makes it unnecessary to call the faulty
-        # getcwd.
-        env['PWD'] = cwd
+        # Fallback to the sandbox default settings for
+        # the cwd and env.
+        #
+        cwd = self._get_work_directory(cwd=cwd)
+        env = self._get_environment(cwd=cwd, env=env)

         if not self._has_command(command[0], env):
             raise SandboxError("Staged artifacts do not provide command "
@@ -77,7 +70,8 @@ class SandboxChroot(Sandbox):
         # Create the mount map, this will tell us where
         # each mount point needs to be mounted from and to
-        self.mount_map = MountMap(self, flags & SandboxFlags.ROOT_READ_ONLY)
+        self.mount_map = MountMap(self, flags & SandboxFlags.ROOT_READ_ONLY,
+                                  self._FUSE_MOUNT_OPTIONS)
         root_mount_source = self.mount_map.get_mount_source('/')

         # Create a sysroot and run the command inside it
......
@@ -27,7 +27,7 @@ from . import Sandbox
 from ..storage._filebaseddirectory import FileBasedDirectory
 from ..storage._casbaseddirectory import CasBasedDirectory
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from .._artifactcache.cascache import CASCache
+from .._platform import Platform

 class SandboxError(Exception):
@@ -43,7 +43,6 @@ class SandboxRemote(Sandbox):

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.cascache = None

         url = urlparse(kwargs['server_url'])
         if not url.scheme or not url.hostname or not url.port:
@@ -56,12 +55,6 @@ class SandboxRemote(Sandbox):

         self.server_url = '{}:{}'.format(url.hostname, url.port)

-    def _get_cascache(self):
-        if self.cascache is None:
-            self.cascache = CASCache(self._get_context())
-            self.cascache.setup_remotes(use_config=True)
-        return self.cascache
-
     def run_remote_command(self, command, input_root_digest, working_directory, environment):
         # Sends an execution request to the remote execution server.
         #
@@ -78,8 +71,8 @@ class SandboxRemote(Sandbox):
                                                    output_files=[],
                                                    output_directories=[self._output_directory],
                                                    platform=None)
-
-        cascache = self._get_cascache()
+        platform = Platform.get_platform()
+        cascache = platform.artifactcache
         # Upload the Command message to the remote CAS server
         command_digest = cascache.push_message(self._get_project(), remote_command)
         if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
@@ -141,7 +134,8 @@ class SandboxRemote(Sandbox):
         if tree_digest is None or not tree_digest.hash:
             raise SandboxError("Output directory structure had no digest attached.")

-        cascache = self._get_cascache()
+        platform = Platform.get_platform()
+        cascache = platform.artifactcache
         # Now do a pull to ensure we have the necessary parts.
         dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
         if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
@@ -176,7 +170,8 @@ class SandboxRemote(Sandbox):

         upload_vdir.recalculate_hash()

-        cascache = self._get_cascache()
+        platform = Platform.get_platform()
+        cascache = platform.artifactcache
         # Now, push that key (without necessarily needing a ref) to the remote.
         vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
         if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
......
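These hunks all follow the same pattern: instead of each remote sandbox constructing and configuring its own CASCache, the process-wide Platform singleton hands out the one shared artifact cache, which also avoids the double remote setup that the new assertion in setup_remotes() now guards against. An illustrative stand-in, not the real Platform class:

class Platform:
    _instance = None

    def __init__(self):
        self.artifactcache = object()   # stands in for the shared CASCache

    @classmethod
    def get_platform(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

# Every caller now sees the same cache object:
assert Platform.get_platform().artifactcache is Platform.get_platform().artifactcache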