
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (14)
Showing changes with 654 additions and 446 deletions
......@@ -17,17 +17,22 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
import multiprocessing
import os
import signal
import string
from collections import namedtuple
from collections.abc import Mapping
from ..types import _KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import _signals
from .. import utils
from .. import _yaml
from .cascache import CASCache, CASRemote
CACHE_SIZE_FILE = "cache_size"
......@@ -93,7 +98,8 @@ class ArtifactCache():
def __init__(self, context):
self.context = context
self.extractdir = os.path.join(context.artifactdir, 'extract')
self.tmpdir = os.path.join(context.artifactdir, 'tmp')
self.cas = CASCache(context.artifactdir)
self.global_remote_specs = []
self.project_remote_specs = {}
......@@ -104,12 +110,15 @@ class ArtifactCache():
self._cache_lower_threshold = None # The target cache size for a cleanup
self._remotes_setup = False # Check to prevent double-setup of remotes
# Per-project list of _CASRemote instances.
self._remotes = {}
self._has_fetch_remotes = False
self._has_push_remotes = False
os.makedirs(self.extractdir, exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
################################################
# Methods implemented on the abstract class #
################################################
self._calculate_cache_quota()
# get_artifact_fullname()
#
......@@ -240,8 +249,10 @@ class ArtifactCache():
for key in (strong_key, weak_key):
if key:
try:
self.update_mtime(element, key)
except ArtifactError:
ref = self.get_artifact_fullname(element, key)
self.cas.update_mtime(ref)
except CASError:
pass
# clean():
......@@ -252,7 +263,7 @@ class ArtifactCache():
# (int): The size of the cache after having cleaned up
#
def clean(self):
artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return
artifacts = self.list_artifacts()
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
......@@ -294,7 +305,7 @@ class ArtifactCache():
if key not in required_artifacts:
# Remove the actual artifact, if it's not required.
size = self.remove(to_remove) # pylint: disable=assignment-from-no-return
size = self.remove(to_remove)
# Remove the size from the removed size
self.set_cache_size(self._cache_size - size)
......@@ -311,7 +322,7 @@ class ArtifactCache():
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return
self._cache_size = self.cas.calculate_cache_size()
return self._cache_size
......@@ -380,28 +391,12 @@ class ArtifactCache():
def has_quota_exceeded(self):
return self.get_cache_size() > self._cache_quota
################################################
# Abstract methods for subclasses to implement #
################################################
# preflight():
#
# Preflight check.
#
def preflight(self):
pass
# update_mtime()
#
# Update the mtime of an artifact.
#
# Args:
# element (Element): The Element to update
# key (str): The key of the artifact.
#
def update_mtime(self, element, key):
raise ImplError("Cache '{kind}' does not implement update_mtime()"
.format(kind=type(self).__name__))
self.cas.preflight()
# initialize_remotes():
#
......@@ -411,7 +406,59 @@ class ArtifactCache():
# on_failure (callable): Called if we fail to contact one of the caches.
#
def initialize_remotes(self, *, on_failure=None):
pass
remote_specs = self.global_remote_specs
for project in self.project_remote_specs:
remote_specs += self.project_remote_specs[project]
remote_specs = list(utils._deduplicate(remote_specs))
remotes = {}
q = multiprocessing.Queue()
for remote_spec in remote_specs:
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
p.start()
error = q.get()
p.join()
except KeyboardInterrupt:
utils._kill_process_tree(p.pid)
raise
if error and on_failure:
on_failure(remote_spec.url, error)
elif error:
raise ArtifactError(error)
else:
self._has_fetch_remotes = True
if remote_spec.push:
self._has_push_remotes = True
remotes[remote_spec.url] = CASRemote(remote_spec)
for project in self.context.get_projects():
remote_specs = self.global_remote_specs
if project in self.project_remote_specs:
remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
project_remotes = []
for remote_spec in remote_specs:
# Errors are already handled in the loop above,
# skip unreachable remotes here.
if remote_spec.url not in remotes:
continue
remote = remotes[remote_spec.url]
project_remotes.append(remote)
self._remotes[project] = project_remotes
# contains():
#
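Note: the initialize_remotes() loop above starts every remote probe in a child process so that gRPC never creates threads in the main BuildStream process (gRPC and fork() do not mix, per the linked fork_support document). The following is an illustrative, standalone sketch of that pattern using only the standard library; the probe body and the probe() helper name are stand-ins, not BuildStream code:

    import multiprocessing
    import signal

    def _probe_remote(url, queue):
        try:
            # Real code would open a gRPC channel to `url` here; any gRPC
            # state created in this child process dies with the child.
            queue.put(None)                  # success
        except Exception as e:
            queue.put(str(e))                # report the failure to the parent

    def probe(url):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=_probe_remote, args=(url, q))

        # Keep SIGINT blocked while the child starts, mirroring the
        # _signals.blocked() context manager above (POSIX-only;
        # signal.pthread_sigmask is unavailable on Windows).
        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        try:
            p.start()
        finally:
            signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)

        error = q.get()                      # the child always puts exactly one item
        p.join()
        return error

    if __name__ == '__main__':
        print(probe("https://cache.example.com:11002"))  # hypothetical remote URL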
......@@ -425,8 +472,9 @@ class ArtifactCache():
# Returns: True if the artifact is in the cache, False otherwise
#
def contains(self, element, key):
raise ImplError("Cache '{kind}' does not implement contains()"
.format(kind=type(self).__name__))
ref = self.get_artifact_fullname(element, key)
return self.cas.contains(ref)
# list_artifacts():
#
......@@ -437,8 +485,7 @@ class ArtifactCache():
# `ArtifactCache.get_artifact_fullname` in LRU order
#
def list_artifacts(self):
raise ImplError("Cache '{kind}' does not implement list_artifacts()"
.format(kind=type(self).__name__))
return self.cas.list_refs()
# remove():
#
......@@ -450,9 +497,31 @@ class ArtifactCache():
# generated by
# `ArtifactCache.get_artifact_fullname`)
#
def remove(self, artifact_name):
raise ImplError("Cache '{kind}' does not implement remove()"
.format(kind=type(self).__name__))
# Returns:
# (int|None) The amount of space pruned from the repository in
# Bytes, or None if defer_prune is True
#
def remove(self, ref):
# Remove extract if not used by other ref
tree = self.cas.resolve_ref(ref)
ref_name, ref_hash = os.path.split(ref)
extract = os.path.join(self.extractdir, ref_name, tree.hash)
keys_file = os.path.join(extract, 'meta', 'keys.yaml')
if os.path.exists(keys_file):
keys_meta = _yaml.load(keys_file)
keys = [keys_meta['strong'], keys_meta['weak']]
remove_extract = True
for other_hash in keys:
if other_hash == ref_hash:
continue
remove_extract = False
break
if remove_extract:
utils._force_rmtree(extract)
return self.cas.remove(ref)
# extract():
#
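Note: remove() above only deletes the extracted copy of an artifact when no other cache key recorded in the artifact's meta/keys.yaml still refers to it. A compact sketch of that check, assuming PyYAML in place of BuildStream's internal _yaml module and a hypothetical helper name:

    import os
    import yaml  # assumption: PyYAML standing in for BuildStream's _yaml helper

    def extract_still_referenced(extract_dir, removed_key):
        # Keep the extracted directory if meta/keys.yaml records another key
        # (strong or weak) that is not the one being removed.
        keys_file = os.path.join(extract_dir, 'meta', 'keys.yaml')
        if not os.path.exists(keys_file):
            return False
        with open(keys_file, encoding='utf-8') as f:
            keys_meta = yaml.safe_load(f)
        return any(key != removed_key
                   for key in (keys_meta['strong'], keys_meta['weak']))

    # remove() would then call utils._force_rmtree(extract_dir) only when this
    # returns False, which is what the loop over `keys` above decides.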
......@@ -472,8 +541,11 @@ class ArtifactCache():
# Returns: path to extracted artifact
#
def extract(self, element, key):
raise ImplError("Cache '{kind}' does not implement extract()"
.format(kind=type(self).__name__))
ref = self.get_artifact_fullname(element, key)
path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
return self.cas.extract(ref, path)
# commit():
#
......@@ -485,8 +557,9 @@ class ArtifactCache():
# keys (list): The cache keys to use
#
def commit(self, element, content, keys):
raise ImplError("Cache '{kind}' does not implement commit()"
.format(kind=type(self).__name__))
refs = [self.get_artifact_fullname(element, key) for key in keys]
self.cas.commit(refs, content)
# diff():
#
......@@ -500,8 +573,10 @@ class ArtifactCache():
# subdir (str): A subdirectory to limit the comparison to
#
def diff(self, element, key_a, key_b, *, subdir=None):
raise ImplError("Cache '{kind}' does not implement diff()"
.format(kind=type(self).__name__))
ref_a = self.get_artifact_fullname(element, key_a)
ref_b = self.get_artifact_fullname(element, key_b)
return self.cas.diff(ref_a, ref_b, subdir=subdir)
# has_fetch_remotes():
#
......@@ -513,7 +588,16 @@ class ArtifactCache():
# Returns: True if any remote repositories are configured, False otherwise
#
def has_fetch_remotes(self, *, element=None):
return False
if not self._has_fetch_remotes:
# No project has fetch remotes
return False
elif element is None:
# At least one (sub)project has fetch remotes
return True
else:
# Check whether the specified element's project has fetch remotes
remotes_for_project = self._remotes[element._get_project()]
return bool(remotes_for_project)
# has_push_remotes():
#
......@@ -525,7 +609,16 @@ class ArtifactCache():
# Returns: True if any remote repository is configured, False otherwise
#
def has_push_remotes(self, *, element=None):
return False
if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
# At least one (sub)project has push remotes
return True
else:
# Check whether the specified element's project has push remotes
remotes_for_project = self._remotes[element._get_project()]
return any(remote.spec.push for remote in remotes_for_project)
# push():
#
......@@ -542,8 +635,28 @@ class ArtifactCache():
# (ArtifactError): if there was an error
#
def push(self, element, keys):
raise ImplError("Cache '{kind}' does not implement push()"
.format(kind=type(self).__name__))
refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
pushed = False
for remote in push_remotes:
remote.init()
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
if self.cas.push(refs, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
pushed = True
else:
element.info("Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key()
))
return pushed
# pull():
#
......@@ -558,8 +671,130 @@ class ArtifactCache():
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, progress=None):
raise ImplError("Cache '{kind}' does not implement pull()"
.format(kind=type(self).__name__))
ref = self.get_artifact_fullname(element, key)
project = element._get_project()
for remote in self._remotes[project]:
try:
display_key = element._get_brief_display_key()
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
if self.cas.pull(ref, remote, progress=progress):
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
else:
element.info("Remote ({}) does not have {} cached".format(
remote.spec.url, element._get_brief_display_key()
))
except CASError as e:
raise ArtifactError("Failed to pull artifact {}: {}".format(
element._get_brief_display_key(), e)) from e
return False
# pull_tree():
#
# Pull a single Tree rather than an artifact.
# Does not update local refs.
#
# Args:
# project (Project): The current project
# digest (Digest): The digest of the tree
#
def pull_tree(self, project, digest):
for remote in self._remotes[project]:
digest = self.cas.pull_tree(remote, digest)
if digest:
# no need to pull from additional remotes
return digest
return None
# push_directory():
#
# Push the given virtual directory to all remotes.
#
# Args:
# project (Project): The current project
# directory (Directory): A virtual directory object to push.
#
# Raises:
# (ArtifactError): if there was an error
#
def push_directory(self, project, directory):
if self._has_push_remotes:
push_remotes = [r for r in self._remotes[project] if r.spec.push]
else:
push_remotes = []
if not push_remotes:
raise ArtifactError("push_directory was called, but no remote artifact " +
"servers are configured as push remotes.")
if directory.ref is None:
return
for remote in push_remotes:
self.cas.push_directory(remote, directory)
# push_message():
#
# Push the given protobuf message to all remotes.
#
# Args:
# project (Project): The current project
# message (Message): A protobuf message to push.
#
# Raises:
# (ArtifactError): if there was an error
#
def push_message(self, project, message):
if self._has_push_remotes:
push_remotes = [r for r in self._remotes[project] if r.spec.push]
else:
push_remotes = []
if not push_remotes:
raise ArtifactError("push_message was called, but no remote artifact " +
"servers are configured as push remotes.")
for remote in push_remotes:
message_digest = self.cas.push_message(remote, message)
return message_digest
# verify_digest_pushed():
#
# Check whether the object is already on the server in which case
# there is no need to upload it.
#
# Args:
# project (Project): The current project
# digest (Digest): The object digest.
#
def verify_digest_pushed(self, project, digest):
if self._has_push_remotes:
push_remotes = [r for r in self._remotes[project] if r.spec.push]
else:
push_remotes = []
if not push_remotes:
raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
"servers are configured as push remotes.")
pushed = False
for remote in push_remotes:
if self.cas.verify_digest_on_remote(remote, digest):
pushed = True
return pushed
# link_key():
#
......@@ -571,19 +806,10 @@ class ArtifactCache():
# newkey (str): A new cache key for the artifact
#
def link_key(self, element, oldkey, newkey):
raise ImplError("Cache '{kind}' does not implement link_key()"
.format(kind=type(self).__name__))
oldref = self.get_artifact_fullname(element, oldkey)
newref = self.get_artifact_fullname(element, newkey)
# calculate_cache_size()
#
# Return the real artifact cache size.
#
# Returns:
# (int): The size of the artifact cache.
#
def calculate_cache_size(self):
raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
.format(kind=type(self).__name__))
self.cas.link_ref(oldref, newref)
################################################
# Local Private Methods #
......
This diff is collapsed.
......@@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import ArtifactError
from .._context import Context
from .._exceptions import CASError
from .cascache import CASCache
# The default limit for gRPC messages is 4 MiB.
......@@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception):
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
def create_server(repo, *, enable_push):
context = Context()
context.artifactdir = os.path.abspath(repo)
artifactcache = context.artifactcache
cas = CASCache(os.path.abspath(repo))
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
_ByteStreamServicer(cas, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
_ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
_ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
_ReferenceStorageServicer(cas, enable_push=enable_push), server)
return server
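Note: with the server now built directly on a CASCache rather than a full Context, a caller only needs a repository path. A hedged usage sketch (the repository path and port are illustrative, not part of the change):

    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/artifacts/repo', enable_push=True)
    server.add_insecure_port('localhost:50051')   # plain grpc.Server API
    server.start()
    # ... serve until the process is asked to shut down, then:
    server.stop(0)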
......@@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
except ArtifactError:
except CASError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
......@@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size):
return 0
# obtain a list of LRP artifacts
LRP_artifacts = cas.list_artifacts()
LRP_artifacts = cas.list_refs()
removed_size = 0 # in bytes
while object_size - removed_size > free_disk_space:
......
......@@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache
from ._artifactcache.cascache import CASCache
from ._workspaces import Workspaces
from .plugin import _plugin_lookup
......@@ -233,7 +232,7 @@ class Context():
@property
def artifactcache(self):
if not self._artifactcache:
self._artifactcache = CASCache(self)
self._artifactcache = ArtifactCache(self)
return self._artifactcache
......
......@@ -47,7 +47,6 @@ class ElementFactory(PluginContext):
# Args:
# context (object): The Context object for processing
# project (object): The project object
# artifacts (ArtifactCache): The artifact cache
# meta (object): The loaded MetaElement
#
# Returns: A newly created Element object of the appropriate kind
......@@ -56,9 +55,9 @@ class ElementFactory(PluginContext):
# PluginError (if the kind lookup failed)
# LoadError (if the element itself took issue with the config)
#
def create(self, context, project, artifacts, meta):
def create(self, context, project, meta):
element_type, default_config = self.lookup(meta.kind)
element = element_type(context, project, artifacts, meta, default_config)
element = element_type(context, project, meta, default_config)
version = self._format_versions.get(meta.kind, 0)
self._assert_plugin_format(element, version)
return element
......@@ -90,6 +90,7 @@ class ErrorDomain(Enum):
APP = 12
STREAM = 13
VIRTUAL_FS = 14
CAS = 15
# BstError is an internal base exception class for BuildStream
......@@ -274,6 +275,15 @@ class ArtifactError(BstError):
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
# CASError
#
# Raised when errors are encountered in the CAS
#
class CASError(BstError):
def __init__(self, message, *, detail=None, reason=None, temporary=False):
super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
# PipelineError
#
# Raised from pipeline operations
......
......@@ -537,7 +537,7 @@ class Loader():
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
element = Element._new_from_meta(meta_element, self._context.artifactcache)
element = Element._new_from_meta(meta_element)
element._preflight()
sources = list(element.sources())
......
......@@ -106,7 +106,7 @@ class Pipeline():
profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
elements = self._project.load_elements(targets, self._artifacts,
elements = self._project.load_elements(targets,
rewritable=rewritable,
fetch_subprojects=fetch_subprojects)
......
......@@ -224,18 +224,17 @@ class Project():
# Instantiate and return an element
#
# Args:
# artifacts (ArtifactCache): The artifact cache
# meta (MetaElement): The loaded MetaElement
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns:
# (Element): A newly created Element object of the appropriate kind
#
def create_element(self, artifacts, meta, *, first_pass=False):
def create_element(self, meta, *, first_pass=False):
if first_pass:
return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta)
return self.first_pass_config.element_factory.create(self._context, self, meta)
else:
return self.config.element_factory.create(self._context, self, artifacts, meta)
return self.config.element_factory.create(self._context, self, meta)
# create_source()
#
......@@ -305,7 +304,6 @@ class Project():
#
# Args:
# targets (list): Target names
# artifacts (ArtifactCache): Artifact cache
# rewritable (bool): Whether the loaded files should be rewritable
# this is a bit more expensive due to deep copies
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
......@@ -314,7 +312,7 @@ class Project():
# Returns:
# (list): A list of loaded Element
#
def load_elements(self, targets, artifacts, *,
def load_elements(self, targets, *,
rewritable=False, fetch_subprojects=False):
with self._context.timed_activity("Loading elements", silent_nested=True):
meta_elements = self.loader.load(targets, rewritable=rewritable,
......@@ -323,7 +321,7 @@ class Project():
with self._context.timed_activity("Resolving elements"):
elements = [
Element._new_from_meta(meta, artifacts)
Element._new_from_meta(meta)
for meta in meta_elements
]
......
......@@ -174,7 +174,7 @@ class Element(Plugin):
*Since: 1.4*
"""
def __init__(self, context, project, artifacts, meta, plugin_conf):
def __init__(self, context, project, meta, plugin_conf):
self.__cache_key_dict = None # Dict for cache key calculation
self.__cache_key = None # Our cached cache key
......@@ -199,7 +199,7 @@ class Element(Plugin):
self.__sources = [] # List of Sources
self.__weak_cache_key = None # Our cached weak cache key
self.__strict_cache_key = None # Our cached cache key for strict builds
self.__artifacts = artifacts # Artifact cache
self.__artifacts = context.artifactcache # Artifact cache
self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state
self.__strong_cached = None # Whether we have a cached artifact
self.__weak_cached = None # Whether we have a cached artifact
......@@ -872,14 +872,13 @@ class Element(Plugin):
# and its dependencies from a meta element.
#
# Args:
# artifacts (ArtifactCache): The artifact cache
# meta (MetaElement): The meta element
#
# Returns:
# (Element): A newly created Element instance
#
@classmethod
def _new_from_meta(cls, meta, artifacts):
def _new_from_meta(cls, meta):
if not meta.first_pass:
meta.project.ensure_fully_loaded()
......@@ -887,7 +886,7 @@ class Element(Plugin):
if meta in cls.__instantiated_elements:
return cls.__instantiated_elements[meta]
element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass)
element = meta.project.create_element(meta, first_pass=meta.first_pass)
cls.__instantiated_elements[meta] = element
# Instantiate sources
......@@ -904,10 +903,10 @@ class Element(Plugin):
# Instantiate dependencies
for meta_dep in meta.dependencies:
dependency = Element._new_from_meta(meta_dep, artifacts)
dependency = Element._new_from_meta(meta_dep)
element.__runtime_dependencies.append(dependency)
for meta_dep in meta.build_dependencies:
dependency = Element._new_from_meta(meta_dep, artifacts)
dependency = Element._new_from_meta(meta_dep)
element.__build_dependencies.append(dependency)
return element
......@@ -1411,16 +1410,9 @@ class Element(Plugin):
finally:
# Staging may produce directories with less than 'rwx' permissions
# for the owner, which will break tempfile, so we need to use chmod
# occasionally.
def make_dir_writable(fn, path, excinfo):
os.chmod(os.path.dirname(path), 0o777)
if os.path.isdir(path):
os.rmdir(path)
else:
os.remove(path)
shutil.rmtree(temp_staging_directory, onerror=make_dir_writable)
# for the owner, which breaks tempfile. _force_rmtree will deal
# with these.
utils._force_rmtree(temp_staging_directory)
# Ensure deterministic mtime of sources at build time
vdirectory.set_deterministic_mtime()
# Ensure deterministic owners of sources at build time
......@@ -2057,7 +2049,7 @@ class Element(Plugin):
'sources': [s._get_unique_key(workspace is None) for s in self.__sources],
'workspace': '' if workspace is None else workspace.get_key(self._get_project()),
'public': self.__public,
'cache': type(self.__artifacts).__name__
'cache': 'CASCache'
}
self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings)
......
......@@ -28,10 +28,7 @@ from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
class SandboxError(Exception):
pass
from .._exceptions import SandboxError
# SandboxRemote()
......
......@@ -79,7 +79,7 @@ class CasBasedDirectory(Directory):
self.filename = filename
self.common_name = common_name
self.pb2_directory = remote_execution_pb2.Directory()
self.cas_cache = context.artifactcache
self.cas_cache = context.artifactcache.cas
if ref:
with open(self.cas_cache.objpath(ref), 'rb') as f:
self.pb2_directory.ParseFromString(f.read())
......
......@@ -634,7 +634,7 @@ def _parse_size(size, volume):
# _pretty_size()
#
# Converts a number of bytes into a string representation in KB, MB, GB, TB
# Converts a number of bytes into a string representation in KiB, MiB, GiB, TiB
# represented as K, M, G, T etc.
#
# Args:
......@@ -646,10 +646,11 @@ def _parse_size(size, volume):
def _pretty_size(size, dec_places=0):
psize = size
unit = 'B'
for unit in ('B', 'K', 'M', 'G', 'T'):
units = ('B', 'K', 'M', 'G', 'T')
for unit in units:
if psize < 1024:
break
else:
elif unit != units[-1]:
psize /= 1024
return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
......
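Note: the fix above stops _pretty_size() from dividing one extra time once it has reached the largest unit. A self-contained copy of the corrected helper (the real one is the private _pretty_size() in buildstream/utils.py), shown only to demonstrate the behaviour change:

    def pretty_size(size, dec_places=0):
        # Mirror of the patched helper above.
        psize = size
        unit = 'B'
        units = ('B', 'K', 'M', 'G', 'T')
        for unit in units:
            if psize < 1024:
                break
            elif unit != units[-1]:
                psize /= 1024
        return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)

    TiB = 1024 ** 4
    assert pretty_size(512) == "512B"
    assert pretty_size(2 * 1024) == "2K"
    # Values past the largest unit no longer get divided an extra time:
    assert pretty_size(2048 * TiB) == "2048T"   # the old code rendered this as "2T"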
......@@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles):
cas = context.artifactcache
# Assert that the element's artifact is **not** cached
element = project.load_elements(['target.bst'], cas)[0]
element = project.load_elements(['target.bst'])[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert not cas.contains(element, element_key)
......@@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir,
cas = context.artifactcache
# Load the target element
element = project.load_elements([element_name], cas)[0]
element = project.load_elements([element_name])[0]
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
......@@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = context.artifactcache
artifactcache = context.artifactcache
cas = artifactcache.cas
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element = project.load_elements(['target.bst'])[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
assert artifactcache.contains(element, element_key)
# Retrieve the Directory object from the cached artifact
artifact_ref = cas.get_artifact_fullname(element, element_key)
artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
artifact_digest = cas.resolve_ref(artifact_ref)
queue = multiprocessing.Queue()
......@@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
project.ensure_fully_loaded()
# Create a local CAS cache handle
cas = context.artifactcache
artifactcache = context.artifactcache
cas = artifactcache.cas
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
artifactcache.setup_remotes(use_config=True)
if cas.has_push_remotes():
if artifactcache.has_push_remotes():
directory = remote_execution_pb2.Directory()
with open(cas.objpath(artifact_digest), 'rb') as f:
......@@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
tree_maker(cas, tree, directory)
# Push the Tree as a regular message
tree_digest = cas.push_message(project, tree)
tree_digest = artifactcache.push_message(project, tree)
queue.put((tree_digest.hash, tree_digest.size_bytes))
else:
......
......@@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles):
cas = context.artifactcache
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element = project.load_elements(['target.bst'])[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
......@@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir,
cas = context.artifactcache
# Load the target element
element = project.load_elements([element_name], cas)[0]
element = project.load_elements([element_name])[0]
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
......@@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles):
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
cas = context.artifactcache
artifactcache = context.artifactcache
cas = artifactcache.cas
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'], cas)[0]
element = project.load_elements(['target.bst'])[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cas.contains(element, element_key)
assert artifactcache.contains(element, element_key)
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
assert cas.has_push_remotes(element=element)
artifactcache.setup_remotes(use_config=True)
artifactcache.initialize_remotes()
assert artifactcache.has_push_remotes(element=element)
# Recreate the CasBasedDirectory object from the cached artifact
artifact_ref = cas.get_artifact_fullname(element, element_key)
artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
artifact_digest = cas.resolve_ref(artifact_ref)
queue = multiprocessing.Queue()
......
......@@ -13,7 +13,7 @@ import pytest_cov
from buildstream import _yaml
from buildstream._artifactcache.casserver import create_server
from buildstream._context import Context
from buildstream._exceptions import ArtifactError
from buildstream._exceptions import CASError
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
......@@ -48,7 +48,7 @@ class ArtifactShare():
context = Context()
context.artifactdir = self.repodir
self.cas = context.artifactcache
self.cas = context.artifactcache.cas
self.total_space = total_space
self.free_space = free_space
......@@ -135,7 +135,7 @@ class ArtifactShare():
try:
tree = self.cas.resolve_ref(artifact_key)
return True
except ArtifactError:
except CASError:
return False
# close():
......
from contextlib import contextmanager
import os
# MockAttributeResult
#
# A class to take a dictionary of kwargs and make them accessible via
# attributes of the object.
#
class MockAttributeResult(dict):
__getattr__ = dict.get
# mock_statvfs():
#
# Gets a function which mocks statvfs and returns a statvfs result with the kwargs accessible.
#
# Returns:
# func(path) -> object: object will have all the kwargs accessible via object.kwarg
#
# Example:
# statvfs = mock_statvfs(f_blocks=10)
# result = statvfs("regardless/of/path")
# assert result.f_blocks == 10 # True
def mock_statvfs(**kwargs):
def statvfs(path):
return MockAttributeResult(kwargs)
return statvfs
# monkey_patch()
#
# with monkey_patch("statvfs", custom_statvfs):
#     assert os.statvfs == custom_statvfs  # True
#
# assert os.statvfs == custom_statvfs      # False
#
@contextmanager
def monkey_patch(to_patch, patched_func):
orig = getattr(os, to_patch)
setattr(os, to_patch, patched_func)
try:
yield
finally:
setattr(os, to_patch, orig)
from buildstream import _yaml
from ..testutils import mock_os
from ..testutils.runcli import cli
import os
import pytest
KiB = 1024
MiB = (KiB * 1024)
GiB = (MiB * 1024)
TiB = (GiB * 1024)
def test_parse_size_over_1024T(cli, tmpdir):
BLOCK_SIZE = 4096
cli.configure({
'cache': {
'quota': 2048 * TiB
}
})
project = tmpdir.join("main")
os.makedirs(str(project))
_yaml.dump({'name': 'main'}, str(project.join("project.conf")))
bavail = (1025 * TiB) / BLOCK_SIZE
patched_statvfs = mock_os.mock_statvfs(f_bavail=bavail, f_bsize=BLOCK_SIZE)
with mock_os.monkey_patch("statvfs", patched_statvfs):
result = cli.run(project, args=["build", "file.bst"])
assert "1025T of available system storage" in result.stderr