Commit 9bd3a625 authored by Jim MacArthur, committed by Martin Blanchard

cascache.py: Modifications necessary for remote execution

Add helper methods for object pushing and pulling: push_directory(),
pull_tree() and push_message(), plus a verifier, verify_key_pushed().

#454
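
For orientation, a hypothetical sketch of how a remote execution caller
might combine these helpers; `cascache`, `project`, `vdir` (a CAS-backed
directory object) and `run_remote_build()` are illustrative stand-ins,
not part of this commit:

    # Hypothetical usage sketch; all names below are illustrative.
    input_root = cascache.push_directory(project, vdir)   # Digest of the input root
    if input_root is not None and cascache.verify_key_pushed(project, input_root):
        tree_digest = run_remote_build(input_root)        # assumed to return a Tree digest
        root_dir = cascache.pull_tree(project, tree_digest)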
parent 6a0cdedf
@@ -19,6 +19,7 @@
import hashlib
import itertools
import io
import multiprocessing
import os
import signal
@@ -66,6 +67,10 @@ class CASCache(ArtifactCache):
self._calculate_cache_quota()
self._enable_push = enable_push
        if self._enable_push:
            # Identifies this client in ByteStream upload resource names
            # (see _send_blob() below).
            self._uuid = str(uuid.uuid4())
        else:
            self._uuid = None
# Per-project list of _CASRemote instances.
self._remotes = {}
@@ -76,6 +81,7 @@
################################################
# Implementation of abstract methods #
################################################
def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))
@@ -259,6 +265,25 @@
return False
    def pull_tree(self, project, digest):
        """Pull a single Tree rather than an artifact.

        Does not update local refs.
        """
for remote in self._remotes[project]:
try:
remote.init()
digest = self._fetch_tree(remote, digest)
# no need to pull from additional remotes
return digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise
return None
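
    # Usage sketch (illustrative, not part of this commit): given the Tree
    # digest produced by a remote build, pull_tree() imports the referenced
    # blobs into the local CAS and returns the digest of the root Directory,
    # or None if no configured remote has the Tree:
    #
    #     root_digest = cascache.pull_tree(project, tree_digest)
    #     if root_digest is None:
    #         raise ArtifactError("Tree not found on any remote")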
def link_key(self, element, oldkey, newkey):
oldref = self.get_artifact_fullname(element, oldkey)
newref = self.get_artifact_fullname(element, newkey)
@@ -267,106 +292,123 @@
self.set_ref(newref, tree)
def _push_refs_to_remote(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
                # Check whether the ref is already on the server, in which
                # case there is no need to push the artifact
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
self._send_directory(remote, tree)
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
                remote.ref_storage.UpdateReference(request)

                skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
    def push(self, element, keys):
        project = element._get_project()

        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

        push_remotes = [r for r in self._remotes[project] if r.spec.push]

        pushed = False

        for remote in push_remotes:
            remote.init()

            if element:
                element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))

            if self._push_refs_to_remote(refs, remote):
                pushed = True
            elif element:
                self.context.message(Message(
                    None,
                    MessageType.SKIPPED,
                    "Remote ({}) already has {} cached".format(
                        remote.spec.url, element._get_brief_display_key())
                ))

        return pushed
def push_directory(self, project, directory):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
if directory.ref is None:
return None
for remote in push_remotes:
remote.init()
self._send_directory(remote, directory.ref)
return directory.ref
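
    # Note: `directory` is expected to be a CAS-backed directory object whose
    # `ref` attribute holds the Digest of its root Directory; a directory
    # without a ref cannot be pushed and yields None.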
def push_message(self, project, message):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
message_buffer = message.SerializeToString()
message_sha = hashlib.sha256(message_buffer)
message_digest = remote_execution_pb2.Digest()
message_digest.hash = message_sha.hexdigest()
message_digest.size_bytes = len(message_buffer)
for remote in push_remotes:
remote.init()
with io.BytesIO(message_buffer) as b:
self._send_blob(remote, message_digest, b)
return message_digest
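
    # The digest above is plain SHA-256 over the serialized message; the
    # same scheme applies to any blob in CAS. Illustrative sketch (not part
    # of this commit):
    #
    #     payload = b"arbitrary bytes"
    #     digest = remote_execution_pb2.Digest(
    #         hash=hashlib.sha256(payload).hexdigest(),
    #         size_bytes=len(payload))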
def _verify_digest_on_remote(self, remote, digest):
        # Check whether the blob is already on the server, in which case
        # there is no need to push it
request = remote_execution_pb2.FindMissingBlobsRequest()
request.blob_digests.extend([digest])
response = remote.cas.FindMissingBlobs(request)
if digest in response.missing_blob_digests:
return False
return True
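
    # A digest absent from missing_blob_digests is already stored on the
    # remote. Note that `in` on a repeated protobuf field compares whole
    # messages, so both hash and size_bytes must match.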
def verify_key_pushed(self, project, digest):
push_remotes = [r for r in self._remotes[project] if r.spec.push]
pushed = False
for remote in push_remotes:
remote.init()
if self._verify_digest_on_remote(remote, digest):
pushed = True
return pushed
################################################
@@ -599,6 +641,7 @@ class CASCache(ArtifactCache):
################################################
# Local Private Methods #
################################################
def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)
@@ -782,16 +825,16 @@
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
    def _fetch_blob(self, remote, digest, stream):
        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])

        request = bytestream_pb2.ReadRequest()
        request.resource_name = resource_name
        request.read_offset = 0
        for response in remote.bytestream.Read(request):
            stream.write(response.data)
        stream.flush()

        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
def _fetch_directory(self, remote, tree):
objpath = self.objpath(tree)
@@ -827,6 +870,92 @@
digest = self.add_object(path=out.name)
assert digest.hash == tree.hash
def _fetch_tree(self, remote, digest):
# download but do not store the Tree object
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
self._fetch_blob(remote, digest, out)
tree = remote_execution_pb2.Tree()
with open(out.name, 'rb') as f:
tree.ParseFromString(f.read())
tree.children.extend([tree.root])
for directory in tree.children:
for filenode in directory.files:
fileobjpath = self.objpath(filenode.digest)
if os.path.exists(fileobjpath):
# already in local cache
continue
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, filenode.digest, f)
added_digest = self.add_object(path=f.name)
assert added_digest.hash == filenode.digest.hash
                # Place the directory blob in its final location only after
                # all referenced blobs have been downloaded, to avoid dangling
                # references in the repository.
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
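
    # For reference: a REAPI Tree message carries one root Directory plus the
    # transitive list of its child Directories. Appending root to children
    # lets a single loop import every directory, and because extend() places
    # root last, the digest returned is that of the root Directory.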
def _send_blob(self, remote, digest, stream):
resource_name = '/'.join(['uploads', self._uuid, 'blobs',
digest.hash, str(digest.size_bytes)])
def request_stream(resname, instream):
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, 64 * 1024)
remaining -= chunk_size
request = bytestream_pb2.WriteRequest()
request.write_offset = offset
                # at most 64 KiB per chunk
request.data = instream.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
response = remote.bytestream.Write(request_stream(resource_name, stream))
assert response.committed_size == digest.size_bytes
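
    # The resource name above follows the ByteStream upload convention used
    # by CAS servers, "uploads/{instance-uuid}/blobs/{hash}/{size_bytes}",
    # and the payload is streamed in WriteRequests of at most 64 KiB each,
    # with finish_write set on the final chunk. A zero-byte blob still
    # produces exactly one request, since the loop runs until finish_write
    # has been sent.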
def _send_directory(self, remote, digest):
required_blobs = self._required_blobs(digest)
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = remote.cas.FindMissingBlobs(request)
for digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
for blob_digest in missing_blobs.values():
with open(self.objpath(blob_digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
self._send_blob(remote, blob_digest, f)
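
# _grouper() is an existing helper in this module, not added by this commit.
# A minimal equivalent of the batching behaviour relied on above could look
# like this (sketch only):
#
#     def _grouper(iterable, n):
#         # Yield lists of at most n consecutive items from iterable.
#         it = iter(iterable)
#         while True:
#             chunk = list(itertools.islice(it, n))
#             if not chunk:
#                 return
#             yield chunk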
# Represents a single remote CAS cache.
#