Skip to content
Snippets Groups Projects
Commit 2dafb5a8 authored by Jim MacArthur's avatar Jim MacArthur
Browse files

_casbaseddirectory.py: Enable direct CAS-to-CAS import.

parent 7f79b9ce
No related branches found
No related tags found
No related merge requests found
......@@ -30,7 +30,6 @@ See also: :ref:`sandboxing`.
from collections import OrderedDict
import os
import tempfile
import stat
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
......@@ -51,6 +50,24 @@ class IndexEntry():
self.modified = modified
class ResolutionException(Exception):
""" Superclass of all exceptions that can be raised by
CasBasedDirectory._resolve. Should not be used outside this module. """
pass
class InfiniteSymlinkException(ResolutionException):
""" Raised when an infinite symlink loop is found. """
pass
class AbsoluteSymlinkException(ResolutionException):
"""Raised if we try to follow an absolute symlink (i.e. one whose
target starts with the path separator) and we have disallowed
following such symlinks. """
pass
# CasBasedDirectory intentionally doesn't call its superclass constuctor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
......@@ -168,29 +185,34 @@ class CasBasedDirectory(Directory):
self.index[name] = IndexEntry(dirnode, buildstream_object=newdir)
return newdir
def _add_new_file(self, basename, filename):
def _add_file(self, basename, filename, modified=False):
filenode = self.pb2_directory.files.add()
filenode.name = filename
self.cas_cache.add_object(digest=filenode.digest, path=os.path.join(basename, filename))
is_executable = os.access(os.path.join(basename, filename), os.X_OK)
filenode.is_executable = is_executable
self.index[filename] = IndexEntry(filenode, modified=(filename in self.index))
self.index[filename] = IndexEntry(filenode, modified=modified or filename in self.index)
def _copy_link_from_filesystem(self, basename, filename):
self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename)))
def _add_new_link(self, basename, filename):
existing_link = self._find_pb2_entry(filename)
def _add_new_link_direct(self, name, target):
existing_link = self._find_pb2_entry(name)
if existing_link:
symlinknode = existing_link
else:
symlinknode = self.pb2_directory.symlinks.add()
symlinknode.name = filename
assert isinstance(symlinknode, remote_execution_pb2.SymlinkNode)
symlinknode.name = name
# A symlink node has no digest.
symlinknode.target = os.readlink(os.path.join(basename, filename))
self.index[filename] = IndexEntry(symlinknode, modified=(existing_link is not None))
symlinknode.target = target
self.index[name] = IndexEntry(symlinknode, modified=(existing_link is not None))
def delete_entry(self, name):
for collection in [self.pb2_directory.files, self.pb2_directory.symlinks, self.pb2_directory.directories]:
if name in collection:
collection.remove(name)
for thing in collection:
if thing.name == name:
collection.remove(thing)
if name in self.index:
del self.index[name]
......@@ -231,9 +253,13 @@ class CasBasedDirectory(Directory):
if isinstance(entry, CasBasedDirectory):
return entry.descend(subdirectory_spec[1:], create)
else:
# May be a symlink
target = self._resolve(subdirectory_spec[0], force_create=create)
if isinstance(target, CasBasedDirectory):
return target
error = "Cannot descend into {}, which is a '{}' in the directory {}"
raise VirtualDirectoryError(error.format(subdirectory_spec[0],
type(entry).__name__,
type(self.index[subdirectory_spec[0]].pb_object).__name__,
self))
else:
if create:
......@@ -254,36 +280,112 @@ class CasBasedDirectory(Directory):
else:
return self
def _resolve_symlink_or_directory(self, name):
"""Used only by _import_files_from_directory. Tries to resolve a
directory name or symlink name. 'name' must be an entry in this
directory. It must be a single symlink or directory name, not a path
separated by path separators. If it's an existing directory name, it
just returns the Directory object for that. If it's a symlink, it will
attempt to find the target of the symlink and return that as a
Directory object.
If a symlink target doesn't exist, it will attempt to create it
as a directory as long as it's within this directory tree.
def _resolve(self, name, absolute_symlinks_resolve=True, force_create=False, seen_objects=None):
"""Resolves any name to an object. If the name points to a symlink in
this directory, it returns the thing it points to,
recursively.
Returns a CasBasedDirectory, FileNode or None. None indicates
either that 'target' does not exist in this directory, or is a
symlink chain which points to a nonexistent name (broken
symlink).
Raises:
- InfiniteSymlinkException if 'name' points to an infinite symlink loop.
- AbsoluteSymlinkException if 'name' points to an absolute symlink and absolute_symlinks_resolve is False.
If force_create is on, this will attempt to create directories to make symlinks and directories resolve.
If force_create is off, this will never alter this directory.
"""
if isinstance(self.index[name].buildstream_object, Directory):
return self.index[name].buildstream_object
# OK then, it's a symlink
symlink = self._find_pb2_entry(name)
if name not in self.index:
return None
# First check if it's a normal object and return that
index_entry = self.index[name]
if isinstance(index_entry.buildstream_object, Directory):
return index_entry.buildstream_object
elif isinstance(index_entry.pb_object, remote_execution_pb2.FileNode):
return index_entry.pb_object
assert isinstance(index_entry.pb_object, remote_execution_pb2.SymlinkNode)
if seen_objects is None:
seen_objects = [index_entry.pb_object]
else:
if index_entry.pb_object in seen_objects:
# Infinite symlink loop detected
raise InfiniteSymlinkException()
symlink = index_entry.pb_object
components = symlink.target.split(CasBasedDirectory._pb2_path_sep)
absolute = symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix)
if absolute:
root = self.find_root()
if absolute_symlinks_resolve:
start_directory = self.find_root()
# Discard the first empty element
components.pop(0)
else:
# Unresolvable absolute symlink
raise AbsoluteSymlinkException()
else:
root = self
directory = root
components = symlink.target.split(CasBasedDirectory._pb2_path_sep)
for c in components:
if c == "..":
directory = directory.parent
start_directory = self
directory = start_directory
while True:
if not components:
# If we run out of components, the directory we're currently in
# is the resolved component.
return directory
c = components.pop(0)
if c == ".":
pass
elif c == "..":
if directory.parent is not None:
directory = directory.parent
# If directory.parent *is* None, this is an attempt to access
# '..' from the root, which is valid under POSIX; it just
# returns the root.
elif c in directory.index:
# Recursive resolve and continue
try:
f = directory._resolve(c, absolute_symlinks_resolve, seen_objects=seen_objects,
force_create=force_create)
except ResolutionException:
return None
if isinstance(f, CasBasedDirectory):
directory = f
elif isinstance(f, remote_execution_pb2.FileNode):
if components:
# We have components still to resolve, but one of the path components
# is a file.
if force_create:
self.delete_entry(c)
directory = directory.descend(c, create=True)
else:
errormsg = ("Reached a file called {} while trying to resolve a symlink; " +
"cannot proceed. The remaining path components are {}.")
raise ResolutionException(errormsg.format(c, components))
else:
# It's a file, and there's no path components left, so just return that.
return f
else:
# f was not found, or wasn't resolvable
if force_create:
directory = directory.descend(c, create=True)
else:
return None
else:
directory = directory.descend(c, create=True)
return directory
# c is not in our index
if force_create:
directory = directory.descend(c, create=True)
else:
return None
# You can only exit the while loop with a return, or exception, so you shouldn't be here.
def _check_replacement(self, name, path_prefix, fileListResult):
""" Checks whether 'name' exists, and if so, whether we can overwrite it.
......@@ -297,6 +399,7 @@ class CasBasedDirectory(Directory):
return True
if (isinstance(existing_entry,
(remote_execution_pb2.FileNode, remote_execution_pb2.SymlinkNode))):
self.delete_entry(name)
fileListResult.overwritten.append(relative_pathname)
return True
elif isinstance(existing_entry, remote_execution_pb2.DirectoryNode):
......@@ -314,23 +417,44 @@ class CasBasedDirectory(Directory):
.format(name, type(existing_entry)))
return False # In case asserts are disabled
def _import_directory_recursively(self, directory_name, source_directory, remaining_path, path_prefix):
""" _import_directory_recursively and _import_files_from_directory will be called alternately
as a directory tree is descended. """
if directory_name in self.index:
subdir = self._resolve_symlink_or_directory(directory_name)
else:
subdir = self._add_directory(directory_name)
new_path_prefix = os.path.join(path_prefix, directory_name)
subdir_result = subdir._import_files_from_directory(os.path.join(source_directory, directory_name),
[os.path.sep.join(remaining_path)],
path_prefix=new_path_prefix)
return subdir_result
def _replace_anything_with_dir(self, name, path_prefix, overwritten_files_list):
self.delete_entry(name)
subdir = self._add_directory(name)
overwritten_files_list.append(os.path.join(path_prefix, name))
return subdir
def _import_files_from_directory(self, source_directory, files, path_prefix=""):
""" Imports files from a traditional directory """
""" Imports files from a traditional directory. """
def _ensure_followable(name, path_prefix):
""" Makes sure 'name' is a directory or symlink to a directory which can be descended into. """
if isinstance(self.index[name].buildstream_object, Directory):
return self.descend(name)
try:
target = self._resolve(name, force_create=True)
except InfiniteSymlinkException:
return self._replace_anything_with_dir(name, path_prefix, result.overwritten)
if isinstance(target, CasBasedDirectory):
return target
elif isinstance(target, remote_execution_pb2.FileNode):
return self._replace_anything_with_dir(name, path_prefix, result.overwritten)
return target
def _import_directory_recursively(directory_name, source_directory, remaining_path, path_prefix):
""" _import_directory_recursively and _import_files_from_directory will be called alternately
as a directory tree is descended. """
if directory_name in self.index:
subdir = _ensure_followable(directory_name, path_prefix)
else:
subdir = self._add_directory(directory_name)
new_path_prefix = os.path.join(path_prefix, directory_name)
subdir_result = subdir._import_files_from_directory(os.path.join(source_directory, directory_name),
[os.path.sep.join(remaining_path)],
path_prefix=new_path_prefix)
return subdir_result
result = FileListResult()
for entry in sorted(files):
for entry in files:
split_path = entry.split(os.path.sep)
# The actual file on the FS we're importing
import_file = os.path.join(source_directory, entry)
......@@ -338,14 +462,18 @@ class CasBasedDirectory(Directory):
relative_pathname = os.path.join(path_prefix, entry)
if len(split_path) > 1:
directory_name = split_path[0]
# Hand this off to the importer for that subdir. This will only do one file -
# a better way would be to hand off all the files in this subdir at once.
subdir_result = self._import_directory_recursively(directory_name, source_directory,
split_path[1:], path_prefix)
# Hand this off to the importer for that subdir.
# It would be advantageous to batch these together by
# directory_name. However, we can't do it out of
# order, since importing symlinks affects the results
# of other imports.
subdir_result = _import_directory_recursively(directory_name, source_directory,
split_path[1:], path_prefix)
result.combine(subdir_result)
elif os.path.islink(import_file):
if self._check_replacement(entry, path_prefix, result):
self._add_new_link(source_directory, entry)
self._copy_link_from_filesystem(source_directory, entry)
result.files_written.append(relative_pathname)
elif os.path.isdir(import_file):
# A plain directory which already exists isn't a problem; just ignore it.
......@@ -353,10 +481,86 @@ class CasBasedDirectory(Directory):
self._add_directory(entry)
elif os.path.isfile(import_file):
if self._check_replacement(entry, path_prefix, result):
self._add_new_file(source_directory, entry)
self._add_file(source_directory, entry, modified=relative_pathname in result.overwritten)
result.files_written.append(relative_pathname)
return result
@staticmethod
def _files_in_subdir(sorted_files, dirname):
"""Filters sorted_files and returns only the ones which have
'dirname' as a prefix, with that prefix removed.
"""
if not dirname.endswith(os.path.sep):
dirname += os.path.sep
return [f[len(dirname):] for f in sorted_files if f.startswith(dirname)]
def _partial_import_cas_into_cas(self, source_directory, files, path_prefix="", file_list_required=True):
""" Import only the files and symlinks listed in 'files' from source_directory to this one.
Args:
source_directory (:class:`.CasBasedDirectory`): The directory to import from
files ([str]): List of pathnames to import.
path_prefix (str): Prefix used to add entries to the file list result.
file_list_required: Whether to update the file list while processing.
"""
result = FileListResult()
processed_directories = set()
for f in files:
fullname = os.path.join(path_prefix, f)
components = f.split(os.path.sep)
if len(components) > 1:
# We are importing a thing which is in a subdirectory. We may have already seen this dirname
# for a previous file.
dirname = components[0]
if dirname not in processed_directories:
# Now strip off the first directory name and import files recursively.
subcomponents = CasBasedDirectory._files_in_subdir(files, dirname)
# We will fail at this point if there is a file or symlink to file called 'dirname'.
if dirname in self.index:
resolved_component = self._resolve(dirname, force_create=True)
if isinstance(resolved_component, remote_execution_pb2.FileNode):
dest_subdir = self._replace_anything_with_dir(dirname, path_prefix, result.overwritten)
else:
dest_subdir = resolved_component
else:
dest_subdir = self.descend(dirname, create=True)
src_subdir = source_directory.descend(dirname)
import_result = dest_subdir._partial_import_cas_into_cas(src_subdir, subcomponents,
path_prefix=fullname,
file_list_required=file_list_required)
result.combine(import_result)
processed_directories.add(dirname)
elif isinstance(source_directory.index[f].buildstream_object, CasBasedDirectory):
# The thing in the input file list is a directory on
# its own. We don't need to do anything other than create it if it doesn't exist.
# If we already have an entry with the same name that isn't a directory, that
# will be dealt with when importing files in this directory.
if f not in self.index:
self.descend(f, create=True)
else:
# We're importing a file or symlink - replace anything with the same name.
importable = self._check_replacement(f, path_prefix, result)
if importable:
item = source_directory.index[f].pb_object
if isinstance(item, remote_execution_pb2.FileNode):
filenode = self.pb2_directory.files.add(digest=item.digest, name=f,
is_executable=item.is_executable)
self.index[f] = IndexEntry(filenode, modified=True)
else:
assert isinstance(item, remote_execution_pb2.SymlinkNode)
self._add_new_link_direct(name=f, target=item.target)
else:
result.ignored.append(os.path.join(path_prefix, f))
return result
def _import_cas_into_cas(self, source_directory, files=None):
""" A full import is significantly quicker than a partial import, because we can just
replace one directory with another's hash, without doing any recursion.
"""
# You must pass a list into _partial_import (not a generator)
return self._partial_import_cas_into_cas(source_directory, list(files))
def import_files(self, external_pathspec, *, files=None,
report_written=True, update_utimes=False,
can_link=False):
......@@ -378,28 +582,27 @@ class CasBasedDirectory(Directory):
can_link (bool): Ignored, since hard links do not have any meaning within CAS.
"""
if isinstance(external_pathspec, FileBasedDirectory):
source_directory = external_pathspec._get_underlying_directory()
elif isinstance(external_pathspec, CasBasedDirectory):
# TODO: This transfers from one CAS to another via the
# filesystem, which is very inefficient. Alter this so it
# transfers refs across directly.
with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir:
external_pathspec.export_files(tmpdir)
if files is None:
files = list_relative_paths(tmpdir)
result = self._import_files_from_directory(tmpdir, files=files)
return result
else:
source_directory = external_pathspec
if files is None:
files = list_relative_paths(source_directory)
if isinstance(external_pathspec, str):
files = list_relative_paths(external_pathspec)
else:
assert isinstance(external_pathspec, Directory)
files = external_pathspec.list_relative_paths()
if isinstance(external_pathspec, FileBasedDirectory):
source_directory = external_pathspec.get_underlying_directory()
result = self._import_files_from_directory(source_directory, files=files)
elif isinstance(external_pathspec, str):
source_directory = external_pathspec
result = self._import_files_from_directory(source_directory, files=files)
else:
assert isinstance(external_pathspec, CasBasedDirectory)
result = self._import_cas_into_cas(external_pathspec, files=files)
# TODO: No notice is taken of report_written, update_utimes or can_link.
# Current behaviour is to fully populate the report, which is inefficient,
# but still correct.
result = self._import_files_from_directory(source_directory, files=files)
# We need to recalculate and store the hashes of all directories both
# up and down the tree; we have changed our directory by importing files
......@@ -526,7 +729,7 @@ class CasBasedDirectory(Directory):
filelist.append(k)
return filelist
def list_relative_paths(self):
def list_relative_paths(self, relpath=""):
"""Provide a list of all relative paths.
NOTE: This list is not in the same order as utils.list_relative_paths.
......@@ -534,13 +737,32 @@ class CasBasedDirectory(Directory):
Return value: List(str) - list of all paths
"""
filelist = []
for (k, v) in self.index.items():
if isinstance(v.buildstream_object, CasBasedDirectory):
filelist.extend([k + os.path.sep + x for x in v.buildstream_object.list_relative_paths()])
elif isinstance(v.pb_object, remote_execution_pb2.FileNode):
filelist.append(k)
return filelist
symlink_list = filter(lambda i: isinstance(i[1].pb_object, remote_execution_pb2.SymlinkNode),
self.index.items())
file_list = list(filter(lambda i: isinstance(i[1].pb_object, remote_execution_pb2.FileNode),
self.index.items()))
directory_list = filter(lambda i: isinstance(i[1].buildstream_object, CasBasedDirectory),
self.index.items())
# We need to mimic the behaviour of os.walk, in which symlinks
# to directories count as directories and symlinks to file or
# broken symlinks count as files. os.walk doesn't follow
# symlinks, so we don't recurse.
for (k, v) in sorted(symlink_list):
target = self._resolve(k, absolute_symlinks_resolve=True)
if isinstance(target, CasBasedDirectory):
yield os.path.join(relpath, k)
else:
file_list.append((k, v))
if file_list == [] and relpath != "":
yield relpath
else:
for (k, v) in sorted(file_list):
yield os.path.join(relpath, k)
for (k, v) in sorted(directory_list):
yield from v.buildstream_object.list_relative_paths(relpath=os.path.join(relpath, k))
def recalculate_hash(self):
""" Recalcuates the hash for this directory and store the results in
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment