Commit 6152a966 authored by bst-marge-bot

Merge branch 'juerg/partial-cas' into 'master'

Do not fetch file blobs for build-only dependencies with remote execution

See merge request !1278
parents 4c391dd6 78ed45d0
Pipeline #56485227 failed in 40 minutes and 57 seconds
......@@ -469,7 +469,7 @@ class Artifact():
# Determine whether directories are required
require_directories = context.require_artifact_directories
# Determine whether file contents are required as well
require_files = context.require_artifact_files
require_files = context.require_artifact_files or self._element._artifact_files_required()
filesdigest = vdir._get_child_digest('files')
......
......@@ -436,3 +436,30 @@ class ArtifactCache(BaseCache):
if missing_blobs:
raise ArtifactError("Blobs not found on configured artifact servers")
# find_missing_blobs():
#
# Find missing blobs from configured push remote repositories.
#
# Args:
# project (Project): The current project
# missing_blobs (list): The Digests of the blobs to check
#
# Returns:
# (list): The Digests of the blobs missing on at least one push remote
#
def find_missing_blobs(self, project, missing_blobs):
if not missing_blobs:
return []
push_remotes = [r for r in self._remotes[project] if r.spec.push]
remote_missing_blobs_set = set()
for remote in push_remotes:
remote.init()
remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs)
remote_missing_blobs_set.update(remote_missing_blobs)
return list(remote_missing_blobs_set)
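A rough standalone sketch (not part of this merge request, using hypothetical objects) of the semantics documented above: the result is the union of the per-remote missing sets, so a blob is reported as long as at least one push remote lacks it.

def missing_on_any_remote(push_remotes, blobs):
    # `push_remotes` and their missing() method are stand-ins for the
    # CASRemote / remote_missing_blobs() machinery used above.
    missing = set()
    for remote in push_remotes:
        missing.update(remote.missing(blobs))
    return list(missing)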
......@@ -268,15 +268,13 @@ class CASCache():
request.key = ref
response = remote.ref_storage.GetReference(request)
tree = remote_execution_pb2.Digest()
tree.hash = response.digest.hash
tree.size_bytes = response.digest.size_bytes
tree = response.digest
# Fetch Directory objects
self._fetch_directory(remote, tree)
# Fetch files, excluded_subdirs determined in pullqueue
required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs)
required_blobs = self.required_blobs_for_directory(tree, excluded_subdirs=excluded_subdirs)
missing_blobs = self.local_missing_blobs(required_blobs)
if missing_blobs:
self.fetch_blobs(remote, missing_blobs)
......@@ -368,8 +366,7 @@ class CASCache():
request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
request.digest.CopyFrom(tree)
remote.ref_storage.UpdateReference(request)
skipped_remote = False
......@@ -647,23 +644,33 @@ class CASCache():
# Returns: List of missing Digest objects
#
def remote_missing_blobs_for_directory(self, remote, digest):
required_blobs = self._required_blobs(digest)
required_blobs = self.required_blobs_for_directory(digest)
return self.remote_missing_blobs(remote, required_blobs)
# remote_missing_blobs():
#
# Determine which blobs are missing on the remote.
#
# Args:
# remote (CASRemote): The remote repository to check
# blobs (list): The Digests of the blobs to check
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
for required_blobs_group in _grouper(blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
d.CopyFrom(required_digest)
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
d.CopyFrom(missing_digest)
missing_blobs[d.hash] = d
return missing_blobs.values()
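The 512-blob batching above relies on a _grouper() helper defined elsewhere in this file; a minimal equivalent, shown only for context and not taken from this merge request, could look like this:

from itertools import islice

def _grouper(iterable, size):
    # Yield successive lists of at most `size` items from `iterable`.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk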
......@@ -685,6 +692,31 @@ class CASCache():
missing_blobs.append(digest)
return missing_blobs
# required_blobs_for_directory():
#
# Generator that returns the Digests of all blobs in the tree specified by
# the Digest of the toplevel Directory object.
#
def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None):
if not excluded_subdirs:
excluded_subdirs = []
# parse directory, and recursively add blobs
yield directory_digest
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
yield filenode.digest
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
yield from self.required_blobs_for_directory(dirnode.digest)
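A short usage sketch (hypothetical caller, names as in the surrounding diff): because this is a generator that yields the toplevel Directory digest first and then every reachable file and subdirectory digest, its output can be fed straight into the missing-blob helpers.

required_blobs = cascache.required_blobs_for_directory(dir_digest,
                                                       excluded_subdirs=['buildtree'])
missing_blobs = cascache.local_missing_blobs(required_blobs)
if missing_blobs:
    cascache.fetch_blobs(remote, missing_blobs)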
################################################
# Local Private Methods #
################################################
......@@ -881,31 +913,6 @@ class CASCache():
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
def _required_blobs(self, directory_digest, *, excluded_subdirs=None):
if not excluded_subdirs:
excluded_subdirs = []
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = directory_digest.hash
d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
d = remote_execution_pb2.Digest()
d.hash = filenode.digest.hash
d.size_bytes = filenode.digest.size_bytes
yield d
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
yield from self._required_blobs(dirnode.digest)
# _temporary_object():
#
# Returns:
......@@ -1042,11 +1049,6 @@ class CASCache():
tree.children.extend([tree.root])
for directory in tree.children:
for filenode in directory.files:
self._ensure_blob(remote, filenode.digest)
# place directory blob only in final location when we've downloaded
# all referenced blobs to avoid dangling references in the repository
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
......
......@@ -247,6 +247,13 @@ class Stream():
# Assert that the elements we're not going to track are consistent
self._pipeline.assert_consistent(elements)
if all(project.remote_execution_specs for project in self._context.get_projects()):
# Remote execution is configured for all projects.
# Require artifact files only for target elements and their runtime dependencies.
self._context.set_artifact_files_optional()
for element in self.targets:
element._set_artifact_files_required()
# Now construct the queues
#
track_queue = None
......
......@@ -227,6 +227,7 @@ class Element(Plugin):
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
self.__tainted = None # Whether the artifact is tainted and should not be shared
self.__required = False # Whether the artifact is required in the current session
self.__artifact_files_required = False # Whether artifact files are required in the local cache
self.__build_result = None # The result of assembling this Element (success, description, detail)
self._build_log_path = None # The path of the build log for this Element
self.__artifact = Artifact(self, context) # Artifact class for direct artifact composite interaction
......@@ -1556,6 +1557,29 @@ class Element(Plugin):
def _is_required(self):
return self.__required
# _set_artifact_files_required():
#
# Mark artifact files for this element and its runtime dependencies as
# required in the local cache.
#
def _set_artifact_files_required(self):
if self.__artifact_files_required:
# Already done
return
self.__artifact_files_required = True
# Request artifact files of runtime dependencies
for dep in self.dependencies(Scope.RUN, recurse=False):
dep._set_artifact_files_required()
# _artifact_files_required():
#
# Returns whether artifact files for this element have been marked as required.
#
def _artifact_files_required(self):
return self.__artifact_files_required
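The propagation above is a memoized walk over runtime dependencies; a self-contained illustration of the pattern (hypothetical Node class, not BuildStream code):

class Node:
    def __init__(self, runtime_deps=()):
        self.runtime_deps = list(runtime_deps)
        self.files_required = False

    def mark_files_required(self):
        if self.files_required:
            return  # already marked, no need to re-walk this subtree
        self.files_required = True
        for dep in self.runtime_deps:
            dep.mark_files_required()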
# _schedule_assemble():
#
# This is called in the main process before the element is assembled
......@@ -1661,6 +1685,15 @@ class Element(Plugin):
with _signals.terminator(cleanup_rootdir), \
self.__sandbox(rootdir, output_file, output_file, self.__sandbox_config) as sandbox: # noqa
# Let the sandbox know whether the buildtree will be required.
# This allows the remote execution sandbox to skip buildtree
# download when it's not needed.
buildroot = self.get_variable('build-root')
cache_buildtrees = context.cache_buildtrees
if cache_buildtrees != 'never':
always_cache_buildtrees = cache_buildtrees == 'always'
sandbox._set_build_directory(buildroot, always=always_cache_buildtrees)
if not self.BST_RUN_COMMANDS:
# Element doesn't need to run any commands in the sandbox.
#
......@@ -2348,7 +2381,7 @@ class Element(Plugin):
# supports it.
#
def __use_remote_execution(self):
return self.__remote_execution_specs and self.BST_VIRTUAL_DIRECTORY
return bool(self.__remote_execution_specs)
# __sandbox():
#
......@@ -2376,8 +2409,15 @@ class Element(Plugin):
if directory is not None and allow_remote and self.__use_remote_execution():
if not self.BST_VIRTUAL_DIRECTORY:
raise ElementError("Element {} is configured to use remote execution but plugin does not support it."
.format(self.name), detail="Plugin '{kind}' does not support virtual directories."
.format(kind=self.get_kind()))
self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory))
output_files_required = context.require_artifact_files or self._artifact_files_required()
sandbox = SandboxRemote(context, project,
directory,
plugin=self,
......@@ -2386,16 +2426,11 @@ class Element(Plugin):
config=config,
specs=self.__remote_execution_specs,
bare_directory=bare_directory,
allow_real_directory=False)
allow_real_directory=False,
output_files_required=output_files_required)
yield sandbox
elif directory is not None and os.path.exists(directory):
if allow_remote and self.__remote_execution_specs:
self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
.format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
.format(kind=self.get_kind()), warning_token="remote-failure")
self.info("Falling back to local sandbox for artifact {}".format(self.name))
sandbox = platform.create_sandbox(context, project,
directory,
......@@ -2960,6 +2995,11 @@ class Element(Plugin):
subdir = "buildtree"
excluded_subdirs.remove(subdir)
# If file contents are not required for this element, don't pull them.
# The directories themselves will always be pulled.
if not context.require_artifact_files and not self._artifact_files_required():
excluded_subdirs.append("files")
return (subdir, excluded_subdirs)
# __cache_sources():
......
......@@ -29,6 +29,7 @@ import grpc
from .. import utils
from .._message import Message, MessageType
from .sandbox import Sandbox, SandboxCommandError, _SandboxBatch
from ..storage.directory import VirtualDirectoryError
from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
......@@ -53,6 +54,8 @@ class SandboxRemote(Sandbox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._output_files_required = kwargs.get('output_files_required', True)
config = kwargs['specs'] # This should be a RemoteExecutionSpec
if config is None:
return
......@@ -251,7 +254,7 @@ class SandboxRemote(Sandbox):
raise SandboxError("Failed trying to send CancelOperation request: "
"{} ({})".format(e.details(), e.code().name))
def process_job_output(self, output_directories, output_files):
def process_job_output(self, output_directories, output_files, *, failure):
# Reads the remote execution server response to an execution request.
#
# output_directories is an array of OutputDirectory objects.
......@@ -274,10 +277,12 @@ class SandboxRemote(Sandbox):
raise SandboxError("Output directory structure had no digest attached.")
context = self._get_context()
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
casremote = CASRemote(self.storage_remote_spec)
# Now do a pull to ensure we have the necessary parts.
# Now do a pull to ensure we have the full directory structure.
dir_digest = cascache.pull_tree(casremote, tree_digest)
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
raise SandboxError("Output directory structure pulling from remote failed.")
......@@ -289,6 +294,42 @@ class SandboxRemote(Sandbox):
new_dir = CasBasedDirectory(context.artifactcache.cas, digest=dir_digest)
self._set_virtual_directory(new_dir)
# Fetch the file blobs if needed
if self._output_files_required or artifactcache.has_push_remotes():
required_blobs = []
directories = []
directories.append(self._output_directory)
if self._build_directory and (self._build_directory_always or failure):
directories.append(self._build_directory)
for directory in directories:
try:
vdir = new_dir.descend(*directory.strip(os.sep).split(os.sep))
dir_digest = vdir._get_digest()
required_blobs += cascache.required_blobs_for_directory(dir_digest)
except VirtualDirectoryError:
# If the directory does not exist, there is no need to
# download file blobs.
pass
local_missing_blobs = cascache.local_missing_blobs(required_blobs)
if local_missing_blobs:
if self._output_files_required:
# Fetch all blobs from Remote Execution CAS server
blobs_to_fetch = local_missing_blobs
else:
# Output files are not required in the local cache,
# however, artifact push remotes will need them.
# Only fetch blobs that are missing on one or multiple
# artifact servers.
blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
if remote_missing_blobs:
raise SandboxError("{} output files are missing on the CAS server"
.format(len(remote_missing_blobs)))
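Distilled into a helper for clarity (a sketch, not code from this merge request): locally missing output blobs are fetched either because the local cache needs the file contents, or, when it does not, only the subset that the configured artifact push remotes are also missing.

def select_blobs_to_fetch(local_missing, files_required, push_remote_missing):
    # push_remote_missing stands in for
    # artifactcache.find_missing_blobs(project, local_missing).
    if files_required:
        return local_missing
    return push_remote_missing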
def _run(self, command, flags, *, cwd, env):
stdout, stderr = self._get_output()
......@@ -377,7 +418,8 @@ class SandboxRemote(Sandbox):
action_result = self._extract_action_result(operation)
# Get output of build
self.process_job_output(action_result.output_directories, action_result.output_files)
self.process_job_output(action_result.output_directories, action_result.output_files,
failure=action_result.exit_code != 0)
if stdout:
if action_result.stdout_raw:
......
......@@ -147,6 +147,8 @@ class Sandbox():
os.makedirs(directory_, exist_ok=True)
self._output_directory = None
self._build_directory = None
self._build_directory_always = None
self._vdir = None
self._usebuildtree = False
......@@ -592,6 +594,20 @@ class Sandbox():
def _disable_run(self):
self.__allow_run = False
# _set_build_directory()
#
# Sets the build directory - the directory which may be preserved as
# buildtree in the artifact.
#
# Args:
# directory (str): An absolute path within the sandbox
# always (bool): True if the build directory should always be downloaded,
# False if it should be downloaded only on failure
#
def _set_build_directory(self, directory, *, always):
self._build_directory = directory
self._build_directory_always = always
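For context, a condensed sketch (not an actual call site) of how these two attributes combine with the command result in SandboxRemote.process_job_output() above to decide whether the buildtree is downloaded:

def want_build_directory(build_directory, always, failure):
    # Download when a build directory was set and it should always be
    # cached, or when the command failed.
    return bool(build_directory) and (always or failure)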
# _SandboxBatch()
#
......
import os
import pytest
from buildstream._exceptions import ErrorDomain
from buildstream.plugintestutils import cli_remote_execution as cli
from buildstream.plugintestutils.integration import assert_contains
pytestmark = pytest.mark.remoteexecution
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"project"
)
# Test that `bst build` does not download file blobs of a build-only dependency
# to the local cache.
@pytest.mark.datafiles(DATA_DIR)
def test_build_dependency_partial_local_cas(cli, datafiles):
project = str(datafiles)
element_name = 'no-runtime-deps.bst'
builddep_element_name = 'autotools/amhello.bst'
checkout = os.path.join(cli.directory, 'checkout')
builddep_checkout = os.path.join(cli.directory, 'builddep-checkout')
services = cli.ensure_services()
assert set(services) == set(['action-cache', 'execution', 'storage'])
result = cli.run(project=project, args=['build', element_name])
result.assert_success()
# Verify that the target element is available in local cache
result = cli.run(project=project, args=['artifact', 'checkout', element_name,
'--directory', checkout])
result.assert_success()
assert_contains(checkout, ['/test'])
# Verify that the build-only dependency is not completely available in the local cache
result = cli.run(project=project, args=['artifact', 'checkout', builddep_element_name,
'--directory', builddep_checkout])
result.assert_main_error(ErrorDomain.STREAM, 'uncached-checkout-attempt')
kind: manual
description: Test element without runtime dependencies
build-depends:
- autotools/amhello.bst
config:
install-commands:
- echo Test > %{install-root}/test