Commit a78aaacb authored by Raoul Hidalgo Charman

pull_tree: WIP commit for what I've got so far.

doesn't work.
parent f112bfae
Pipeline #41870040 failed
@@ -694,15 +694,26 @@ class ArtifactCache():
     #
     # Args:
     #     project (Project): The current project
-    #     digest (Digest): The digest of the tree
+    #     tree_digest (Digest): The digest of the tree
     #
-    def pull_tree(self, project, digest):
+    def pull_tree(self, project, tree_digest):
         for remote in self._remotes[project]:
-            digest = self.cas.pull_tree(remote, digest)
+            try:
+                for blob_digest in remote.yield_tree_digests(tree_digest):
+                    if self.cas.check_blob(blob_digest):
+                        continue
+                    remote.request_blob(blob_digest)
+                    for blob_file in remote.get_blobs():
+                        self.cas.add_object(path=blob_file.name, link_directly=True)
 
-            if digest:
-                # no need to pull from additional remotes
-                return digest
+                # Get the last batch
+                for blob_file in remote.get_blobs(request_batch=True):
+                    self.cas.add_object(path=blob_file.name, link_directly=True)
+            except BlobNotFound:
+                continue
+            else:
+                return tree_digest
 
         return None
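
The shape of the new flow is a per-remote try/except/else: a missing blob aborts the current remote and falls through to the next one, and only a fully fetched tree returns early. A minimal, self-contained sketch of that control flow follows; BlobNotFound, yield_tree_digests and request_blob are stand-ins here, not the BuildStream API.

class BlobNotFound(Exception):
    pass

def pull_from_any(remotes, tree_digest):
    # Try each remote in turn; the first one that can serve every blob wins.
    for remote in remotes:
        try:
            for blob_digest in remote.yield_tree_digests(tree_digest):
                remote.request_blob(blob_digest)
        except BlobNotFound:
            continue            # this remote is missing part of the tree
        else:
            return tree_digest  # every blob was fetched from this remote
    return None                 # no configured remote had the complete tree
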
@@ -183,29 +183,6 @@ class CASCache():
         return modified, removed, added
 
-    # pull_tree():
-    #
-    # Pull a single Tree rather than a ref.
-    # Does not update local refs.
-    #
-    # Args:
-    #     remote (CASRemote): The remote to pull from
-    #     digest (Digest): The digest of the tree
-    #
-    def pull_tree(self, remote, digest):
-        try:
-            remote.init()
-
-            digest = self._fetch_tree(remote, digest)
-
-            return digest
-
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise
-
-            return None
-
     # link_ref():
     #
     # Add an alias for an existing ref.
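
The pull_tree() deleted above relied on a common gRPC idiom: treat StatusCode.NOT_FOUND as "this remote does not have it" and re-raise every other error. A small illustrative sketch of that idiom, independent of BuildStream, where fetch is any callable that performs the RPC:

import grpc

def fetch_or_none(fetch, digest):
    # Return None when the server reports NOT_FOUND; any other RPC error propagates.
    try:
        return fetch(digest)
    except grpc.RpcError as e:
        if e.code() != grpc.StatusCode.NOT_FOUND:
            raise
        return None
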
@@ -771,29 +748,6 @@ class CASCache():
         return objpath
 
-    def _fetch_tree(self, remote, digest):
-        # download but do not store the Tree object
-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            remote._fetch_blob(digest, out)
-
-            tree = remote_execution_pb2.Tree()
-
-            with open(out.name, 'rb') as f:
-                tree.ParseFromString(f.read())
-
-            tree.children.extend([tree.root])
-            for directory in tree.children:
-                for filenode in directory.files:
-                    self._ensure_blob(remote, filenode.digest)
-
-                # place directory blob only in final location when we've downloaded
-                # all referenced blobs to avoid dangling references in the repository
-                dirbuffer = directory.SerializeToString()
-                dirdigest = self.add_object(buffer=dirbuffer)
-                assert dirdigest.size_bytes == len(dirbuffer)
-
-        return dirdigest
-
     def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
         required_blobs = self._required_blobs(digest)
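
The comment in the removed _fetch_tree() captures the key invariant: a Directory message is only committed to the store after every file blob it references has been fetched, so the repository never contains dangling references. A rough sketch of that ordering, with ensure_blob and add_object standing in for the corresponding CASCache calls:

def store_tree(tree, ensure_blob, add_object):
    # Walk the REAPI Tree: the root directory plus all nested directories.
    for directory in [tree.root] + list(tree.children):
        for filenode in directory.files:
            ensure_blob(filenode.digest)            # fetch the leaves first
        add_object(directory.SerializeToString())   # commit the directory last
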
@@ -281,6 +281,17 @@ class CASRemote():
         else:
             return None
 
+    def get_tree_blob(self, tree_digest):
+        self.init()
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(tree_digest, f)
+
+        tree = remote_execution_pb2.Tree()
+        with open(f.name, 'rb') as tmp:
+            tree.ParseFromString(tmp.read())
+
+        return tree
+
     # yield_directory_digests():
     #
     # Iterate over blob digests starting from a root digest
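
Assuming an initialized CASRemote and a Digest that points at a REAPI Tree blob, the new get_tree_blob() could be used along these lines; count_tree_files is a hypothetical helper, and the field names follow the standard Tree/Directory messages.

def count_tree_files(remote, tree_digest):
    # remote: an initialized CASRemote; tree_digest: Digest of a Tree blob
    tree = remote.get_tree_blob(tree_digest)
    return sum(len(d.files) for d in [tree.root] + list(tree.children))
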
@@ -301,6 +312,27 @@ class CASRemote():
         # Fetch artifact, excluded_subdirs determined in pullqueue
         yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
 
+    def yield_tree_digests(self, tree_digest):
+        self.init()
+
+        # get tree file
+        f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+        self._fetch_blob(tree_digest, f)
+        tree = remote_execution_pb2.Tree()
+        f.seek(0)  # rewind the freshly written tree blob before parsing it
+        tree.ParseFromString(f.read())
+        tree.children.extend([tree.root])
+
+        for directory in tree.children:
+            for filenode in directory.files:
+                yield filenode.digest
+
+            # add the directory to files to add to cas store
+            f2 = tempfile.NamedTemporaryFile(dir=self.tmpdir)
+            f2.write(directory.SerializeToString())
+            f2.flush()  # flush so the blob is readable via f2.name later
+            self.__tmp_downloads.append(f2)
+
+        self.__tmp_downloads.append(f)
+
     # request_blob():
     #
     # Request a blob, triggering its download via bytestream or CAS batch depending on size
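
yield_tree_digests() keeps its NamedTemporaryFile handles in __tmp_downloads so the files stay on disk until get_blobs() hands their paths to add_object(). A self-contained sketch of that hand-off pattern, where DownloadStash is a made-up name rather than part of BuildStream; the important details are flushing before the path is consumed and holding a reference so the file is not deleted early.

import tempfile

class DownloadStash:
    # Holds temporary files alive until a consumer has ingested them by path.
    def __init__(self, tmpdir=None):
        self._tmpdir = tmpdir
        self._files = []

    def stash(self, data):
        f = tempfile.NamedTemporaryFile(dir=self._tmpdir)
        f.write(data)
        f.flush()               # make the bytes visible to readers of f.name
        self._files.append(f)   # the file is deleted only when this handle closes
        return f.name

    def clear(self):
        for f in self._files:
            f.close()           # closing deletes the temporary file
        self._files = []
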