Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (2)
  • artifactcache: Move pull logic into CASRemote · a613213b
    Raoul Hidalgo Charman authored

    Separates the pull logic into a remote/local API, so that the artifact cache
    iterates over blob digests, checks whether it has them, and then requests
    them if not. The request command allows batching of blobs where appropriate.
    (A minimal sketch of this pull pattern follows the commit list below.)

    Tests have been updated to ensure the correct tmpdir is set up in process
    wrappers, otherwise invalid cross-link errors occur in CI. Additional asserts
    have been added to check that the temporary directories are cleared by the
    end of a pull.

    Part of #802
  • artifactcache: implement new push methods · 44e1adb9
    Raoul Hidalgo Charman authored

    Similar to the pull methods, this implements a yield_directory_digests
    method that iterates over blobs in the local CAS, with upload_blob sending
    blobs to a remote and batching them where appropriate.
    (A minimal sketch of this push pattern also follows the commit list below.)

    Part of #802
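A minimal, self-contained sketch of the pull pattern described in the first commit: iterate over a directory's blob digests, skip anything the local CAS already has, request the rest, and collect downloads as batches complete. LocalCAS and FakeRemote here are illustrative in-memory stand-ins, not BuildStream classes; only the method names mirror the new CASRemote/CASCache API shown in the diff below.

# Illustrative sketch only -- in-memory stand-ins for the real classes.

class LocalCAS:
    def __init__(self):
        self.objects = {}                       # digest -> blob bytes

    def check_blob(self, digest):
        # True if the blob is already present locally
        return digest in self.objects

    def add_object(self, digest, data):
        self.objects[digest] = data


class FakeRemote:
    BATCH_LIMIT = 3                             # flush a batch after this many requests

    def __init__(self, blobs):
        self.blobs = blobs                      # digest -> bytes held by the "server"
        self._pending = []                      # digests queued for the next batch
        self._done = []                         # (digest, bytes) ready to collect

    def yield_directory_digests(self, root_digest):
        # A real remote walks Directory protos from root_digest;
        # here we simply yield every digest the server knows about.
        yield from self.blobs

    def request_blob(self, digest):
        self._pending.append(digest)
        if len(self._pending) >= self.BATCH_LIMIT:
            self._flush()

    def get_blobs(self, complete_batch=False):
        if complete_batch:
            self._flush()
        while self._done:
            yield self._done.pop()

    def _flush(self):
        self._done.extend((d, self.blobs[d]) for d in self._pending)
        self._pending.clear()


def pull_directory(local, remote, root_digest):
    # Same shape as cas_directory_download() below: skip blobs already in the
    # local CAS, request the rest, and store downloads as batches complete.
    for digest in remote.yield_directory_digests(root_digest):
        if local.check_blob(digest):
            continue
        remote.request_blob(digest)
        for done_digest, data in remote.get_blobs():
            local.add_object(done_digest, data)
    # Flush and collect the final, partially filled batch
    for done_digest, data in remote.get_blobs(complete_batch=True):
        local.add_object(done_digest, data)


if __name__ == "__main__":
    remote = FakeRemote({"d1": b"a", "d2": b"b", "d3": b"c", "d4": b"d"})
    local = LocalCAS()
    local.add_object("d2", b"b")                # one blob is already cached
    pull_directory(local, remote, "d1")
    assert set(local.objects) == {"d1", "d2", "d3", "d4"}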
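A matching sketch of the push pattern from the second commit, again with illustrative stand-ins (redefined so the sketch stands on its own): ask the remote which blobs it is missing, upload only those in batches, then flush the final batch. The flow mirrors cas_directory_upload() further down in the diff.

# Illustrative sketch only -- in-memory stand-ins for the real classes.

class LocalCAS:
    def __init__(self, objects):
        self.objects = objects                  # digest -> blob bytes

    def yield_directory_digests(self, root_digest):
        # A real CAS walks the Directory protos; here we yield every digest.
        yield from self.objects


class FakeRemote:
    BATCH_LIMIT = 3                             # flush after this many queued uploads

    def __init__(self):
        self.stored = {}                        # blobs the "server" already has
        self._batch = []                        # (digest, bytes) queued for upload

    def find_missing_blobs(self, digests):
        return [d for d in digests if d not in self.stored]

    def upload_blob(self, digest, data):
        self._batch.append((digest, data))
        if len(self._batch) >= self.BATCH_LIMIT:
            self.send_update_batch()

    def send_update_batch(self):
        self.stored.update(self._batch)
        self._batch.clear()


def push_directory(local, remote, root_digest):
    required = local.yield_directory_digests(root_digest)
    for digest in remote.find_missing_blobs(required):
        remote.upload_blob(digest, local.objects[digest])
    remote.send_update_batch()                  # send the final partial batch


if __name__ == "__main__":
    local = LocalCAS({"d1": b"a", "d2": b"b", "d3": b"c"})
    remote = FakeRemote()
    remote.stored["d2"] = b"b"                  # remote already holds one blob
    push_directory(local, remote, "d1")
    assert set(remote.stored) == {"d1", "d2", "d3"}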
......@@ -28,7 +28,8 @@ from ._message import Message, MessageType
from . import utils
from . import _yaml
from ._cas import CASRemote, CASRemoteSpec
from ._cas import BlobNotFound, CASRemote, CASRemoteSpec
from ._cas.transfer import cas_directory_upload, cas_directory_download, cas_tree_download
CACHE_SIZE_FILE = "cache_size"
......@@ -607,16 +608,41 @@ class ArtifactCache():
for remote in push_remotes:
remote.init()
skipped_remote = True
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
if self.cas.push(refs, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
try:
for ref in refs:
# Check whether ref is already on the server in which case
# there is no need to push the ref
root_digest = self.cas.resolve_ref(ref)
response = remote.get_reference(ref)
if (response is not None and
response.hash == root_digest.hash and
response.size_bytes == root_digest.size_bytes):
element.info("Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key()))
continue
# upload blobs
cas_directory_upload(self.cas, remote, root_digest)
remote.update_reference(ref, root_digest)
skipped_remote = False
except CASError as e:
if str(e.reason) == "StatusCode.RESOURCE_EXHAUSTED":
element.warn("Failed to push element to {}: Resource exhuasted"
.format(remote.spec.url))
continue
else:
raise ArtifactError("Failed to push refs {}: {}".format(refs, e),
temporary=True) from e
if skipped_remote is False:
pushed = True
else:
element.info("Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key()
))
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
return pushed
......@@ -644,19 +670,31 @@ class ArtifactCache():
display_key = element._get_brief_display_key()
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
if subdir:
# Attempt to extract subdir into artifact extract dir if it already exists
# without containing the subdir. If the respective artifact extract dir does not
# exist, a complete extraction will be performed.
self.extract(element, key, subdir)
# no need to pull from additional remotes
return True
else:
root_digest = remote.get_reference(ref)
if not root_digest:
element.info("Remote ({}) does not have {} cached".format(
remote.spec.url, element._get_brief_display_key()
))
remote.spec.url, element._get_brief_display_key()))
continue
try:
cas_directory_download(self.cas, remote, root_digest, excluded_subdirs)
except BlobNotFound:
element.info("Remote ({}) is missing blobs for {}".format(
remote.spec.url, element._get_brief_display_key()))
continue
self.cas.set_ref(ref, root_digest)
if subdir:
# Attempt to extract subdir into artifact extract dir if it already exists
# without containing the subdir. If the respective artifact extract dir does not
# exist, a complete extraction will be performed.
self.extract(element, key, subdir)
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
except CASError as e:
raise ArtifactError("Failed to pull artifact {}: {}".format(
......@@ -671,15 +709,16 @@ class ArtifactCache():
#
# Args:
# project (Project): The current project
# digest (Digest): The digest of the tree
# tree_digest (Digest): The digest of the tree
#
def pull_tree(self, project, digest):
def pull_tree(self, project, tree_digest):
for remote in self._remotes[project]:
digest = self.cas.pull_tree(remote, digest)
if digest:
# no need to pull from additional remotes
return digest
try:
root_digest = cas_tree_download(self.cas, remote, tree_digest)
except BlobNotFound:
continue
else:
return root_digest
return None
......@@ -708,7 +747,7 @@ class ArtifactCache():
return
for remote in push_remotes:
self.cas.push_directory(remote, directory)
cas_directory_upload(self.cas, remote, directory.ref)
# push_message():
#
......@@ -917,6 +956,19 @@ class ArtifactCache():
stat = os.statvfs(volume)
return stat.f_bsize * stat.f_bavail, stat.f_bsize * stat.f_blocks
def _fetch_directory(self, remote, root_digest, excluded_subdirs):
for blob_digest in remote.yield_directory_digests(
root_digest, excluded_subdirs=excluded_subdirs):
if self.cas.check_blob(blob_digest):
continue
remote.request_blob(blob_digest)
for blob_file in remote.get_blobs():
self.cas.add_object(path=blob_file.name, link_directly=True)
# Request final CAS batch
for blob_file in remote.get_blobs(complete_batch=True):
self.cas.add_object(path=blob_file.name, link_directly=True)
# _configured_remote_artifact_cache_specs():
#
......
......@@ -18,4 +18,4 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
from .cascache import CASCache
from .casremote import CASRemote, CASRemoteSpec
from .casremote import CASRemote, CASRemoteSpec, BlobNotFound
......@@ -18,23 +18,16 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import hashlib
import itertools
import os
import stat
import tempfile
import uuid
import contextlib
import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
from .._exceptions import CASCacheError
from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
......@@ -183,73 +176,6 @@ class CASCache():
return modified, removed, added
# pull():
#
# Pull a ref from a remote repository.
#
# Args:
# ref (str): The ref to pull
# remote (CASRemote): The remote repository to pull from
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): True if pull was successful, False if ref was not available
#
def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
try:
remote.init()
request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
tree = remote_execution_pb2.Digest()
tree.hash = response.digest.hash
tree.size_bytes = response.digest.size_bytes
# Check if the element artifact is present, if so just fetch the subdir.
if subdir and os.path.exists(self.objpath(tree)):
self._fetch_subdir(remote, tree, subdir)
else:
# Fetch artifact, excluded_subdirs determined in pullqueue
self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
self.set_ref(ref, tree)
return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
else:
return False
except BlobNotFound as e:
return False
# pull_tree():
#
# Pull a single Tree rather than a ref.
# Does not update local refs.
#
# Args:
# remote (CASRemote): The remote to pull from
# digest (Digest): The digest of the tree
#
def pull_tree(self, remote, digest):
try:
remote.init()
digest = self._fetch_tree(remote, digest)
return digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise
return None
# link_ref():
#
# Add an alias for an existing ref.
......@@ -263,73 +189,6 @@ class CASCache():
self.set_ref(newref, tree)
# push():
#
# Push committed refs to remote repository.
#
# Args:
# refs (list): The refs to push
# remote (CASRemote): The remote to push to
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
#
# Raises:
# (CASCacheError): if there was an error
#
def push(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise
self._send_directory(remote, tree)
request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
# push_directory():
#
# Push the given virtual directory to a remote.
#
# Args:
# remote (CASRemote): The remote to push to
# directory (Directory): A virtual directory object to push.
#
# Raises:
# (CASCacheError): if there was an error
#
def push_directory(self, remote, directory):
remote.init()
self._send_directory(remote, directory.ref)
# objpath():
#
# Return the path of an object based on its digest.
......@@ -591,6 +450,37 @@ class CASCache():
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
# Check whether a blob is in the local CAS
# Returns the object path if present, None otherwise
def check_blob(self, digest):
objpath = self.objpath(digest)
if os.path.exists(objpath):
# already in local repository
return objpath
else:
return None
def yield_directory_digests(self, directory_digest):
# parse the directory proto and recursively yield blob digests
d = remote_execution_pb2.Digest()
d.hash = directory_digest.hash
d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
d = remote_execution_pb2.Digest()
d.hash = filenode.digest.hash
d.size_bytes = filenode.digest.size_bytes
yield d
for dirnode in directory.directories:
yield from self.yield_directory_digests(dirnode.digest)
################################################
# Local Private Methods #
################################################
......@@ -779,202 +669,3 @@ class CASCache():
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
# _ensure_blob():
#
# Fetch and add blob if it's not already local.
#
# Args:
# remote (Remote): The remote to use.
# digest (Digest): Digest object for the blob to fetch.
#
# Returns:
# (str): The path of the object
#
def _ensure_blob(self, remote, digest):
objpath = self.objpath(digest)
if os.path.exists(objpath):
# already in local repository
return objpath
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
remote._fetch_blob(digest, f)
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
def _batch_download_complete(self, batch):
for digest, data in batch.send():
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
f.write(data)
f.flush()
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
self._batch_download_complete(batch)
# All previously scheduled directories are now locally available,
# move them to the processing queue.
fetch_queue.extend(fetch_next_queue)
fetch_next_queue.clear()
return _CASBatchRead(remote)
# Helper function for _fetch_directory().
def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
in_local_cache = os.path.exists(self.objpath(digest))
if in_local_cache:
# Skip download, already in local cache.
pass
elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_read_supported):
# Too large for batch request, download in independent request.
self._ensure_blob(remote, digest)
in_local_cache = True
else:
if not batch.add(digest):
# Not enough space left in batch request.
# Complete pending batch first.
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
batch.add(digest)
if recursive:
if in_local_cache:
# Add directory to processing queue.
fetch_queue.append(digest)
else:
# Directory will be available after completing pending batch.
# Add directory to deferred processing queue.
fetch_next_queue.append(digest)
return batch
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
# Fetches files, symbolic links and recursively other directories in
# the remote directory and adds them to the content addressable
# store.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
# excluded_subdirs (list): The optional list of subdirs to not fetch
#
def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
if not excluded_subdirs:
excluded_subdirs = []
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
dir_digest = fetch_queue.pop(0)
objpath = self._ensure_blob(remote, dir_digest)
directory = remote_execution_pb2.Directory()
with open(objpath, 'rb') as f:
directory.ParseFromString(f.read())
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
for filenode in directory.files:
batch = self._fetch_directory_node(remote, filenode.digest, batch,
fetch_queue, fetch_next_queue)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
def _fetch_subdir(self, remote, tree, subdir):
subdirdigest = self._get_subdir(tree, subdir)
self._fetch_directory(remote, subdirdigest)
def _fetch_tree(self, remote, digest):
# download but do not store the Tree object
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
remote._fetch_blob(digest, out)
tree = remote_execution_pb2.Tree()
with open(out.name, 'rb') as f:
tree.ParseFromString(f.read())
tree.children.extend([tree.root])
for directory in tree.children:
for filenode in directory.files:
self._ensure_blob(remote, filenode.digest)
# place directory blob only in final location when we've downloaded
# all referenced blobs to avoid dangling references in the repository
dirbuffer = directory.SerializeToString()
dirdigest = self.add_object(buffer=dirbuffer)
assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
required_blobs = self._required_blobs(digest)
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
self._send_blobs(remote, missing_blobs.values(), u_uid)
def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_update_supported):
# Too large for batch request, upload in independent request.
remote._send_blob(digest, f, u_uid=u_uid)
else:
if not batch.add(digest, f):
# Not enough space left in batch request.
# Complete pending batch first.
batch.send()
batch = _CASBatchUpdate(remote)
batch.add(digest, f)
# Send final batch
batch.send()
def _grouper(iterable, n):
while True:
try:
current = next(iterable)
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
from collections import namedtuple
import io
import itertools
import os
import multiprocessing
import signal
import tempfile
from urllib.parse import urlparse
import uuid
......@@ -96,6 +98,11 @@ class CASRemote():
self.tmpdir = str(tmpdir)
os.makedirs(self.tmpdir, exist_ok=True)
self.__tmp_downloads = [] # files in the tmpdir waiting to be added to local caches
self.__batch_read = None
self.__batch_update = None
def init(self):
if not self._initialized:
url = urlparse(self.spec.url)
......@@ -153,6 +160,7 @@ class CASRemote():
request = remote_execution_pb2.BatchReadBlobsRequest()
response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
self.__batch_read = _CASBatchRead(self)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
......@@ -163,6 +171,7 @@ class CASRemote():
request = remote_execution_pb2.BatchUpdateBlobsRequest()
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
self.__batch_update = _CASBatchUpdate(self)
except grpc.RpcError as e:
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
e.code() != grpc.StatusCode.PERMISSION_DENIED):
......@@ -259,6 +268,210 @@ class CASRemote():
return message_digest
# get_reference():
#
# Args:
# ref (str): The ref to request
#
# Returns:
# (digest): digest of ref, None if not found
#
def get_reference(self, ref):
try:
self.init()
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
return self.ref_storage.GetReference(request).digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise CASRemoteError("Failed to find ref {}: {}".format(ref, e)) from e
else:
return None
# update_reference():
#
# Args:
# ref (str): Reference to update
# digest (Digest): New digest to update ref with
def update_reference(self, ref, digest):
request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = digest.hash
request.digest.size_bytes = digest.size_bytes
self.ref_storage.UpdateReference(request)
def get_tree_blob(self, tree_digest):
self.init()
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
self._fetch_blob(tree_digest, f)
tree = remote_execution_pb2.Tree()
with open(f.name, 'rb') as tmp:
tree.ParseFromString(tmp.read())
return tree
# yield_directory_digests():
#
# Recursively iterates over digests for files, symbolic links and other
# directories starting from a root digest
#
# Args:
# root_digest (digest): The root_digest to get a tree of
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (iter digests): recursively iterates over digests contained in root directory
#
def yield_directory_digests(self, root_digest, *, progress=None,
subdir=None, excluded_subdirs=None):
self.init()
# Fetch artifact, excluded_subdirs determined in pullqueue
if excluded_subdirs is None:
excluded_subdirs = []
# get directory blob
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
self._fetch_blob(root_digest, f)
directory = remote_execution_pb2.Directory()
with open(f.name, 'rb') as tmp:
directory.ParseFromString(tmp.read())
yield root_digest
for filenode in directory.files:
yield filenode.digest
for dirnode in directory.directories:
if dirnode.name not in excluded_subdirs:
yield from self.yield_directory_digests(dirnode.digest)
# yield_tree_digests():
#
# Iterates over the child digests of an already fetched tree message
#
# Args:
# tree (Tree): the tree message to iterate over
#
# Returns:
# (iter digests): iterates over digests in tree message
def yield_tree_digests(self, tree):
self.init()
tree.children.extend([tree.root])
for directory in tree.children:
for filenode in directory.files:
yield filenode.digest
# add the directory blob to the downloaded tmp files
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
f.write(directory.SerializeToString())
f.flush()
self.__tmp_downloads.append(f)
# request_blob():
#
# Request a blob, triggering a download via either bytestream or a CAS
# BatchReadBlobs request depending on its size.
#
# Args:
# digest (Digest): digest of the requested blob
#
def request_blob(self, digest):
if (not self.batch_read_supported or
digest.size_bytes > self.max_batch_total_size_bytes):
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
self._fetch_blob(digest, f)
self.__tmp_downloads.append(f)
elif self.__batch_read.add(digest) is False:
self._download_batch()
self.__batch_read.add(digest)
# get_blobs():
#
# Yield downloaded blobs in their tmp file locations, causing the files
# to be deleted once they go out of scope.
#
# Args:
# complete_batch (bool): download any outstanding batch read request
#
# Returns:
# iterator over NamedTemporaryFile
def get_blobs(self, complete_batch=False):
# Send read batch request and download
if (complete_batch is True and
self.batch_read_supported is True):
self._download_batch()
while self.__tmp_downloads:
yield self.__tmp_downloads.pop()
# upload_blob():
#
# Upload a single blob file to the remote, batching where appropriate
#
# Args:
# digest (Digest): digest we want to upload
# blob_file (str): Name of file location
# u_uid (str): Used to identify to the bytestream service
#
def upload_blob(self, digest, blob_file, u_uid=uuid.uuid4()):
with open(blob_file, 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
if (digest.size_bytes >= self.max_batch_total_size_bytes or
not self.batch_update_supported):
# Too large for batch request, upload in independent request.
self._send_blob(digest, f, u_uid=u_uid)
else:
if self.__batch_update.add(digest, f) is False:
self.__batch_update.send()
self.__batch_update = _CASBatchUpdate(self)
self.__batch_update.add(digest, f)
# send_update_batch():
#
# Sends anything left in the update batch
#
def send_update_batch(self):
# make sure everything is sent
self.__batch_update.send()
self.__batch_update = _CASBatchUpdate(self)
# find_missing_blobs()
#
# Does FindMissingBlobs request to remote
#
# Args:
# required_blobs ([Digest]): list of blobs required
#
# Returns:
# (Dict(Digest)): missing blobs
def find_missing_blobs(self, required_blobs):
self.init()
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()
for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
response = self.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d
return missing_blobs
################################################
# Local Private Methods #
################################################
......@@ -297,10 +510,31 @@ class CASRemote():
offset += chunk_size
finished = request.finish_write
response = self.bytestream.Write(request_stream(resource_name, stream))
try:
response = self.bytestream.Write(request_stream(resource_name, stream))
except grpc.RpcError as e:
raise CASRemoteError("Failed to upload blob: {}".format(e), reason=e.code())
assert response.committed_size == digest.size_bytes
def _download_batch(self):
for _, data in self.__batch_read.send():
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
f.write(data)
f.flush()
self.__tmp_downloads.append(f)
self.__batch_read = _CASBatchRead(self)
def _grouper(iterable, n):
while True:
try:
current = next(iterable)
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
# Represents a batch of blobs queued for fetching.
#
......@@ -333,7 +567,11 @@ class _CASBatchRead():
if not self._request.digests:
return
batch_response = self._remote.cas.BatchReadBlobs(self._request)
try:
batch_response = self._remote.cas.BatchReadBlobs(self._request)
except grpc.RpcError as e:
raise CASRemoteError("Failed to read blob batch: {}".format(e),
reason=e.code()) from e
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
......@@ -381,7 +619,12 @@ class _CASBatchUpdate():
if not self._request.requests:
return
batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
# Want to raise a CASRemoteError if the blob batch upload fails
try:
batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
except grpc.RpcError as e:
raise CASRemoteError("Failed to upload blob batch: {}".format(e),
reason=e.code()) from e
for response in batch_response.responses:
if response.status.code != code_pb2.OK:
......
#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
from ..utils import _message_digest
def cas_directory_download(caslocal, casremote, root_digest, excluded_subdirs):
for blob_digest in casremote.yield_directory_digests(
root_digest, excluded_subdirs=excluded_subdirs):
if caslocal.check_blob(blob_digest):
continue
casremote.request_blob(blob_digest)
for blob_file in casremote.get_blobs():
caslocal.add_object(path=blob_file.name, link_directly=True)
# Request final CAS batch
for blob_file in casremote.get_blobs(complete_batch=True):
caslocal.add_object(path=blob_file.name, link_directly=True)
def cas_tree_download(caslocal, casremote, tree_digest):
tree = casremote.get_tree_blob(tree_digest)
for blob_digest in casremote.yield_tree_digests(tree):
if caslocal.check_blob(blob_digest):
continue
casremote.request_blob(blob_digest)
for blob_file in casremote.get_blobs():
caslocal.add_object(path=blob_file.name, link_directly=True)
# Get the last batch
for blob_file in casremote.get_blobs(complete_batch=True):
caslocal.add_object(path=blob_file.name, link_directly=True)
# get root digest from tree and return that
return _message_digest(tree.root.SerializeToString())
def cas_directory_upload(caslocal, casremote, root_digest):
required_blobs = caslocal.yield_directory_digests(root_digest)
missing_blobs = casremote.find_missing_blobs(required_blobs)
for blob in missing_blobs.values():
blob_file = caslocal.objpath(blob)
casremote.upload_blob(blob, blob_file)
# send remaining blobs
casremote.send_update_batch()
......@@ -39,6 +39,7 @@ from .._exceptions import SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._cas import CASRemote, CASRemoteSpec
from .._cas.transfer import cas_tree_download, cas_directory_upload
class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
......@@ -281,8 +282,7 @@ class SandboxRemote(Sandbox):
cascache = context.get_cascache()
casremote = CASRemote(self.storage_remote_spec, context.tmpdir)
# Now do a pull to ensure we have the necessary parts.
dir_digest = cascache.pull_tree(casremote, tree_digest)
dir_digest = cas_tree_download(cascache, casremote, tree_digest)
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
raise SandboxError("Output directory structure pulling from remote failed.")
......@@ -345,7 +345,7 @@ class SandboxRemote(Sandbox):
# Now, push that key (without necessarily needing a ref) to the remote.
try:
cascache.push_directory(casremote, upload_vdir)
cas_directory_upload(cascache, casremote, upload_vdir.ref)
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
......
......@@ -55,6 +55,11 @@ def integration_cache(request):
except FileNotFoundError:
pass
# Clean up the tmp dir, should be empty but isn't in CI tests
try:
shutil.rmtree(os.path.join(cache_dir, 'tmp'))
except FileNotFoundError:
pass
def clean_platform_cache():
Platform._instance = None
......
......@@ -110,7 +110,7 @@ def test_pull(cli, tmpdir, datafiles):
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_pull, queue, user_config_file, project_dir,
artifact_dir, 'target.bst', element_key))
artifact_dir, tmpdir, 'target.bst', element_key))
try:
# Keep SIGINT blocked in the child process
......@@ -126,14 +126,18 @@ def test_pull(cli, tmpdir, datafiles):
assert not error
assert cas.contains(element, element_key)
# Check that the tmp dir is cleared out
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
def _test_pull(user_config_file, project_dir, artifact_dir,
def _test_pull(user_config_file, project_dir, artifact_dir, tmpdir,
element_name, element_key, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......@@ -218,7 +222,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_tree, queue, user_config_file, project_dir,
artifact_dir, artifact_digest))
artifact_dir, tmpdir, artifact_digest))
try:
# Keep SIGINT blocked in the child process
......@@ -239,6 +243,9 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Assert that we are not cached locally anymore
assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
# Check that the tmp dir is cleared out
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
size_bytes=tree_size)
......@@ -246,7 +253,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_pull_tree, queue, user_config_file, project_dir,
artifact_dir, tree_digest))
artifact_dir, tmpdir, tree_digest))
try:
# Keep SIGINT blocked in the child process
......@@ -267,13 +274,18 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Ensure the entire Tree structure has been pulled
assert os.path.exists(cas.objpath(directory_digest))
# Check that the tmp dir is cleared out
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
def _test_push_tree(user_config_file, project_dir, artifact_dir, tmpdir,
artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......@@ -304,12 +316,14 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
queue.put("No remote configured")
def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
def _test_pull_tree(user_config_file, project_dir, artifact_dir, tmpdir,
artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......
......@@ -89,7 +89,7 @@ def test_push(cli, tmpdir, datafiles):
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push, queue, user_config_file, project_dir,
artifact_dir, 'target.bst', element_key))
artifact_dir, tmpdir, 'target.bst', element_key))
try:
# Keep SIGINT blocked in the child process
......@@ -105,14 +105,18 @@ def test_push(cli, tmpdir, datafiles):
assert not error
assert share.has_artifact('test', 'target.bst', element_key)
# Check tmpdir for downloads is cleared
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
def _test_push(user_config_file, project_dir, artifact_dir,
def _test_push(user_config_file, project_dir, artifact_dir, tmpdir,
element_name, element_key, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......@@ -196,9 +200,10 @@ def test_push_directory(cli, tmpdir, datafiles):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_directory, queue, user_config_file,
project_dir, artifact_dir, artifact_digest))
process = multiprocessing.Process(
target=_queue_wrapper,
args=(_test_push_directory, queue, user_config_file, project_dir,
artifact_dir, tmpdir, artifact_digest))
try:
# Keep SIGINT blocked in the child process
......@@ -215,13 +220,17 @@ def test_push_directory(cli, tmpdir, datafiles):
assert artifact_digest.hash == directory_hash
assert share.has_object(artifact_digest)
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
def _test_push_directory(user_config_file, project_dir, artifact_dir, tmpdir,
artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......@@ -273,7 +282,7 @@ def test_push_message(cli, tmpdir, datafiles):
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
process = multiprocessing.Process(target=_queue_wrapper,
args=(_test_push_message, queue, user_config_file,
project_dir, artifact_dir))
project_dir, artifact_dir, tmpdir))
try:
# Keep SIGINT blocked in the child process
......@@ -291,13 +300,16 @@ def test_push_message(cli, tmpdir, datafiles):
size_bytes=message_size)
assert share.has_object(message_digest)
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'tmp')) == []
def _test_push_message(user_config_file, project_dir, artifact_dir, queue):
def _test_push_message(user_config_file, project_dir, artifact_dir, tmpdir, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
context.artifactdir = artifact_dir
context.set_message_handler(message_handler)
context.tmpdir = os.path.join(str(tmpdir), 'cache', 'tmp')
# Load the project manually
project = Project(project_dir, context)
......
......@@ -23,6 +23,7 @@ def default_state(cli, tmpdir, share):
'artifacts': {'url': share.repo, 'push': False},
'artifactdir': os.path.join(str(tmpdir), 'artifacts'),
'cache': {'pull-buildtrees': False},
'tmpdir': os.path.join(str(tmpdir), 'cache', 'tmp'),
})
......@@ -79,6 +80,9 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
assert os.path.isdir(buildtreedir)
default_state(cli, tmpdir, share1)
# Check tmpdir for downloads is cleared
assert os.listdir(os.path.join(integration_cache, 'tmp')) == []
# Pull artifact with pullbuildtrees set in user config, then assert
# that pulling with the same user config doesn't create a pull job,
# or when buildtrees cli flag is set.
......@@ -91,6 +95,9 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
assert element_name not in result.get_pulled_elements()
default_state(cli, tmpdir, share1)
# Check tmpdir for downloads is cleared
assert os.listdir(os.path.join(integration_cache, 'tmp')) == []
# Pull artifact with default config and buildtrees cli flag set, then assert
# that pulling with pullbuildtrees set in user config doesn't create a pull
# job.
......@@ -101,6 +108,9 @@ def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
assert element_name not in result.get_pulled_elements()
default_state(cli, tmpdir, share1)
# Check tmpdir for downloads is cleared
assert os.listdir(os.path.join(integration_cache, 'tmp')) == []
# Assert that a partial build element (not containing a populated buildtree dir)
# can't be pushed to an artifact share, then assert that a complete build element
# can be. This will attempt a partial pull from share1 and then a partial push
......