Commit 49c22f20 authored by bst-marge-bot

Merge branch 'juerg/partial-cas-remote' into 'master'

Initial partial CAS support for remote execution

See merge request !1232
parents bcf02294 60290223
Pipeline #53938494 failed in 391 minutes and 16 seconds
@@ -359,30 +359,6 @@ class ArtifactCache(BaseCache):
return None
# push_directory():
#
# Push the given virtual directory to all remotes.
#
# Args:
# project (Project): The current project
# directory (Directory): A virtual directory object to push.
#
# Raises:
# (ArtifactError): if there was an error
#
def push_directory(self, project, directory):
if self._has_push_remotes:
push_remotes = [r for r in self._remotes[project] if r.spec.push]
else:
push_remotes = []
if not push_remotes:
raise ArtifactError("push_directory was called, but no remote artifact " +
"servers are configured as push remotes.")
for remote in push_remotes:
self.cas.push_directory(remote, directory)
# push_message():
#
# Push the given protobuf message to all remotes.
@@ -439,3 +415,24 @@ class ArtifactCache(BaseCache):
cache_id = self.cas.resolve_ref(ref, update_mtime=True)
vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs')
return vdir
# fetch_missing_blobs():
#
# Fetch missing blobs from configured remote repositories.
#
# Args:
# project (Project): The current project
# missing_blobs (list): The Digests of the blobs to fetch
#
def fetch_missing_blobs(self, project, missing_blobs):
for remote in self._remotes[project]:
if not missing_blobs:
break
remote.init()
# fetch_blobs() will return the blobs that are still missing
missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
if missing_blobs:
raise ArtifactError("Blobs not found on configured artifact servers")
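The loop above narrows the missing set as it walks the configured remotes, because fetch_blobs() hands back whatever it could not download. A minimal standalone sketch of that fall-through pattern (the wrapper function and its arguments are placeholders, not part of this change):

def fetch_from_remotes(remotes, missing_blobs, fetch_blobs):
    # Try each remote in turn; fetch_blobs() returns the digests it could
    # not provide, so the set of missing blobs shrinks as remotes are tried.
    for remote in remotes:
        if not missing_blobs:
            break
        remote.init()
        missing_blobs = fetch_blobs(remote, missing_blobs)
    return missing_blobs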
@@ -272,8 +272,14 @@ class CASCache():
tree.hash = response.digest.hash
tree.size_bytes = response.digest.size_bytes
# Fetch artifact, excluded_subdirs determined in pullqueue
self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
# Fetch Directory objects
self._fetch_directory(remote, tree)
# Fetch files, excluded_subdirs determined in pullqueue
required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs)
missing_blobs = self.local_missing_blobs(required_blobs)
if missing_blobs:
self.fetch_blobs(remote, missing_blobs)
self.set_ref(ref, tree)
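Condensed into a helper for readability (the method names come from this hunk, the wrapper itself is hypothetical), the new pull path fetches directory metadata first and only then the file blobs that are not already present locally:

def partial_pull(cas, remote, tree, excluded_subdirs):
    # Fetch only the Directory protos describing the artifact tree.
    cas._fetch_directory(remote, tree)
    # Work out which file blobs the tree needs, skipping excluded subdirs.
    required_blobs = cas._required_blobs(tree, excluded_subdirs=excluded_subdirs)
    # Download only the blobs the local CAS does not already contain.
    missing_blobs = cas.local_missing_blobs(required_blobs)
    if missing_blobs:
        cas.fetch_blobs(remote, missing_blobs)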
@@ -373,23 +379,6 @@ class CASCache():
return not skipped_remote
# push_directory():
#
# Push the given virtual directory to a remote.
#
# Args:
# remote (CASRemote): The remote to push to
# directory (Directory): A virtual directory object to push.
#
# Raises:
# (CASCacheError): if there was an error
#
def push_directory(self, remote, directory):
remote.init()
digest = directory._get_digest()
self._send_directory(remote, digest)
# objpath():
#
# Return the path of an object based on its digest.
@@ -648,6 +637,54 @@ class CASCache():
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
# remote_missing_blobs_for_directory():
#
# Determine which blobs of a directory tree are missing on the remote.
#
# Args:
# digest (Digest): The directory digest
#
# Returns: List of missing Digest objects
#
def remote_missing_blobs_for_directory(self, remote, digest):
required_blobs = self._required_blobs(digest)
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
return missing_blobs.values()
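The 512-digest cap keeps each FindMissingBlobs request bounded. _grouper() is an existing module-level helper; a sketch of what such a chunking helper looks like (an assumption for illustration, not the actual implementation):

from itertools import islice

def _grouper(iterable, n):
    # Yield successive lists of at most n items drawn from the iterable.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk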
# local_missing_blobs():
#
# Check local cache for missing blobs.
#
# Args:
# digests (list): The Digests of blobs to check
#
# Returns: Missing Digest objects
#
def local_missing_blobs(self, digests):
missing_blobs = []
for digest in digests:
objpath = self.objpath(digest)
if not os.path.exists(objpath):
missing_blobs.append(digest)
return missing_blobs
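The presence check relies on objpath() mapping a digest to a file in the local store. Assuming the usual content-hash fan-out layout (an assumption for illustration, not necessarily the exact on-disk scheme), that mapping is roughly:

import os

def objpath_sketch(casdir, digest):
    # Hypothetical layout: objects live under
    # objects/<first two hex chars of the hash>/<remaining hash chars>.
    return os.path.join(casdir, 'objects', digest.hash[:2], digest.hash[2:])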
################################################
# Local Private Methods #
################################################
@@ -841,7 +878,10 @@ class CASCache():
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
def _required_blobs(self, directory_digest):
def _required_blobs(self, directory_digest, *, excluded_subdirs=None):
if not excluded_subdirs:
excluded_subdirs = []
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = directory_digest.hash
@@ -860,7 +900,8 @@ class CASCache():
yield d
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
if dirnode.name not in excluded_subdirs:
yield from self._required_blobs(dirnode.digest)
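With the new keyword argument a caller can prune whole subtrees from the blob walk, for example (the subdirectory name is illustrative only):

# Enumerate the blobs a tree needs while skipping one optional subtree.
required = list(cas._required_blobs(tree_digest, excluded_subdirs=['buildtree']))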
# _temporary_object():
#
@@ -900,8 +941,8 @@ class CASCache():
return objpath
def _batch_download_complete(self, batch):
for digest, data in batch.send():
def _batch_download_complete(self, batch, *, missing_blobs=None):
for digest, data in batch.send(missing_blobs=missing_blobs):
with self._temporary_object() as f:
f.write(data)
f.flush()
@@ -953,21 +994,19 @@ class CASCache():
#
# Fetches remote directory and adds it to content addressable store.
#
# Fetches files, symbolic links and recursively other directories in
# the remote directory and adds them to the content addressable
# store.
# This recursively fetches directory objects but doesn't fetch any
# files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
# excluded_subdirs (list): The optional list of subdirs to not fetch
#
def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
def _fetch_directory(self, remote, dir_digest):
# TODO Use GetTree() if the server supports it
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
if not excluded_subdirs:
excluded_subdirs = []
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
@@ -982,13 +1021,8 @@ class CASCache():
directory.ParseFromString(f.read())
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
for filenode in directory.files:
batch = self._fetch_directory_node(remote, filenode.digest, batch,
fetch_queue, fetch_next_queue)
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
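Because _fetch_directory() now downloads only Directory objects, each node can be parsed straight from the local CAS once its blob has landed. A standalone sketch of reading one node (the import path is assumed for illustration):

from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

def read_directory_node(objpath):
    # Parse a Directory proto already written into the local CAS and list
    # its child directories.
    directory = remote_execution_pb2.Directory()
    with open(objpath, 'rb') as f:
        directory.ParseFromString(f.read())
    return [(dirnode.name, dirnode.digest) for dirnode in directory.directories]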
@@ -1016,30 +1050,55 @@ class CASCache():
return dirdigest
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
required_blobs = self._required_blobs(digest)
# fetch_blobs():
#
# Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
#
# Args:
# remote (CASRemote): The remote repository to fetch from
# digests (list): The Digests of blobs to fetch
#
# Returns: The Digests of the blobs that were not available on the remote CAS
#
def fetch_blobs(self, remote, digests):
missing_blobs = []
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
batch = _CASBatchRead(remote)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
for digest in digests:
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_read_supported):
# Too large for batch request, download in independent request.
try:
self._ensure_blob(remote, digest)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
missing_blobs.append(digest)
else:
raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
else:
if not batch.add(digest):
# Not enough space left in batch request.
# Complete pending batch first.
self._batch_download_complete(batch, missing_blobs=missing_blobs)
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
batch = _CASBatchRead(remote)
batch.add(digest)
# Upload any blobs missing on the server
self._send_blobs(remote, missing_blobs.values(), u_uid)
# Complete last pending batch
self._batch_download_complete(batch, missing_blobs=missing_blobs)
return missing_blobs
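The split between batched and individual downloads hinges on the remote's batch limits; the check used above can be isolated as:

def can_batch_download(remote, digest):
    # A blob joins a BatchReadBlobs request only if the remote supports
    # batching and the blob is below the batch size limit; otherwise it is
    # fetched with an individual request.
    return remote.batch_read_supported and digest.size_bytes < remote.max_batch_total_size_bytes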
def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
# send_blobs():
#
# Upload blobs to remote CAS.
#
# Args:
# remote (CASRemote): The remote repository to upload to
# digests (list): The Digests of Blobs to upload
#
def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
@@ -1061,6 +1120,12 @@ class CASCache():
# Send final batch
batch.send()
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs, u_uid)
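Pushing a directory therefore reduces to the two public calls above; a hypothetical caller could do:

# Hypothetical push: upload only the blobs the server reports as missing.
remote.init()
missing = cascache.remote_missing_blobs_for_directory(remote, directory_digest)
cascache.send_blobs(remote, missing)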
class CASQuota:
def __init__(self, context):
......
@@ -221,28 +221,6 @@ class CASRemote():
return error
# verify_digest_on_remote():
#
# Check whether the object is already on the server in which case
# there is no need to upload it.
#
# Args:
# digest (Digest): The object digest.
#
def verify_digest_on_remote(self, digest):
self.init()
request = remote_execution_pb2.FindMissingBlobsRequest()
if self.instance_name:
request.instance_name = self.instance_name
request.blob_digests.extend([digest])
response = self.cas.FindMissingBlobs(request)
if digest in response.missing_blob_digests:
return False
return True
# push_message():
#
# Push the given protobuf message to a remote.
@@ -344,7 +322,7 @@ class _CASBatchRead():
self._size = new_batch_size
return True
def send(self):
def send(self, *, missing_blobs=None):
assert not self._sent
self._sent = True
@@ -355,8 +333,12 @@ class _CASBatchRead():
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if missing_blobs is None:
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
else:
missing_blobs.append(response.digest)
if response.status.code != code_pb2.OK:
raise CASRemoteError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
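Callers that pass a missing_blobs list now collect NOT_FOUND digests instead of aborting on BlobNotFound. Roughly (wanted_digests and write_to_local_cas are placeholders):

missing = []
batch = _CASBatchRead(remote)
for digest in wanted_digests:
    batch.add(digest)
for digest, data in batch.send(missing_blobs=missing):
    write_to_local_cas(digest, data)
# 'missing' now holds the digests the server reported as NOT_FOUND.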
......
@@ -34,7 +34,7 @@ from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import SandboxError
from .._exceptions import BstError, SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._cas import CASRemote, CASRemoteSpec
@@ -293,9 +293,13 @@ class SandboxRemote(Sandbox):
def _run(self, command, flags, *, cwd, env):
stdout, stderr = self._get_output()
context = self._get_context()
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
# set up virtual directory
upload_vdir = self.get_virtual_directory()
cascache = self._get_context().get_cascache()
# Create directories for all marked directories. This emulates
# some of the behaviour of other sandboxes, which create these
@@ -331,15 +335,32 @@ class SandboxRemote(Sandbox):
if not action_result:
casremote = CASRemote(self.storage_remote_spec)
try:
casremote.init()
except grpc.RpcError as e:
raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
.format(self.storage_url, e)) from e
# Now, push that key (without necessarily needing a ref) to the remote.
# Determine blobs missing on remote
try:
cascache.push_directory(casremote, upload_vdir)
missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
if not casremote.verify_digest_on_remote(upload_vdir._get_digest()):
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
# Check if any blobs are also missing locally (partial artifact)
# and pull them from the artifact cache.
try:
local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
if local_missing_blobs:
artifactcache.fetch_missing_blobs(project, local_missing_blobs)
except (grpc.RpcError, BstError) as e:
raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
# Now, push the missing blobs to the remote.
try:
cascache.send_blobs(casremote, missing_blobs)
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Push command and action
try:
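End to end, the staging logic introduced in this hunk boils down to three steps; a condensed sketch using the names above:

# 1. Ask the execution CAS which blobs of the input root it lacks.
missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
# 2. If some of those blobs are absent locally too (partial artifact),
#    pull them from the configured artifact servers first.
local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
if local_missing_blobs:
    artifactcache.fetch_missing_blobs(project, local_missing_blobs)
# 3. Upload everything the execution CAS was missing.
cascache.send_blobs(casremote, missing_blobs)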
......
@@ -135,114 +135,6 @@ def _test_push(user_config_file, project_dir, element_name, element_key, queue):
queue.put("No remote configured for element {}".format(element_name))
@pytest.mark.datafiles(DATA_DIR)
def test_push_directory(cli, tmpdir, datafiles):
project_dir = str(datafiles)
# First build the project without the artifact cache configured
result = cli.run(project=project_dir, args=['build', 'target.bst'])
result.assert_success()
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Set up an artifact cache.
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
# Configure artifact share
rootcache_dir = os.path.join(str(tmpdir), 'cache')
user_config_file = str(tmpdir.join('buildstream.conf'))
user_config = {
'scheduler': {
'pushers': 1
},
'artifacts': {
'url': share.repo,
'push': True,
},
'cachedir': rootcache_dir
}
# Write down the user configuration file
_yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.set_message_handler(message_handler)
# Load the project and CAS cache
project = Project(project_dir, context)
project.ensure_fully_loaded()
artifactcache = context.artifactcache
cas = artifactcache.cas
# Assert that the element's artifact is cached
element = project.load_elements(['target.bst'])[0]
element_key = cli.get_element_key(project_dir, 'target.bst')
assert artifactcache.contains(element, element_key)
# Manually setup the CAS remote
artifactcache.setup_remotes(use_config=True)
artifactcache.initialize_remotes()
assert artifactcache.has_push_remotes(plugin=element)
# Recreate the CasBasedDirectory object from the cached artifact
artifact_ref = element.get_artifact_name(element_key)
artifact_digest = cas.resolve_ref(artifact_ref)
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_directory, queue, user_config_file,
project_dir, artifact_digest))
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
process.start()
directory_hash = queue.get()
process.join()
except KeyboardInterrupt:
utils._kill_process_tree(process.pid)
raise
assert directory_hash
assert artifact_digest.hash == directory_hash
assert share.has_object(artifact_digest)
def _test_push_directory(user_config_file, project_dir, artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
project.ensure_fully_loaded()
# Create a local CAS cache handle
cas = context.artifactcache
# Manually setup the CAS remote
cas.setup_remotes(use_config=True)
cas.initialize_remotes()
if cas.has_push_remotes():
# Create a CasBasedDirectory from local CAS cache content
directory = CasBasedDirectory(context.artifactcache.cas, digest=artifact_digest)
# Push the CasBasedDirectory object
cas.push_directory(project, directory)
digest = directory._get_digest()
queue.put(digest.hash)
else:
queue.put("No remote configured")
@pytest.mark.datafiles(DATA_DIR)
def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)
......