
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (42)
Showing with 496 additions and 243 deletions
......@@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
class ArtifactCache():
def __init__(self, context):
self.context = context
self.required_artifacts = set()
self.extractdir = os.path.join(context.artifactdir, 'extract')
self.tmpdir = os.path.join(context.artifactdir, 'tmp')
self.estimated_size = None
self.global_remote_specs = []
self.project_remote_specs = {}
self._local = False
self.cache_size = None
self.cache_quota = None
self.cache_lower_threshold = None
self._required_artifacts = set() # The artifacts required for this session
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
self._cache_lower_threshold = None # The target cache size for a cleanup
os.makedirs(self.extractdir, exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
......@@ -212,8 +209,8 @@ class ArtifactCache():
weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
for key in (strong_key, weak_key):
if key and key not in self.required_artifacts:
self.required_artifacts.add(key)
if key and key not in self._required_artifacts:
self._required_artifacts.add(key)
# We also update the usage times of any artifacts
# we will be using, which helps preventing a
......@@ -228,10 +225,16 @@ class ArtifactCache():
#
# Clean the artifact cache as much as possible.
#
# Returns:
# (int): The size of the cache after having cleaned up
#
def clean(self):
artifacts = self.list_artifacts()
while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
# Do a real computation of the cache size once, just in case
self.compute_cache_size()
while self.get_cache_size() >= self._cache_lower_threshold:
try:
to_remove = artifacts.pop(0)
except IndexError:
......@@ -245,7 +248,7 @@ class ArtifactCache():
"Please increase the cache-quota in {}."
.format(self.context.config_origin or default_conf))
if self.calculate_cache_size() > self.cache_quota:
if self.get_quota_exceeded():
raise ArtifactError("Cache too full. Aborting.",
detail=detail,
reason="cache-too-full")
......@@ -253,46 +256,94 @@ class ArtifactCache():
break
key = to_remove.rpartition('/')[2]
if key not in self.required_artifacts:
if key not in self._required_artifacts:
# Remove the actual artifact, if it's not required.
size = self.remove(to_remove)
if size:
self.cache_size -= size
# Remove the size from the removed size
self.set_cache_size(self._cache_size - size)
# This should be O(1) if implemented correctly
return self.calculate_cache_size()
return self.get_cache_size()
# get_approximate_cache_size()
# compute_cache_size()
#
# A cheap method that aims to serve as an upper limit on the
# artifact cache size.
# Computes the real artifact cache size by calling
# the abstract calculate_cache_size() method.
#
# The cache size reported by this function will normally be larger
# than the real cache size, since it is calculated using the
# pre-commit artifact size, but for very small artifacts in
# certain caches additional overhead could cause this to be
# smaller than, but close to, the actual size.
# Returns:
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
self._cache_size = self.calculate_cache_size()
return self._cache_size
# add_artifact_size()
#
# Nonetheless, in practice this should be safe to use as an upper
# limit on the cache size.
# Adds the reported size of a newly cached artifact to the
# overall estimated size.
#
# If the cache has built-in constant-time size reporting, please
# feel free to override this method with a more accurate
# implementation.
# Args:
# artifact_size (int): The size to add.
#
def add_artifact_size(self, artifact_size):
cache_size = self.get_cache_size()
cache_size += artifact_size
self.set_cache_size(cache_size)
# get_cache_size()
#
# Fetches the cached size of the cache, this is sometimes
# an estimate and periodically adjusted to the real size
# when a cache size calculation job runs.
#
# When it is an estimate, the value is either correct, or
# it is greater than the actual cache size.
#
# Returns:
# (int) An approximation of the artifact cache size.
#
def get_approximate_cache_size(self):
# If we don't currently have an estimate, figure out the real
# cache size.
if self.estimated_size is None:
def get_cache_size(self):
# If we don't currently have an estimate, figure out the real cache size.
if self._cache_size is None:
stored_size = self._read_cache_size()
if stored_size is not None:
self.estimated_size = stored_size
self._cache_size = stored_size
else:
self.estimated_size = self.calculate_cache_size()
self.compute_cache_size()
return self._cache_size
return self.estimated_size
# set_cache_size()
#
# Forcefully set the overall cache size.
#
# This is used to update the size in the main process after
# having calculated in a cleanup or a cache size calculation job.
#
# Args:
# cache_size (int): The size to set.
#
def set_cache_size(self, cache_size):
assert cache_size is not None
self._cache_size = cache_size
self._write_cache_size(self._cache_size)
# get_quota_exceeded()
#
# Checks if the current artifact cache size exceeds the quota.
#
# Returns:
# (bool): True if the quota is exceeded
#
def get_quota_exceeded(self):
return self.get_cache_size() > self._cache_quota
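
The size-tracking helpers introduced above are easiest to follow in isolation. The following standalone sketch is not part of the diff — the class name and numbers are hypothetical — but it mimics how get_cache_size(), set_cache_size(), add_artifact_size() and get_quota_exceeded() cooperate around a single calculate_cache_size() implementation:

# Hypothetical illustration of the size-tracking API, not BuildStream code.
class ToyCache:
    def __init__(self, quota):
        self._cache_size = None          # current estimate, or None if unknown
        self._cache_quota = quota

    def calculate_cache_size(self):      # the expensive, "real" computation
        return 4096                      # pretend the cache currently holds 4 KiB

    def compute_cache_size(self):
        self._cache_size = self.calculate_cache_size()
        return self._cache_size

    def get_cache_size(self):
        if self._cache_size is None:     # no estimate yet: fall back to the real size
            self.compute_cache_size()
        return self._cache_size

    def set_cache_size(self, cache_size):
        assert cache_size is not None
        self._cache_size = cache_size

    def add_artifact_size(self, artifact_size):
        self.set_cache_size(self.get_cache_size() + artifact_size)

    def get_quota_exceeded(self):
        return self.get_cache_size() > self._cache_quota

cache = ToyCache(quota=10000)
cache.add_artifact_size(8192)            # estimate grows to 4096 + 8192 = 12288
assert cache.get_quota_exceeded()        # time to schedule a real size check / cleanup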
################################################
# Abstract methods for subclasses to implement #
......@@ -484,11 +535,8 @@ class ArtifactCache():
#
# Return the real artifact cache size.
#
# Implementations should also use this to update estimated_size.
#
# Returns:
#
# (int) The size of the artifact cache.
# (int): The size of the artifact cache.
#
def calculate_cache_size(self):
raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
......@@ -535,39 +583,13 @@ class ArtifactCache():
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
# _add_artifact_size()
#
# Since we cannot keep track of the cache size between threads,
# this method will be called by the main process every time a
# process that added something to the cache finishes.
#
# This will then add the reported size to
# ArtifactCache.estimated_size.
#
def _add_artifact_size(self, artifact_size):
if not self.estimated_size:
self.estimated_size = self.calculate_cache_size()
self.estimated_size += artifact_size
self._write_cache_size(self.estimated_size)
# _set_cache_size()
#
# Similarly to the above method, when we calculate the actual size
# in a child thread, we can't update it. We instead pass the value
# back to the main thread and update it there.
#
def _set_cache_size(self, cache_size):
self.estimated_size = cache_size
# set_cache_size is called in cleanup, where it may set the cache to None
if self.estimated_size is not None:
self._write_cache_size(self.estimated_size)
# _write_cache_size()
#
# Writes the given size of the artifact to the cache's size file
#
# Args:
# size (int): The size of the artifact cache to record
#
def _write_cache_size(self, size):
assert isinstance(size, int)
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
......@@ -579,6 +601,9 @@ class ArtifactCache():
# Reads and returns the size of the artifact cache that's stored in the
# cache's size file
#
# Returns:
# (int): The size of the artifact cache, as recorded in the file
#
def _read_cache_size(self):
size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
......@@ -628,13 +653,13 @@ class ArtifactCache():
stat = os.statvfs(artifactdir_volume)
available_space = (stat.f_bsize * stat.f_bavail)
cache_size = self.get_approximate_cache_size()
cache_size = self.get_cache_size()
# Ensure system has enough storage for the cache_quota
#
# If cache_quota is none, set it to the maximum it could possibly be.
#
# Also check that cache_quota is atleast as large as our headroom.
# Also check that cache_quota is at least as large as our headroom.
#
if cache_quota is None: # Infinity, set to max system storage
cache_quota = cache_size + available_space
......@@ -660,8 +685,8 @@ class ArtifactCache():
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
self.cache_quota = cache_quota - headroom
self.cache_lower_threshold = self.cache_quota / 2
self._cache_quota = cache_quota - headroom
self._cache_lower_threshold = self._cache_quota / 2
# _configured_remote_artifact_cache_specs():
......
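The quota computation above subtracts a headroom from the configured quota and sets the cleanup target at half of the result. A worked example with illustrative numbers only (the actual headroom and quota come from BuildStream's context configuration, not from this snippet):

headroom = 2 * 1024**3            # e.g. 2 GiB kept free for in-flight writes
cache_quota = 10 * 1024**3        # e.g. a user-configured 10 GiB quota

effective_quota = cache_quota - headroom   # 8 GiB: what get_quota_exceeded() compares against
lower_threshold = effective_quota / 2      # 4 GiB: clean() removes artifacts until below this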
......@@ -19,6 +19,7 @@
import hashlib
import itertools
import io
import multiprocessing
import os
import signal
......@@ -76,6 +77,7 @@ class CASCache(ArtifactCache):
################################################
# Implementation of abstract methods #
################################################
def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))
......@@ -120,8 +122,6 @@ class CASCache(ArtifactCache):
for ref in refs:
self.set_ref(ref, tree)
self.cache_size = None
def diff(self, element, key_a, key_b, *, subdir=None):
ref_a = self.get_artifact_fullname(element, key_a)
ref_b = self.get_artifact_fullname(element, key_b)
......@@ -153,6 +153,7 @@ class CASCache(ArtifactCache):
q = multiprocessing.Queue()
for remote_spec in remote_specs:
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
try:
......@@ -259,6 +260,25 @@ class CASCache(ArtifactCache):
return False
def pull_tree(self, project, digest):
""" Pull a single Tree rather than an artifact.
Does not update local refs. """
for remote in self._remotes[project]:
try:
remote.init()
digest = self._fetch_tree(remote, digest)
# no need to pull from additional remotes
return digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise
return None
def link_key(self, element, oldkey, newkey):
oldref = self.get_artifact_fullname(element, oldkey)
newref = self.get_artifact_fullname(element, newkey)
......@@ -267,8 +287,46 @@ class CASCache(ArtifactCache):
self.set_ref(newref, tree)
def _push_refs_to_remote(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
# there is no need to push the artifact
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
self._send_directory(remote, tree)
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
def push(self, element, keys):
refs = [self.get_artifact_fullname(element, key) for key in keys]
refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
project = element._get_project()
......@@ -278,95 +336,77 @@ class CASCache(ArtifactCache):
for remote in push_remotes:
remote.init()
skipped_remote = True
element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
# there is no need to push the artifact
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
missing_blobs = {}
required_blobs = self._required_blobs(tree)
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = remote.cas.FindMissingBlobs(request)
for digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
skipped_remote = False
for digest in missing_blobs.values():
uuid_ = uuid.uuid4()
resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
digest.hash, str(digest.size_bytes)])
def request_stream(resname):
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, 64 * 1024)
remaining -= chunk_size
request = bytestream_pb2.WriteRequest()
request.write_offset = offset
# max. 64 kB chunks
request.data = f.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
response = remote.bytestream.Write(request_stream(resource_name))
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
remote.ref_storage.UpdateReference(request)
pushed = True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
if skipped_remote:
if self._push_refs_to_remote(refs, remote):
pushed = True
else:
self.context.message(Message(
None,
MessageType.SKIPPED,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
return pushed
def push_directory(self, project, directory):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
if directory.ref is None:
return None
for remote in push_remotes:
remote.init()
self._send_directory(remote, directory.ref)
return directory.ref
def push_message(self, project, message):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
message_buffer = message.SerializeToString()
message_sha = hashlib.sha256(message_buffer)
message_digest = remote_execution_pb2.Digest()
message_digest.hash = message_sha.hexdigest()
message_digest.size_bytes = len(message_buffer)
for remote in push_remotes:
remote.init()
with io.BytesIO(message_buffer) as b:
self._send_blob(remote, message_digest, b)
return message_digest
def _verify_digest_on_remote(self, remote, digest):
# Check whether ref is already on the server in which case
# there is no need to push the artifact
request = remote_execution_pb2.FindMissingBlobsRequest()
request.blob_digests.extend([digest])
response = remote.cas.FindMissingBlobs(request)
if digest in response.missing_blob_digests:
return False
return True
def verify_digest_pushed(self, project, digest):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
pushed = False
for remote in push_remotes:
remote.init()
if self._verify_digest_on_remote(remote, digest):
pushed = True
return pushed
################################################
......@@ -488,11 +528,7 @@ class CASCache(ArtifactCache):
raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
def calculate_cache_size(self):
if self.cache_size is None:
self.cache_size = utils._get_dir_size(self.casdir)
self.estimated_size = self.cache_size
return self.cache_size
return utils._get_dir_size(self.casdir)
# list_artifacts():
#
......@@ -599,6 +635,7 @@ class CASCache(ArtifactCache):
################################################
# Local Private Methods #
################################################
def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)
......@@ -761,16 +798,16 @@ class CASCache(ArtifactCache):
#
q.put(str(e))
def _required_blobs(self, tree):
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = tree.hash
d.size_bytes = tree.size_bytes
d.hash = directory_digest.hash
d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
with open(self.objpath(tree), 'rb') as f:
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
......@@ -782,16 +819,16 @@ class CASCache(ArtifactCache):
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, out):
def _fetch_blob(self, remote, digest, stream):
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.bytestream.Read(request):
out.write(response.data)
stream.write(response.data)
stream.flush()
out.flush()
assert digest.size_bytes == os.fstat(out.fileno()).st_size
assert digest.size_bytes == os.fstat(stream.fileno()).st_size
def _fetch_directory(self, remote, tree):
objpath = self.objpath(tree)
......@@ -827,6 +864,92 @@ class CASCache(ArtifactCache):
digest = self.add_object(path=out.name)
assert digest.hash == tree.hash
def _fetch_tree(self, remote, digest):
# download but do not store the Tree object
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
self._fetch_blob(remote, digest, out)
tree = remote_execution_pb2.Tree()
with open(out.name, 'rb') as f:
tree.ParseFromString(f.read())
tree.children.extend([tree.root])
for directory in tree.children:
for filenode in directory.files:
fileobjpath = self.objpath(filenode.digest)
if os.path.exists(fileobjpath):
# already in local cache
continue
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, filenode.digest, f)
added_digest = self.add_object(path=f.name)
assert added_digest.hash == filenode.digest.hash
# place directory blob only in final location when we've downloaded
# all referenced blobs to avoid dangling references in the repository
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
digest.hash, str(digest.size_bytes)])
def request_stream(resname, instream):
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, 64 * 1024)
remaining -= chunk_size
request = bytestream_pb2.WriteRequest()
request.write_offset = offset
# max. 64 kB chunks
request.data = instream.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
response = remote.bytestream.Write(request_stream(resource_name, stream))
assert response.committed_size == digest.size_bytes
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
required_blobs = self._required_blobs(digest)
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
for blob_digest in missing_blobs.values():
with open(self.objpath(blob_digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
self._send_blob(remote, blob_digest, f, u_uid=u_uid)
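
_send_directory() caps each FindMissingBlobs request at 512 digests by batching them with a _grouper() helper. The helper itself is outside this diff; a typical itertools-based implementation might look like the following (an assumption, not necessarily the one used in the module):

import itertools

def _grouper(iterable, n):
    # Yield successive lists of at most n items from an arbitrary iterable.
    it = iter(iterable)
    while True:
        group = list(itertools.islice(it, n))
        if not group:
            return
        yield group

# e.g. list(_grouper(range(5), 2)) == [[0, 1], [2, 3], [4]]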
# Represents a single remote CAS cache.
#
......
......@@ -128,6 +128,7 @@ class Project():
self._shell_host_files = [] # A list of HostMount objects
self.artifact_cache_specs = None
self.remote_execution_url = None
self._sandbox = None
self._splits = None
......@@ -471,7 +472,7 @@ class Project():
'aliases', 'name',
'artifacts', 'options',
'fail-on-overlap', 'shell', 'fatal-warnings',
'ref-storage', 'sandbox', 'mirrors'
'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
])
#
......@@ -482,6 +483,11 @@ class Project():
# Load artifacts pull/push configuration for this project
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory)
# Load remote-execution configuration for this project
remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
_yaml.node_validate(remote_execution, ['url'])
self.remote_execution_url = _yaml.node_get(remote_execution, str, 'url')
# Load sandbox environment variables
self.base_environment = _yaml.node_get(config, Mapping, 'environment')
self.base_env_nocache = _yaml.node_get(config, list, 'environment-nocache')
......
......@@ -24,15 +24,19 @@ class CacheSizeJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
self._cache = Platform._instance.artifactcache
platform = Platform.get_platform()
self._artifacts = platform.artifactcache
def child_process(self):
return self._cache.calculate_cache_size()
return self._artifacts.compute_cache_size()
def parent_complete(self, success, result):
self._cache._set_cache_size(result)
if self._complete_cb:
self._complete_cb(result)
if success:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb(result)
def child_process_data(self):
return {}
......@@ -24,15 +24,19 @@ class CleanupJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
self._cache = Platform._instance.artifactcache
platform = Platform.get_platform()
self._artifacts = platform.artifactcache
def child_process(self):
return self._cache.clean()
return self._artifacts.clean()
def parent_complete(self, success, result):
self._cache._set_cache_size(result)
if self._complete_cb:
self._complete_cb()
if success:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb()
def child_process_data(self):
return {}
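
Both CacheSizeJob and CleanupJob follow the same contract: child_process() runs in a forked child, and its return value is handed to parent_complete(success, result) in the main process, which is the only place the shared cache size estimate is updated. A stripped-down, hypothetical sketch of that pattern using multiprocessing directly (the real Job base class additionally handles signalling, logging and retries):

import multiprocessing

def child_process():
    # Runs in the child: do the expensive work and return the result.
    return 123456                        # e.g. a freshly computed cache size

def _child_entry(queue):
    queue.put(child_process())

def parent_complete(success, result):
    if success:
        print("updating cache size estimate to", result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=_child_entry, args=(queue,))
    proc.start()
    result = queue.get()                 # result crosses the process boundary via the queue
    proc.join()
    parent_complete(proc.exitcode == 0, result)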
......@@ -109,13 +109,7 @@ class ElementJob(Job):
data = {}
workspace = self._element._get_workspace()
artifact_size = self._element._get_artifact_size()
cache_size = self._element._get_artifact_cache().calculate_cache_size()
if workspace is not None:
data['workspace'] = workspace.to_dict()
if artifact_size is not None:
data['artifact_size'] = artifact_size
data['cache_size'] = cache_size
return data
......@@ -109,7 +109,7 @@ class Job():
# Private members
#
self._scheduler = scheduler # The scheduler
self._queue = multiprocessing.Queue() # A message passing queue
self._queue = None # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
self._listening = False # Whether the parent is currently listening
......@@ -130,6 +130,8 @@ class Job():
#
def spawn(self):
self._queue = multiprocessing.Queue()
self._tries += 1
self._parent_start_listening()
......@@ -552,6 +554,9 @@ class Job():
self.parent_complete(returncode == RC_OK, self._result)
self._scheduler.job_completed(self, returncode == RC_OK)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
# _parent_process_envelope()
#
# Processes a message Envelope deserialized from the message queue.
......
......@@ -24,6 +24,7 @@ from . import Queue, QueueStatus
from ..jobs import ElementJob
from ..resources import ResourceType
from ..._message import MessageType
from ..._platform import Platform
# A queue which assembles elements
......@@ -32,7 +33,7 @@ class BuildQueue(Queue):
action_name = "Build"
complete_name = "Built"
resources = [ResourceType.PROCESS]
resources = [ResourceType.PROCESS, ResourceType.CACHE]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -67,8 +68,7 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
def process(self, element):
element._assemble()
return element._get_unique_id()
return element._assemble()
def status(self, element):
# state of dependencies may have changed, recalculate element state
......@@ -87,18 +87,22 @@ class BuildQueue(Queue):
return QueueStatus.READY
def _check_cache_size(self, job, element):
if not job.child_data:
return
def _check_cache_size(self, job, element, artifact_size):
artifact_size = job.child_data.get('artifact_size', False)
# After completing a build job, add the artifact size
# as returned from Element._assemble() to the estimated
# artifact cache size
#
platform = Platform.get_platform()
artifacts = platform.artifactcache
if artifact_size:
cache = element._get_artifact_cache()
cache._add_artifact_size(artifact_size)
artifacts.add_artifact_size(artifact_size)
if cache.get_approximate_cache_size() > cache.cache_quota:
self._scheduler._check_cache_size_real()
# If the estimated size outgrows the quota, ask the scheduler
# to queue a job to actually check the real cache size.
#
if artifacts.get_quota_exceeded():
self._scheduler.check_cache_size()
def done(self, job, element, result, success):
......@@ -106,8 +110,8 @@ class BuildQueue(Queue):
# Inform element in main process that assembly is done
element._assemble_done()
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element)
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element, result)
return True
......@@ -29,7 +29,7 @@ class PullQueue(Queue):
action_name = "Pull"
complete_name = "Pulled"
resources = [ResourceType.DOWNLOAD]
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
def process(self, element):
# returns whether an artifact was downloaded or not
......@@ -62,7 +62,7 @@ class PullQueue(Queue):
# Build jobs will check the "approximate" size first. Since we
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler._check_cache_size_real()
self._scheduler.check_cache_size()
# Element._pull() returns True if it downloaded an artifact,
# here we want to appear skipped if we did not download.
......
......@@ -301,8 +301,6 @@ class Queue():
# Update values that need to be synchronized in the main task
# before calling any queue implementation
self._update_workspaces(element, job)
if job.child_data:
element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
......
......@@ -8,7 +8,7 @@ class ResourceType():
class Resources():
def __init__(self, num_builders, num_fetchers, num_pushers):
self._max_resources = {
ResourceType.CACHE: 1,
ResourceType.CACHE: 0,
ResourceType.DOWNLOAD: num_fetchers,
ResourceType.PROCESS: num_builders,
ResourceType.UPLOAD: num_pushers
......
......@@ -241,6 +241,25 @@ class Scheduler():
self._schedule_queue_jobs()
self._sched()
# check_cache_size():
#
# Queues a cache size calculation job, after the cache
# size is calculated, a cleanup job will be run automatically
# if needed.
#
# FIXME: This should ensure that only one cache size job
# is ever pending at a given time. If a cache size
# job is already running, it is correct to queue
# a new one, it is incorrect to have more than one
# of these jobs pending at a given time, though.
#
def check_cache_size(self):
job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
complete_cb=self._run_cleanup)
self.schedule_jobs([job])
#######################################################
# Local Private Methods #
#######################################################
......@@ -316,26 +335,32 @@ class Scheduler():
self.schedule_jobs(ready)
self._sched()
# _run_cleanup()
#
# Schedules the cache cleanup job if the passed size
# exceeds the cache quota.
#
# Args:
# cache_size (int): The calculated cache size (ignored)
#
# NOTE: This runs in response to completion of the cache size
# calculation job launched by Scheduler.check_cache_size(),
# which will report the calculated cache size.
#
def _run_cleanup(self, cache_size):
platform = Platform.get_platform()
if cache_size and cache_size < platform.artifactcache.cache_quota:
artifacts = platform.artifactcache
if not artifacts.get_quota_exceeded():
return
job = CleanupJob(self, 'cleanup', 'cleanup',
job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
complete_cb=None)
self.schedule_jobs([job])
def _check_cache_size_real(self):
job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
complete_cb=self._run_cleanup)
self.schedule_jobs([job])
# _suspend_jobs()
#
# Suspend all ongoing jobs.
......
......@@ -155,6 +155,9 @@ class BuildElement(Element):
command_dir = build_root
sandbox.set_work_directory(command_dir)
# Tell sandbox which directory is preserved in the finished artifact
sandbox.set_output_directory(install_root)
# Setup environment
sandbox.set_environment(self.get_environment())
......
......@@ -204,3 +204,6 @@ shell:
# Command to run when `bst shell` does not provide a command
#
command: [ 'sh', '-i' ]
remote-execution:
url: ""
\ No newline at end of file
......@@ -95,6 +95,7 @@ from . import _site
from ._platform import Platform
from .plugin import CoreWarnings
from .sandbox._config import SandboxConfig
from .sandbox._sandboxremote import SandboxRemote
from .storage.directory import Directory
from .storage._filebaseddirectory import FileBasedDirectory
......@@ -212,7 +213,6 @@ class Element(Plugin):
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
self.__tainted = None # Whether the artifact is tainted and should not be shared
self.__required = False # Whether the artifact is required in the current session
self.__artifact_size = None # The size of data committed to the artifact cache
self.__build_result = None # The result of assembling this Element
self._build_log_path = None # The path of the build log for this Element
......@@ -250,6 +250,12 @@ class Element(Plugin):
# Extract Sandbox config
self.__sandbox_config = self.__extract_sandbox_config(meta)
# Extract remote execution URL
if not self.__is_junction:
self.__remote_execution_url = project.remote_execution_url
else:
self.__remote_execution_url = None
def __lt__(self, other):
return self.name < other.name
......@@ -1502,6 +1508,9 @@ class Element(Plugin):
# - Call the public abstract methods for the build phase
# - Cache the resulting artifact
#
# Returns:
# (int): The size of the newly cached artifact
#
def _assemble(self):
# Assert call ordering
......@@ -1570,6 +1579,8 @@ class Element(Plugin):
finally:
if collect is not None:
try:
# Sandbox will probably have replaced its virtual directory, so get it again
sandbox_vroot = sandbox.get_virtual_directory()
collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
except VirtualDirectoryError:
# No collect directory existed
......@@ -1646,7 +1657,7 @@ class Element(Plugin):
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
with self.timed_activity("Caching artifact"):
self.__artifact_size = utils._get_dir_size(assembledir)
artifact_size = utils._get_dir_size(assembledir)
self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
if collect is not None and collectvdir is None:
......@@ -1658,6 +1669,8 @@ class Element(Plugin):
# Finally cleanup the build dir
cleanup_rootdir()
return artifact_size
def _get_build_log(self):
return self._build_log_path
......@@ -1899,25 +1912,6 @@ class Element(Plugin):
workspaces = self._get_context().get_workspaces()
return workspaces.get_workspace(self._get_full_name())
# _get_artifact_size()
#
# Get the size of the artifact produced by this element in the
# current pipeline - if this element has not been assembled or
# pulled, this will be None.
#
# Note that this is the size of an artifact *before* committing it
# to the cache, the size on disk may differ. It can act as an
# approximate guide for when to do a proper size calculation.
#
# Returns:
# (int|None): The size of the artifact
#
def _get_artifact_size(self):
return self.__artifact_size
def _get_artifact_cache(self):
return self.__artifacts
# _write_script():
#
# Writes a script to the given directory.
......@@ -2146,7 +2140,32 @@ class Element(Plugin):
project = self._get_project()
platform = Platform.get_platform()
if directory is not None and os.path.exists(directory):
if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
if not self.__artifacts.has_push_remotes(element=self):
# Give an early warning if remote execution will not work
raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
.format(self.name) +
"The remote artifact server(s) may not be correctly configured or contactable.")
self.info("Using a remote sandbox for artifact {}".format(self.name))
sandbox = SandboxRemote(context, project,
directory,
stdout=stdout,
stderr=stderr,
config=config,
server_url=self.__remote_execution_url,
allow_real_directory=False)
yield sandbox
elif directory is not None and os.path.exists(directory):
if self.__remote_execution_url:
self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
.format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
.format(kind=self.get_kind()), warning_token="remote-failure")
self.info("Falling back to local sandbox for artifact {}".format(self.name))
sandbox = platform.create_sandbox(context, project,
directory,
stdout=stdout,
......
......@@ -57,7 +57,8 @@ from buildstream import BuildElement
# Element implementation for the 'autotools' kind.
class AutotoolsElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -50,6 +50,40 @@ variables:
#
# notparallel: True
# Automatically remove libtool archive files
#
# Set remove-libtool-modules to "true" to remove .la files for
# modules intended to be opened with lt_dlopen()
#
# Set remove-libtool-libraries to "true" to remove .la files for
# libraries
#
# Value must be "true" or "false"
remove-libtool-modules: "false"
remove-libtool-libraries: "false"
delete-libtool-archives: |
if %{remove-libtool-modules} || %{remove-libtool-libraries}; then
find "%{install-root}" -name "*.la" -print0 | while read -d '' -r file; do
if grep '^shouldnotlink=yes$' "${file}" &>/dev/null; then
if %{remove-libtool-modules}; then
echo "Removing ${file}."
rm "${file}"
else
echo "Not removing ${file}."
fi
else
if %{remove-libtool-libraries}; then
echo "Removing ${file}."
rm "${file}"
else
echo "Not removing ${file}."
fi
fi
done
fi
config:
# Commands for configuring the software
......@@ -72,6 +106,8 @@ config:
install-commands:
- |
%{make-install}
- |
%{delete-libtool-archives}
# Commands for stripping debugging information out of
# installed binaries
......
......@@ -56,7 +56,8 @@ from buildstream import BuildElement
# Element implementation for the 'cmake' kind.
class CMakeElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -38,7 +38,8 @@ from buildstream import BuildElement
# Element implementation for the 'make' kind.
class MakeElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......
......@@ -53,7 +53,8 @@ from buildstream import BuildElement
# Element implementation for the 'meson' kind.
class MesonElement(BuildElement):
pass
# Supports virtual directories (required for remote execution)
BST_VIRTUAL_DIRECTORY = True
# Plugin entry point
......