Compare revisions
Commits on Source (67)
Showing with 1141 additions and 454 deletions
......@@ -10,6 +10,11 @@ stages:
- test
- post
variables:
PYTEST_ADDOPTS: "--color=yes"
INTEGRATION_CACHE: "${CI_PROJECT_DIR}/cache/integration-cache"
TEST_COMMAND: 'python3 setup.py test --index-url invalid://uri --addopts --integration'
#####################################################
# Prepare stage #
#####################################################
......@@ -52,66 +57,63 @@ source_dist:
# Run premerge commits
#
.linux-tests-template: &linux-tests
.tests-template: &tests
stage: test
variables:
PYTEST_ADDOPTS: "--color=yes"
script:
COVERAGE_DIR: coverage-linux
before_script:
# Diagnostics
- mount
- df -h
- useradd -Um buildstream
- chown -R buildstream:buildstream .
- export INTEGRATION_CACHE="$(pwd)/cache/integration-cache"
# Unpack and get into dist/buildstream
# Unpack
- cd dist && ./unpack.sh
- chown -R buildstream:buildstream buildstream
- cd buildstream
script:
- useradd -Um buildstream
- chown -R buildstream:buildstream .
# Run the tests from the source distribution. We run as a simple
# user to test for permission issues
- su buildstream -c 'python3 setup.py test --index-url invalid://uri --addopts --integration'
- su buildstream -c "${TEST_COMMAND}"
# Go back to the toplevel and collect our reports
- cd ../..
- mkdir -p coverage-linux/
- cp dist/buildstream/.coverage coverage-linux/coverage."${CI_JOB_NAME}"
after_script:
# Collect our reports
- mkdir -p ${COVERAGE_DIR}
- cp dist/buildstream/.coverage ${COVERAGE_DIR}/coverage."${CI_JOB_NAME}"
except:
- schedules
artifacts:
paths:
- coverage-linux/
- ${COVERAGE_DIR}
tests-debian-9:
image: buildstream/testsuite-debian:9-master-123-7ce6581b
<<: *linux-tests
except:
- schedules
<<: *tests
tests-fedora-27:
image: buildstream/testsuite-fedora:27-master-123-7ce6581b
<<: *linux-tests
except:
- schedules
<<: *tests
tests-fedora-28:
image: buildstream/testsuite-fedora:28-master-123-7ce6581b
<<: *linux-tests
except:
- schedules
<<: *tests
tests-ubuntu-18.04:
image: buildstream/testsuite-ubuntu:18.04-master-123-7ce6581b
<<: *linux-tests
except:
- schedules
<<: *tests
overnight-fedora-28-aarch64:
image: buildstream/testsuite-fedora:aarch64-28-master-123-7ce6581b
tags:
- aarch64
<<: *linux-tests
<<: *tests
# We need to override the exclusion from the template
# in order to run on schedules
except: []
only:
- schedules
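# Illustrative only (not part of this changeset): with the shared '.tests-template'
# above, a further distro job would only need to name an image and pull in the
# template, e.g.
#
#   tests-some-distro:
#     image: buildstream/testsuite-some-distro:master   # hypothetical image tag
#     <<: *tests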
......@@ -119,13 +121,12 @@ tests-unix:
# Use fedora here, to a) run a test on fedora and b) ensure that we
# can get rid of ostree - this is not possible with debian-8
image: buildstream/testsuite-fedora:27-master-123-7ce6581b
stage: test
<<: *tests
variables:
BST_FORCE_BACKEND: "unix"
PYTEST_ADDOPTS: "--color=yes"
script:
COVERAGE_DIR: coverage-unix
- export INTEGRATION_CACHE="$(pwd)/cache/integration-cache"
script:
# We remove the Bubblewrap and OSTree packages here so that we catch any
# codepaths that try to use them. Removing OSTree causes fuse-libs to
......@@ -133,22 +134,26 @@ tests-unix:
- dnf mark install fuse-libs
- dnf erase -y bubblewrap ostree
# Unpack and get into dist/buildstream
- cd dist && ./unpack.sh && cd buildstream
# Since the unix platform is required to run as root, no user change required
- python3 setup.py test --index-url invalid://uri --addopts --integration
- ${TEST_COMMAND}
# Go back to the toplevel and collect our reports
- cd ../..
- mkdir -p coverage-unix/
- cp dist/buildstream/.coverage coverage-unix/coverage.unix
except:
- schedules
artifacts:
paths:
- coverage-unix/
- logs-unix/
tests-fedora-missing-deps:
# Ensure that tests behave nicely while missing bwrap and ostree
image: buildstream/testsuite-fedora:28-master-119-552f5fc6
<<: *tests
script:
# We remove the Bubblewrap and OSTree packages here so that we catch any
# codepaths that try to use them. Removing OSTree causes fuse-libs to
# disappear unless we mark it as user-installed.
- dnf mark install fuse-libs
- dnf erase -y bubblewrap ostree
- useradd -Um buildstream
- chown -R buildstream:buildstream .
- ${TEST_COMMAND}
# Automatically build documentation for every commit, we want to know
......@@ -177,8 +182,8 @@ docs:
stage: test
variables:
BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
BST_EXT_REF: 573843768f4d297f85dc3067465b3c7519a8dcc3 # 0.7.0
FD_SDK_REF: 612f66e218445eee2b1a9d7dd27c9caba571612e # freedesktop-sdk-18.08.19-54-g612f66e2
before_script:
- |
mkdir -p "${HOME}/.config"
......@@ -273,7 +278,7 @@ coverage:
- pip3 install --no-index .
- mkdir report
- cd report
- cp ../../../coverage-unix/coverage.unix .
- cp ../../../coverage-unix/coverage.* .
- cp ../../../coverage-linux/coverage.* .
- ls coverage.*
- coverage combine --rcfile=../.coveragerc -a coverage.*
......
......@@ -38,13 +38,35 @@ buildstream 1.3.1
a bug fix to workspaces so they can be built in workspaces too.
o Creating a build shell through the interactive mode or `bst shell --build`
will now use the cached build tree. It is now easier to debug local build
failures.
will now use the cached build tree if available locally. It is now easier to
debug local build failures.
o `bst shell --sysroot` now takes any directory that contains a sysroot,
instead of just a specially-formatted build-root with a `root` and `scratch`
subdirectory.
o The buildstream.conf file learned new 'prompt.auto-init',
'prompt.really-workspace-close-remove-dir', and
'prompt.really-workspace-reset-hard' options. These allow users to suppress
certain confirmation prompts, e.g. double-checking that the user meant to
run the command as typed.
o Because an element's `build tree` is now cached in the respective artifact,
artifact sizes have in some cases increased significantly. In *most* cases the
build trees are not used when building targets, so by default `bst pull` and
`bst build` will not fetch build trees from remotes. This behaviour can be
overridden with the main CLI option '--pull-buildtrees' or the user
configuration cache option 'pull-buildtrees: True'; the override will also pull
the build tree of artifacts that are already cached. When populating an
artifact cache server with cached artifacts, only 'complete' elements can be
pushed: if an element is expected to have a populated build tree, it must be
cached before pushing (see the configuration sketch below).
o Added a new `bst source-checkout` command to check out the sources of an element.
o `bst workspace open` now supports opening workspaces for multiple elements at
once and allows the user to set a default location for their creation. As a
result, the new CLI is no longer backwards compatible with buildstream 1.2.
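For illustration, the options described above might be combined in a user's
buildstream.conf roughly as follows (the values are examples of the accepted
settings, not recommendations):

  workspacedir: .

  cache:
    pull-buildtrees: True

  prompt:
    auto-init: ask
    really-workspace-close-remove-dir: ask
    really-workspace-reset-hard: ask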
=================
buildstream 1.1.5
......
......@@ -476,6 +476,22 @@ class ArtifactCache():
return self.cas.contains(ref)
# contains_subdir_artifact():
#
# Check whether an artifact element contains a digest for a subdir
# which is populated in the cache, i.e. non-dangling.
#
# Args:
# element (Element): The Element to check
# key (str): The cache key to use
# subdir (str): The subdir to check
#
# Returns: True if the subdir exists & is populated in the cache, False otherwise
#
def contains_subdir_artifact(self, element, key, subdir):
ref = self.get_artifact_fullname(element, key)
return self.cas.contains_subdir_artifact(ref, subdir)
# list_artifacts():
#
# List artifacts in this cache in LRU order.
......@@ -533,6 +549,7 @@ class ArtifactCache():
# Args:
# element (Element): The Element to extract
# key (str): The cache key to use
# subdir (str): Optional specific subdir to extract
#
# Raises:
# ArtifactError: In cases there was an OSError, or if the artifact
......@@ -540,12 +557,12 @@ class ArtifactCache():
#
# Returns: path to extracted artifact
#
def extract(self, element, key):
def extract(self, element, key, subdir=None):
ref = self.get_artifact_fullname(element, key)
path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
return self.cas.extract(ref, path)
return self.cas.extract(ref, path, subdir=subdir)
# commit():
#
......@@ -666,11 +683,13 @@ class ArtifactCache():
# element (Element): The Element whose artifact is to be fetched
# key (str): The cache key to use
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, progress=None):
def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
ref = self.get_artifact_fullname(element, key)
project = element._get_project()
......@@ -680,8 +699,13 @@ class ArtifactCache():
display_key = element._get_brief_display_key()
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
if self.cas.pull(ref, remote, progress=progress):
if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
if subdir:
# If the artifact extract dir already exists but does not contain the
# subdir, extract just the subdir into it; if the extract dir does not
# exist yet, a full extraction will be performed.
self.extract(element, key, subdir)
# no need to pull from additional remotes
return True
else:
......
......@@ -24,7 +24,7 @@ import os
import stat
import tempfile
import uuid
import errno
import contextlib
from urllib.parse import urlparse
import grpc
......@@ -43,6 +43,13 @@ from .._exceptions import CASError
_MAX_PAYLOAD_BYTES = 1024 * 1024
class BlobNotFound(CASError):
def __init__(self, blob, msg):
self.blob = blob
super().__init__(msg)
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
......@@ -82,6 +89,27 @@ class CASCache():
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
# contains_subdir_artifact():
#
# Check whether the specified artifact element tree has a digest for a subdir
# which is populated in the cache, i.e. non-dangling.
#
# Args:
# ref (str): The ref to check
# subdir (str): The subdir to check
#
# Returns: True if the subdir exists & is populated in the cache, False otherwise
#
def contains_subdir_artifact(self, ref, subdir):
tree = self.resolve_ref(ref)
# This assumes that the subdir digest is present in the element tree
subdirdigest = self._get_subdir(tree, subdir)
objpath = self.objpath(subdirdigest)
# True if subdir content is cached or if empty as expected
return os.path.exists(objpath)
# extract():
#
# Extract cached directory for the specified ref if it hasn't
......@@ -90,37 +118,44 @@ class CASCache():
# Args:
# ref (str): The ref whose directory to extract
# path (str): The destination path
# subdir (str): Optional specific dir to extract
#
# Raises:
# CASError: In cases there was an OSError, or if the ref did not exist.
#
# Returns: path to extracted directory
#
def extract(self, ref, path):
def extract(self, ref, path, subdir=None):
tree = self.resolve_ref(ref, update_mtime=True)
dest = os.path.join(path, tree.hash)
originaldest = dest = os.path.join(path, tree.hash)
# If artifact is already extracted, check if the optional subdir
# has also been extracted. If the artifact has not been extracted
# a full extraction would include the optional subdir
if os.path.isdir(dest):
# directory has already been extracted
return dest
if subdir:
if not os.path.isdir(os.path.join(dest, subdir)):
dest = os.path.join(dest, subdir)
tree = self._get_subdir(tree, subdir)
else:
return dest
else:
return dest
with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
self._checkout(checkoutdir, tree)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
os.rename(checkoutdir, dest)
utils.move_atomic(checkoutdir, dest)
except utils.DirectoryExistsError:
# Another process beat us to rename
pass
except OSError as e:
# With rename it's possible to get either ENOTEMPTY or EEXIST
# in the case that the destination path is a not empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
return dest
return originaldest
# commit():
#
......@@ -193,11 +228,13 @@ class CASCache():
# ref (str): The ref to pull
# remote (CASRemote): The remote repository to pull from
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): True if pull was successful, False if ref was not available
#
def pull(self, ref, remote, *, progress=None):
def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
try:
remote.init()
......@@ -209,7 +246,12 @@ class CASCache():
tree.hash = response.digest.hash
tree.size_bytes = response.digest.size_bytes
self._fetch_directory(remote, tree)
# Check if the element artifact is present; if so, just fetch the subdir.
if subdir and os.path.exists(self.objpath(tree)):
self._fetch_subdir(remote, tree, subdir)
else:
# Fetch artifact, excluded_subdirs determined in pullqueue
self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
self.set_ref(ref, tree)
......@@ -219,6 +261,8 @@ class CASCache():
raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
else:
return False
except BlobNotFound as e:
return False
# pull_tree():
#
......@@ -391,13 +435,14 @@ class CASCache():
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
# link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
def add_object(self, *, digest=None, path=None, buffer=None):
def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
......@@ -407,28 +452,34 @@ class CASCache():
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
# Set mode bits to 0644
os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if path:
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
h.update(chunk)
out.write(chunk)
with contextlib.ExitStack() as stack:
if path is not None and link_directly:
tmp = stack.enter_context(open(path, 'rb'))
for chunk in iter(lambda: tmp.read(4096), b""):
h.update(chunk)
else:
h.update(buffer)
out.write(buffer)
tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
# Set mode bits to 0644
os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if path:
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
h.update(chunk)
tmp.write(chunk)
else:
h.update(buffer)
tmp.write(buffer)
out.flush()
tmp.flush()
digest.hash = h.hexdigest()
digest.size_bytes = os.fstat(out.fileno()).st_size
digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
os.link(out.name, objpath)
os.link(tmp.name, objpath)
except FileExistsError as e:
# We can ignore the failed link() if the object is already in the repo.
......@@ -526,6 +577,41 @@ class CASCache():
# first ref of this list will be the file modified earliest.
return [ref for _, ref in sorted(zip(mtimes, refs))]
# list_objects():
#
# List cached objects in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of objects and timestamps in LRM order
#
def list_objects(self):
objs = []
mtimes = []
for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
for filename in files:
obj_path = os.path.join(root, filename)
try:
mtimes.append(os.path.getmtime(obj_path))
except FileNotFoundError:
pass
else:
objs.append(obj_path)
# NOTE: Sorted will sort from earliest to latest, thus the
# first element of this list will be the file modified earliest.
return sorted(zip(mtimes, objs))
def clean_up_refs_until(self, time):
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
for root, _, files in os.walk(ref_heads):
for filename in files:
ref_path = os.path.join(root, filename)
# Obtain the mtime (the time a file was last modified)
if os.path.getmtime(ref_path) < time:
os.unlink(ref_path)
# remove():
#
# Removes the given symbolic ref from the repo.
......@@ -585,6 +671,10 @@ class CASCache():
return pruned
def update_tree_mtime(self, tree):
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
################################################
# Local Private Methods #
################################################
......@@ -607,8 +697,10 @@ class CASCache():
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
for dirnode in directory.directories:
fullpath = os.path.join(dest, dirnode.name)
self._checkout(fullpath, dirnode.digest)
# Don't try to checkout a dangling ref
if os.path.exists(self.objpath(dirnode.digest)):
fullpath = os.path.join(dest, dirnode.name)
self._checkout(fullpath, dirnode.digest)
for symlinknode in directory.symlinks:
# symlink
......@@ -729,10 +821,13 @@ class CASCache():
a += 1
b += 1
def _reachable_refs_dir(self, reachable, tree):
def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
if tree.hash in reachable:
return
if update_mtime:
os.utime(self.objpath(tree))
reachable.add(tree.hash)
directory = remote_execution_pb2.Directory()
......@@ -741,10 +836,12 @@ class CASCache():
directory.ParseFromString(f.read())
for filenode in directory.files:
if update_mtime:
os.utime(self.objpath(filenode.digest))
reachable.add(filenode.digest.hash)
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest)
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
......@@ -798,7 +895,7 @@ class CASCache():
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, digest, f)
added_digest = self.add_object(path=f.name)
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
......@@ -809,7 +906,7 @@ class CASCache():
f.write(data)
f.flush()
added_digest = self.add_object(path=f.name)
added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
......@@ -863,11 +960,14 @@ class CASCache():
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
# excluded_subdirs (list): The optional list of subdirs to not fetch
#
def _fetch_directory(self, remote, dir_digest):
def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
if not excluded_subdirs:
excluded_subdirs = []
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
......@@ -882,8 +982,9 @@ class CASCache():
directory.ParseFromString(f.read())
for dirnode in directory.directories:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
if dirnode.name not in excluded_subdirs:
batch = self._fetch_directory_node(remote, dirnode.digest, batch,
fetch_queue, fetch_next_queue, recursive=True)
for filenode in directory.files:
batch = self._fetch_directory_node(remote, filenode.digest, batch,
......@@ -892,6 +993,10 @@ class CASCache():
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
def _fetch_subdir(self, remote, tree, subdir):
subdirdigest = self._get_subdir(tree, subdir)
self._fetch_directory(remote, subdirdigest)
def _fetch_tree(self, remote, digest):
# download but do not store the Tree object
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
......@@ -1113,6 +1218,9 @@ class _CASBatchRead():
batch_response = self._remote.cas.BatchReadBlobs(self._request)
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.status.code != code_pb2.OK:
raise CASError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
......
......@@ -24,6 +24,9 @@ import signal
import sys
import tempfile
import uuid
import errno
import ctypes
import threading
import click
import grpc
......@@ -31,6 +34,7 @@ import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import CASError
......@@ -55,18 +59,22 @@ class ArtifactTooLargeException(Exception):
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
def create_server(repo, *, enable_push):
def create_server(repo, *, enable_push,
max_head_size=int(10e9),
min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
_ByteStreamServicer(cas, enable_push=enable_push), server)
_ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
_ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
......@@ -84,9 +92,19 @@ def create_server(repo, *, enable_push):
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
@click.option('--head-room-min', type=click.INT,
help="Disk head room minimum in bytes",
default=2e9)
@click.option('--head-room-max', type=click.INT,
help="Disk head room maximum in bytes",
default=10e9)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
server = create_server(repo, enable_push=enable_push)
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
head_room_min, head_room_max):
server = create_server(repo,
max_head_size=head_room_max,
min_head_size=head_room_min,
enable_push=enable_push)
use_tls = bool(server_key)
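# Illustrative invocation (hypothetical path; assumes the usual bst-artifact-server
# entry point for this module):
#
#   bst-artifact-server --enable-push \
#       --head-room-min 2000000000 --head-room-max 10000000000 /srv/artifacts
#
# With these (default) values the cleaner starts evicting once storing an incoming
# blob would leave less than ~2GB free, and keeps evicting until roughly 10GB of
# head room would remain.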
......@@ -127,11 +145,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
server.stop(0)
class _FallocateCall:
FALLOC_FL_KEEP_SIZE = 1
FALLOC_FL_PUNCH_HOLE = 2
FALLOC_FL_NO_HIDE_STALE = 4
FALLOC_FL_COLLAPSE_RANGE = 8
FALLOC_FL_ZERO_RANGE = 16
FALLOC_FL_INSERT_RANGE = 32
FALLOC_FL_UNSHARE_RANGE = 64
def __init__(self):
self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
try:
self.fallocate64 = self.libc.fallocate64
except AttributeError:
self.fallocate = self.libc.fallocate
def __call__(self, fd, mode, offset, length):
if hasattr(self, 'fallocate64'):
ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
ctypes.c_int64(offset), ctypes.c_int64(length))
else:
ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
ctypes.c_int(offset), ctypes.c_int(length))
if ret == -1:
err = ctypes.get_errno()
raise OSError(err, os.strerror(err))
return ret
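# Illustrative note: calling the helper with mode 0, e.g.
#
#   _FallocateCall()(fd, 0, 0, num_bytes)
#
# asks the kernel to reserve num_bytes of disk space for fd up front, so that a
# full disk surfaces as ENOSPC before any blob data is written (as relied on in
# Write() below).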
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
def __init__(self, cas, *, enable_push):
def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
self.fallocate = _FallocateCall()
self.cache_cleaner = cache_cleaner
def Read(self, request, context):
resource_name = request.resource_name
......@@ -189,17 +239,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
try:
_clean_up_cache(self.cas, client_digest.size_bytes)
except ArtifactTooLargeException as e:
context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
context.set_details(str(e))
return response
while True:
if client_digest.size_bytes == 0:
break
try:
self.cache_cleaner.clean_up(client_digest.size_bytes)
except ArtifactTooLargeException as e:
context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
context.set_details(str(e))
return response
try:
self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
break
except OSError as e:
# Multiple uploads can happen at the same time
if e.errno != errno.ENOSPC:
raise
elif request.resource_name:
# If it is set on subsequent calls, it **must** match the value of the first request.
if request.resource_name != resource_name:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
if (offset + len(request.data)) > client_digest.size_bytes:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.write(request.data)
offset += len(request.data)
if request.finish_write:
......@@ -207,7 +274,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.flush()
digest = self.cas.add_object(path=out.name)
digest = self.cas.add_object(path=out.name, link_directly=True)
if digest.hash != client_digest.hash:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
......@@ -220,18 +287,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
def __init__(self, cas, *, enable_push):
def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
self.cache_cleaner = cache_cleaner
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
for digest in request.blob_digests:
if not _has_object(self.cas, digest):
d = response.missing_blob_digests.add()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
objpath = self.cas.objpath(digest)
try:
os.utime(objpath)
except OSError as e:
if e.errno != errno.ENOENT:
raise
else:
d = response.missing_blob_digests.add()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
return response
def BatchReadBlobs(self, request, context):
......@@ -250,12 +325,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
try:
with open(self.cas.objpath(digest), 'rb') as f:
if os.fstat(f.fileno()).st_size != digest.size_bytes:
blob_response.status.code = grpc.StatusCode.NOT_FOUND
blob_response.status.code = code_pb2.NOT_FOUND
continue
blob_response.data = f.read(digest.size_bytes)
except FileNotFoundError:
blob_response.status.code = grpc.StatusCode.NOT_FOUND
blob_response.status.code = code_pb2.NOT_FOUND
return response
......@@ -285,7 +360,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
continue
try:
_clean_up_cache(self.cas, digest.size_bytes)
self.cache_cleaner.clean_up(digest.size_bytes)
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
out.write(blob_request.data)
......@@ -328,6 +403,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
try:
tree = self.cas.resolve_ref(request.key, update_mtime=True)
try:
self.cas.update_tree_mtime(tree)
except FileNotFoundError:
self.cas.remove(request.key, defer_prune=True)
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
......@@ -400,60 +481,79 @@ def _digest_from_upload_resource_name(resource_name):
return None
def _has_object(cas, digest):
objpath = cas.objpath(digest)
return os.path.exists(objpath)
class _CacheCleaner:
__cleanup_cache_lock = threading.Lock()
# _clean_up_cache()
#
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
# is enough space for the incoming artifact
#
# Args:
# cas: CASCache object
# object_size: The size of the object being received in bytes
#
# Returns:
# int: The total bytes removed on the filesystem
#
def _clean_up_cache(cas, object_size):
# Determine the available disk space, in bytes, of the file system
# which mounts the repo
stats = os.statvfs(cas.casdir)
buffer_ = int(2e9) # Add a 2 GB buffer
free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
if object_size > total_disk_space:
raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
if object_size <= free_disk_space:
# No need to clean up
return 0
# obtain a list of LRP artifacts
LRP_artifacts = cas.list_refs()
removed_size = 0 # in bytes
while object_size - removed_size > free_disk_space:
try:
to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
except IndexError:
# This exception is caught if there are no more artifacts in the list
# LRP_artifacts. This means the the artifact is too large for the filesystem
# so we abort the process
raise ArtifactTooLargeException("Artifact of size {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
self.__cas = cas
self.__max_head_size = max_head_size
self.__min_head_size = min_head_size
removed_size += cas.remove(to_remove, defer_prune=False)
def __has_space(self, object_size):
stats = os.statvfs(self.__cas.casdir)
free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
if removed_size > 0:
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
else:
logging.info("No artifacts were removed from the cache.")
if object_size > total_disk_space:
raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
return removed_size
return object_size <= free_disk_space
# _clean_up_cache()
#
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
# is enough space for the incoming artifact
#
# Args:
# object_size: The size of the object being received in bytes
#
# Returns:
# int: The total bytes removed on the filesystem
#
def clean_up(self, object_size):
if self.__has_space(object_size):
return 0
with _CacheCleaner.__cleanup_cache_lock:
if self.__has_space(object_size):
# Another thread has done the cleanup for us
return 0
stats = os.statvfs(self.__cas.casdir)
target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
# obtain a list of LRP objects
LRP_objects = self.__cas.list_objects()
removed_size = 0 # in bytes
last_mtime = 0
while object_size - removed_size > target_disk_space:
try:
last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
except IndexError:
# This exception is caught if there are no more objects in the list
# LRP_objects. This means the artifact is too large for the filesystem,
# so we abort the process
raise ArtifactTooLargeException("Artifact of size {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
try:
size = os.stat(to_remove).st_size
os.unlink(to_remove)
removed_size += size
except FileNotFoundError:
pass
self.__cas.clean_up_refs_until(last_mtime)
if removed_size > 0:
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
else:
logging.info("No artifacts were removed from the cache.")
return removed_size
......@@ -59,29 +59,32 @@ class Context():
# The directory where build sandboxes will be created
self.builddir = None
# Default root location for workspaces
self.workspacedir = None
# The local binary artifact cache directory
self.artifactdir = None
# The locations from which to push and pull prebuilt artifacts
self.artifact_cache_specs = []
self.artifact_cache_specs = None
# The directory to store build logs
self.logdir = None
# The abbreviated cache key length to display in the UI
self.log_key_length = 0
self.log_key_length = None
# Whether debug mode is enabled
self.log_debug = False
self.log_debug = None
# Whether verbose mode is enabled
self.log_verbose = False
self.log_verbose = None
# Maximum number of lines to print from build logs
self.log_error_lines = 0
self.log_error_lines = None
# Maximum number of lines to print in the master log for a detailed message
self.log_message_lines = 0
self.log_message_lines = None
# Format string for printing the pipeline at startup time
self.log_element_format = None
......@@ -90,19 +93,37 @@ class Context():
self.log_message_format = None
# Maximum number of fetch or refresh tasks
self.sched_fetchers = 4
self.sched_fetchers = None
# Maximum number of build tasks
self.sched_builders = 4
self.sched_builders = None
# Maximum number of push tasks
self.sched_pushers = 4
self.sched_pushers = None
# Maximum number of retries for network tasks
self.sched_network_retries = 2
self.sched_network_retries = None
# What to do when a build fails in non interactive mode
self.sched_error_action = 'continue'
self.sched_error_action = None
# Size of the artifact cache in bytes
self.config_cache_quota = None
# Whether or not to attempt to pull build trees globally
self.pull_buildtrees = None
# Boolean, whether to offer to create a project for the user, if we are
# invoked outside of a directory where we can resolve the project.
self.prompt_auto_init = None
# Boolean, whether we double-check with the user that they meant to
# remove a workspace directory.
self.prompt_workspace_close_remove_dir = None
# Boolean, whether we double-check with the user that they meant to do
# a hard reset of a workspace, potentially losing changes.
self.prompt_workspace_reset_hard = None
# Whether elements must be rebuilt when their dependencies have changed
self._strict_build_plan = None
......@@ -120,7 +141,6 @@ class Context():
self._workspaces = None
self._log_handle = None
self._log_filename = None
self.config_cache_quota = 'infinity'
# load()
#
......@@ -160,10 +180,10 @@ class Context():
_yaml.node_validate(defaults, [
'sourcedir', 'builddir', 'artifactdir', 'logdir',
'scheduler', 'artifacts', 'logging', 'projects',
'cache'
'cache', 'prompt', 'workspacedir',
])
for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir']:
for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir', 'workspacedir']:
# Allow the ~ tilde expansion and any environment variables in
# path specification in the config files.
#
......@@ -178,13 +198,16 @@ class Context():
# our artifactdir - the artifactdir may not have been created
# yet.
cache = _yaml.node_get(defaults, Mapping, 'cache')
_yaml.node_validate(cache, ['quota'])
_yaml.node_validate(cache, ['quota', 'pull-buildtrees'])
self.config_cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
self.config_cache_quota = _yaml.node_get(cache, str, 'quota')
# Load artifact share configuration
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
# Load pull build trees configuration
self.pull_buildtrees = _yaml.node_get(cache, bool, 'pull-buildtrees')
# Load logging config
logging = _yaml.node_get(defaults, Mapping, 'logging')
_yaml.node_validate(logging, [
......@@ -206,12 +229,34 @@ class Context():
'on-error', 'fetchers', 'builders',
'pushers', 'network-retries'
])
self.sched_error_action = _yaml.node_get(scheduler, str, 'on-error')
self.sched_error_action = _node_get_option_str(
scheduler, 'on-error', ['continue', 'quit', 'terminate'])
self.sched_fetchers = _yaml.node_get(scheduler, int, 'fetchers')
self.sched_builders = _yaml.node_get(scheduler, int, 'builders')
self.sched_pushers = _yaml.node_get(scheduler, int, 'pushers')
self.sched_network_retries = _yaml.node_get(scheduler, int, 'network-retries')
# Load prompt preferences
#
# We convert string options to booleans here, so we can be both user
# and coder-friendly. The string options are worded to match the
# responses the user would give at the cli, for least surprise. The
# booleans are converted here because it's easiest to eyeball that the
# strings are right.
#
prompt = _yaml.node_get(
defaults, Mapping, 'prompt')
_yaml.node_validate(prompt, [
'auto-init', 'really-workspace-close-remove-dir',
'really-workspace-reset-hard',
])
self.prompt_auto_init = _node_get_option_str(
prompt, 'auto-init', ['ask', 'no']) == 'ask'
self.prompt_workspace_close_remove_dir = _node_get_option_str(
prompt, 'really-workspace-close-remove-dir', ['ask', 'yes']) == 'ask'
self.prompt_workspace_reset_hard = _node_get_option_str(
prompt, 'really-workspace-reset-hard', ['ask', 'yes']) == 'ask'
# Load per-projects overrides
self._project_overrides = _yaml.node_get(defaults, Mapping, 'projects', default_value={})
......@@ -222,13 +267,6 @@ class Context():
profile_end(Topics.LOAD_CONTEXT, 'load')
valid_actions = ['continue', 'quit']
if self.sched_error_action not in valid_actions:
provenance = _yaml.node_get_provenance(scheduler, 'on-error')
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: on-error should be one of: {}".format(
provenance, ", ".join(valid_actions)))
@property
def artifactcache(self):
if not self._artifactcache:
......@@ -581,3 +619,30 @@ class Context():
os.environ['XDG_CONFIG_HOME'] = os.path.expanduser('~/.config')
if not os.environ.get('XDG_DATA_HOME'):
os.environ['XDG_DATA_HOME'] = os.path.expanduser('~/.local/share')
# _node_get_option_str()
#
# Like _yaml.node_get(), but also checks value is one of the allowed option
# strings. Fetches a value from a dictionary node, and makes sure it's one of
# the pre-defined options.
#
# Args:
# node (dict): The dictionary node
# key (str): The key to get a value for in node
# allowed_options (iterable): Only accept these values
#
# Returns:
# The value, if found in 'node'.
#
# Raises:
# LoadError, when the value is not of the expected type, or is not found.
#
def _node_get_option_str(node, key, allowed_options):
result = _yaml.node_get(node, str, key)
if result not in allowed_options:
provenance = _yaml.node_get_provenance(node, key)
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: {} should be one of: {}".format(
provenance, key, ", ".join(allowed_options)))
return result
......@@ -182,7 +182,8 @@ class App():
'fetchers': 'sched_fetchers',
'builders': 'sched_builders',
'pushers': 'sched_pushers',
'network_retries': 'sched_network_retries'
'network_retries': 'sched_network_retries',
'pull_buildtrees': 'pull_buildtrees'
}
for cli_option, context_attr in override_map.items():
option_value = self._main_options.get(cli_option)
......@@ -221,9 +222,10 @@ class App():
# Let's automatically start a `bst init` session in this case
if e.reason == LoadErrorReason.MISSING_PROJECT_CONF and self.interactive:
click.echo("A project was not detected in the directory: {}".format(directory), err=True)
click.echo("", err=True)
if click.confirm("Would you like to create a new project here ?"):
self.init_project(None)
if self.context.prompt_auto_init:
click.echo("", err=True)
if click.confirm("Would you like to create a new project here?"):
self.init_project(None)
self._error_exit(e, "Error loading project")
......
......@@ -219,6 +219,8 @@ def print_version(ctx, param, value):
help="Specify a project option")
@click.option('--default-mirror', default=None,
help="The mirror to fetch from first, before attempting other mirrors")
@click.option('--pull-buildtrees', is_flag=True, default=None,
help="Include an element's build tree when pulling remote element artifacts")
@click.pass_context
def cli(context, **kwargs):
"""Build and manipulate BuildStream projects
......@@ -662,6 +664,33 @@ def checkout(app, element, location, force, deps, integrate, hardlinks, tar):
tar=tar)
##################################################################
# Source Checkout Command #
##################################################################
@cli.command(name='source-checkout', short_help='Checkout sources for an element')
@click.option('--except', 'except_', multiple=True,
type=click.Path(readable=False),
help="Except certain dependencies")
@click.option('--deps', '-d', default='none',
type=click.Choice(['build', 'none', 'run', 'all']),
help='The dependencies whose sources to checkout (default: none)')
@click.option('--fetch', 'fetch_', default=False, is_flag=True,
help='Fetch elements if they are not fetched')
@click.argument('element',
type=click.Path(readable=False))
@click.argument('location', type=click.Path())
@click.pass_obj
def source_checkout(app, element, location, deps, fetch_, except_):
"""Checkout sources of an element to the specified location
"""
with app.initialized():
app.stream.source_checkout(element,
location=location,
deps=deps,
fetch=fetch_,
except_targets=except_)
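# Example usage (hypothetical element and path):
#
#   bst source-checkout --deps run --fetch hello.bst ./hello-sources
#
# This stages the sources of hello.bst and its runtime dependencies into
# ./hello-sources, fetching any sources that are not already cached.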
##################################################################
# Workspace Command #
##################################################################
......@@ -678,31 +707,23 @@ def workspace():
@click.option('--no-checkout', default=False, is_flag=True,
help="Do not checkout the source, only link to the given directory")
@click.option('--force', '-f', default=False, is_flag=True,
help="Overwrite files existing in checkout directory")
help="The workspace will be created even if the directory in which it will be created is not empty " +
"or if a workspace for that element already exists")
@click.option('--track', 'track_', default=False, is_flag=True,
help="Track and fetch new source references before checking out the workspace")
@click.argument('element',
type=click.Path(readable=False))
@click.argument('directory', type=click.Path(file_okay=False))
@click.option('--directory', type=click.Path(file_okay=False), default=None,
help="Only for use when a single Element is given: Set the directory to use to create the workspace")
@click.argument('elements', nargs=-1, type=click.Path(readable=False), required=True)
@click.pass_obj
def workspace_open(app, no_checkout, force, track_, element, directory):
def workspace_open(app, no_checkout, force, track_, directory, elements):
"""Open a workspace for manual source modification"""
if os.path.exists(directory):
if not os.path.isdir(directory):
click.echo("Checkout directory is not a directory: {}".format(directory), err=True)
sys.exit(-1)
if not (no_checkout or force) and os.listdir(directory):
click.echo("Checkout directory is not empty: {}".format(directory), err=True)
sys.exit(-1)
with app.initialized():
app.stream.workspace_open(element, directory,
app.stream.workspace_open(elements,
no_checkout=no_checkout,
track_first=track_,
force=force)
force=force,
custom_dir=directory)
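# Example usage (hypothetical element names):
#
#   bst workspace open alpha.bst beta.bst
#       -> opens workspaces at ./alpha and ./beta (with the default 'workspacedir: .')
#
#   bst workspace open --directory ../alpha-dev alpha.bst
#       -> a custom directory is only valid when a single element is given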
##################################################################
......@@ -743,7 +764,7 @@ def workspace_close(app, remove_dir, all_, elements):
if nonexisting:
raise AppError("Workspace does not exist", detail="\n".join(nonexisting))
if app.interactive and remove_dir:
if app.interactive and remove_dir and app.context.prompt_workspace_close_remove_dir:
if not click.confirm('This will remove all your changes, are you sure?'):
click.echo('Aborting', err=True)
sys.exit(-1)
......@@ -777,7 +798,7 @@ def workspace_reset(app, soft, track_, all_, elements):
if all_ and not app.stream.workspace_exists():
raise AppError("No open workspaces to reset")
if app.interactive and not soft:
if app.interactive and not soft and app.context.prompt_workspace_reset_hard:
if not click.confirm('This will remove all your changes, are you sure?'):
click.echo('Aborting', err=True)
sys.exit(-1)
......
......@@ -370,7 +370,7 @@ class Pipeline():
detail += " Element: {} is inconsistent\n".format(element._get_full_name())
for source in element.sources():
if source._get_consistency() == Consistency.INCONSISTENT:
detail += " Source {} is missing ref\n".format(source)
detail += " {} is missing ref\n".format(source)
detail += '\n'
detail += "Try tracking these elements first with `bst track`\n"
......@@ -383,6 +383,33 @@ class Pipeline():
detail += " " + element._get_full_name() + "\n"
raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
# assert_sources_cached()
#
# Asserts that sources for the given list of elements are cached.
#
# Args:
# elements (list): The list of elements
#
def assert_sources_cached(self, elements):
uncached = []
with self._context.timed_activity("Checking sources"):
for element in elements:
if element._get_consistency() != Consistency.CACHED:
uncached.append(element)
if uncached:
detail = "Sources are not cached for the following elements:\n\n"
for element in uncached:
detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
for source in element.sources():
if source._get_consistency() != Consistency.CACHED:
detail += " {}\n".format(source)
detail += '\n'
detail += "Try fetching these elements first with `bst fetch`,\n" + \
"or run this command with `--fetch` option\n"
raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
#############################################################
# Private Methods #
#############################################################
......
......@@ -379,27 +379,7 @@ class Stream():
elements, _ = self._load((target,), (), fetch_subprojects=True)
target = elements[0]
if not tar:
try:
os.makedirs(location, exist_ok=True)
except OSError as e:
raise StreamError("Failed to create checkout directory: '{}'"
.format(e)) from e
if not tar:
if not os.access(location, os.W_OK):
raise StreamError("Checkout directory '{}' not writable"
.format(location))
if not force and os.listdir(location):
raise StreamError("Checkout directory '{}' not empty"
.format(location))
elif os.path.exists(location) and location != '-':
if not os.access(location, os.W_OK):
raise StreamError("Output file '{}' not writable"
.format(location))
if not force and os.path.exists(location):
raise StreamError("Output file '{}' already exists"
.format(location))
self._check_location_writable(location, force=force, tar=tar)
# Stage deps into a temporary sandbox first
try:
......@@ -443,49 +423,71 @@ class Stream():
raise StreamError("Error while staging dependencies into a sandbox"
": '{}'".format(e), detail=e.detail, reason=e.reason) from e
# source_checkout()
#
# Checkout sources of the target element to the specified location
#
# Args:
# target (str): The target element whose sources to checkout
# location (str): Location to checkout the sources to
# deps (str): The dependencies to checkout
# fetch (bool): Whether to fetch missing sources
# except_targets (list): List of targets to except from staging
#
def source_checkout(self, target, *,
location=None,
deps='none',
fetch=False,
except_targets=()):
self._check_location_writable(location)
elements, _ = self._load((target,), (),
selection=deps,
except_targets=except_targets,
fetch_subprojects=True)
# Assert all sources are cached
if fetch:
self._fetch(elements)
self._pipeline.assert_sources_cached(elements)
# Stage all sources determined by scope
try:
self._write_element_sources(location, elements)
except BstError as e:
raise StreamError("Error while writing sources"
": '{}'".format(e), detail=e.detail, reason=e.reason) from e
# workspace_open
#
# Open a project workspace
#
# Args:
# target (str): The target element to open the workspace for
# directory (str): The directory to stage the source in
# targets (list): List of target elements to open workspaces for
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
# custom_dir (str): Custom location to create the workspaces in, or None to use the default location.
#
def workspace_open(self, target, directory, *,
def workspace_open(self, targets, *,
no_checkout,
track_first,
force):
force,
custom_dir):
# This function is a little funny but it is trying to be as atomic as possible.
if track_first:
track_targets = (target,)
track_targets = targets
else:
track_targets = ()
elements, track_elements = self._load((target,), track_targets,
elements, track_elements = self._load(targets, track_targets,
selection=PipelineSelection.REDIRECT,
track_selection=PipelineSelection.REDIRECT)
target = elements[0]
directory = os.path.abspath(directory)
if not list(target.sources()):
build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
if not build_depends:
raise StreamError("The given element has no sources")
detail = "Try opening a workspace on one of its dependencies instead:\n"
detail += " \n".join(build_depends)
raise StreamError("The given element has no sources", detail=detail)
workspaces = self._context.get_workspaces()
# Check for workspace config
workspace = workspaces.get_workspace(target._get_full_name())
if workspace and not force:
raise StreamError("Workspace '{}' is already defined at: {}"
.format(target.name, workspace.get_absolute_path()))
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
......@@ -495,29 +497,88 @@ class Stream():
track_elements = elements
self._fetch(elements, track_elements=track_elements)
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
"Use `--track` to track and " +
"fetch the latest version of the " +
"source.")
if workspace:
workspaces.delete_workspace(target._get_full_name())
workspaces.save_config()
shutil.rmtree(directory)
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise StreamError("Failed to create workspace directory: {}".format(e)) from e
expanded_directories = []
# To try to be more atomic, loop through the elements and raise any errors we can early
for target in elements:
if not list(target.sources()):
build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
if not build_depends:
raise StreamError("The element {} has no sources".format(target.name))
detail = "Try opening a workspace on one of its dependencies instead:\n"
detail += " \n".join(build_depends)
raise StreamError("The element {} has no sources".format(target.name), detail=detail)
# Check for workspace config
workspace = workspaces.get_workspace(target._get_full_name())
if workspace and not force:
raise StreamError("Element '{}' already has workspace defined at: {}"
.format(target.name, workspace.get_absolute_path()))
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. For {} ".format(target.name) +
"Use `--track` to track and " +
"fetch the latest version of the " +
"source.")
if not custom_dir:
directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name))
if directory[-4:] == '.bst':
directory = directory[:-4]
expanded_directories.append(directory)
if custom_dir:
if len(elements) != 1:
raise StreamError("Exactly one element can be given if --directory is used",
reason='directory-with-multiple-elements')
expanded_directories = [custom_dir, ]
else:
# If this fails it is a bug in whatever calls this (usually cli.py) and so cannot be tested for via the
# run-bst test mechanism.
assert len(elements) == len(expanded_directories)
for target, directory in zip(elements, expanded_directories):
if os.path.exists(directory):
if not os.path.isdir(directory):
raise StreamError("For element '{}', Directory path is not a directory: {}"
.format(target.name, directory), reason='bad-directory')
if not (no_checkout or force) and os.listdir(directory):
raise StreamError("For element '{}', Directory path is not empty: {}"
.format(target.name, directory), reason='bad-directory')
# So far this function has tried to catch as many issues as possible without making any changes.
# Now it does the parts that cannot be made atomic.
targetGenerator = zip(elements, expanded_directories)
for target, directory in targetGenerator:
self._message(MessageType.INFO, "Creating workspace for element {}"
.format(target.name))
workspace = workspaces.get_workspace(target._get_full_name())
if workspace:
workspaces.delete_workspace(target._get_full_name())
workspaces.save_config()
shutil.rmtree(directory)
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
todo_elements = " ".join([str(target.name) for target, directory_dict in targetGenerator])
if todo_elements:
# This output should make creating the remaining workspaces as easy as possible.
todo_elements = "\nDid not try to create workspaces for " + todo_elements
raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e
workspaces.create_workspace(target._get_full_name(), directory)
workspaces.create_workspace(target._get_full_name(), directory)
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
workspaces.save_config()
self._message(MessageType.INFO, "Saved workspace configuration")
# Saving the workspace configuration as soon as each workspace is set up means that, if creating the
# next workspace fails, the previously created workspaces are still saved.
workspaces.save_config()
self._message(MessageType.INFO, "Created a workspace for element: {}"
.format(target._get_full_name()))
# workspace_close
#
......@@ -726,7 +787,7 @@ class Stream():
if self._write_element_script(source_directory, element)
]
self._write_element_sources(tempdir, elements)
self._write_element_sources(os.path.join(tempdir, "source"), elements)
self._write_build_script(tempdir, elements)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
......@@ -1068,6 +1129,39 @@ class Stream():
self._enqueue_plan(fetch_plan)
self._run()
# _check_location_writable()
#
# Check if given location is writable.
#
# Args:
# location (str): Destination path
# force (bool): Allow files to be overwritten
# tar (bool): Whether destination is a tarball
#
# Raises:
# (StreamError): If the destination is not writable
#
def _check_location_writable(self, location, force=False, tar=False):
if not tar:
try:
os.makedirs(location, exist_ok=True)
except OSError as e:
raise StreamError("Failed to create destination directory: '{}'"
.format(e)) from e
if not os.access(location, os.W_OK):
raise StreamError("Destination directory '{}' not writable"
.format(location))
if not force and os.listdir(location):
raise StreamError("Destination directory '{}' not empty"
.format(location))
elif os.path.exists(location) and location != '-':
if not os.access(location, os.W_OK):
raise StreamError("Output file '{}' not writable"
.format(location))
if not force and os.path.exists(location):
raise StreamError("Output file '{}' already exists"
.format(location))
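# Illustrative usage sketch (not part of the original change; the 'stream'
# instance and destination paths are hypothetical):
#
#     stream._check_location_writable('./checkout', force=False, tar=False)
#     # Raises StreamError if './checkout' cannot be created, is not
#     # writable, or is non-empty while force is False.
#
#     stream._check_location_writable('out.tar', force=False, tar=True)
#     # For a tarball destination, only the output file path itself is
#     # checked for writability and prior existence ('-' meaning stdout
#     # is exempt).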
# Helper function for checkout()
#
def _checkout_hardlinks(self, sandbox_vroot, directory):
......@@ -1089,11 +1183,10 @@ class Stream():
# Write all source elements to the given directory
def _write_element_sources(self, directory, elements):
for element in elements:
source_dir = os.path.join(directory, "source")
element_source_dir = os.path.join(source_dir, element.normal_name)
os.makedirs(element_source_dir)
element._stage_sources_at(element_source_dir)
element_source_dir = self._get_element_dirname(directory, element)
if list(element.sources()):
os.makedirs(element_source_dir)
element._stage_sources_at(element_source_dir)
# Write a master build script to the sandbox
def _write_build_script(self, directory, elements):
......@@ -1122,3 +1215,25 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
# _get_element_dirname()
#
# Get path to directory for an element based on its normal name.
#
# For cross-junction elements, the path will be prefixed with the name
# of the junction element.
#
# Args:
# directory (str): path to base directory
# element (Element): the element
#
# Returns:
# (str): Path to directory for this element
#
def _get_element_dirname(self, directory, element):
parts = [element.normal_name]
while element._get_project() != self._project:
element = element._get_project().junction
parts.append(element.normal_name)
return os.path.join(directory, *reversed(parts))
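# Worked example (hypothetical element names): for an element with normal
# name 'app' reached through a junction whose normal name is 'deps', parts
# becomes ['app', 'deps'] and the returned path is
# os.path.join(directory, 'deps', 'app'); an element in the top-level
# project simply maps to os.path.join(directory, 'app').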
......@@ -351,6 +351,7 @@ _sentinel = object()
# expected_type (type): The expected type for the value being searched
# key (str): The key to get a value for in node
# indices (list of ints): Optionally descend into lists of lists
# default_value: Optionally return this value if the key is not found
#
# Returns:
# The value if found in node, otherwise default_value is returned
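# A minimal call sketch (assuming the usual node_get() argument order of
# node, expected type and key; the key name is hypothetical):
#
#     version = _yaml.node_get(node, int, 'format-version', default_value=0)
#     # Returns 0 instead of raising LoadError when the key is absent.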
......@@ -475,12 +476,18 @@ def node_get_project_path(node, key, project_dir, *,
is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
full_resolved_path == project_dir_path)
if path.is_absolute() or not is_inside:
if not is_inside:
raise LoadError(LoadErrorReason.PROJ_PATH_INVALID,
"{}: Specified path '{}' must not lead outside of the "
"project directory"
.format(provenance, path_str))
if path.is_absolute():
raise LoadError(LoadErrorReason.PROJ_PATH_INVALID,
"{}: Absolute path: '{}' invalid.\n"
"Please specify a path relative to the project's root."
.format(provenance, path))
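# Illustrative sketch of the containment check above (hypothetical paths):
#
#     from pathlib import Path
#     project_dir_path = Path('/home/user/project')
#     full_resolved_path = (project_dir_path / 'elements/../../etc').resolve()
#     is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
#         full_resolved_path == project_dir_path)
#     # is_inside is False here, so the path is rejected with
#     # PROJ_PATH_INVALID; an absolute path such as '/etc/passwd' is
#     # rejected by the second check as well.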
if full_resolved_path.is_socket() or (
full_resolved_path.is_fifo() or
full_resolved_path.is_block_device()):
......
......@@ -22,6 +22,9 @@ artifactdir: ${XDG_CACHE_HOME}/buildstream/artifacts
# Location to store build logs
logdir: ${XDG_CACHE_HOME}/buildstream/logs
# Default root location for workspaces, blank for no default set.
workspacedir: .
#
# Cache
#
......@@ -35,6 +38,9 @@ cache:
# to the size of the file system containing the cache.
quota: infinity
# Whether to pull build trees when downloading element artifacts
pull-buildtrees: False
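# For example, a user who wants build trees pulled by default could override
# this in their user configuration (illustrative snippet):
#
#   cache:
#     pull-buildtrees: True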
#
# Scheduler
#
......@@ -97,3 +103,35 @@ logging:
[%{elapsed}][%{key}][%{element}] %{action} %{message}
#
# Prompt overrides
#
# Here you can suppress 'are you sure?' and other kinds of prompts by supplying
# override values. Note that e.g. 'yes' and 'no' have the same meaning here as
# they do in the actual CLI prompt.
#
prompt:
# Whether to create a project with 'bst init' if we are invoked outside of a
# directory where we can resolve the project.
#
# ask - Prompt the user to choose.
# no - Never create the project.
#
auto-init: ask
# Whether to really proceed with 'bst workspace close --remove-dir' removing
# a workspace directory, potentially losing changes.
#
# ask - Ask the user if they are sure.
# yes - Always remove, without asking.
#
really-workspace-close-remove-dir: ask
# Whether to really proceed with 'bst workspace reset' doing a hard reset of
# a workspace, potentially losing changes.
#
# ask - Ask the user if they are sure.
# yes - Always hard reset, without asking.
#
really-workspace-reset-hard: ask
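# For example, to never be prompted when closing a workspace with
# --remove-dir, a user configuration could contain (illustrative snippet):
#
#   prompt:
#     really-workspace-close-remove-dir: yes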
......@@ -85,7 +85,8 @@ import shutil
from . import _yaml
from ._variables import Variables
from ._versions import BST_CORE_ARTIFACT_VERSION
from ._exceptions import BstError, LoadError, LoadErrorReason, ImplError, ErrorDomain
from ._exceptions import BstError, LoadError, LoadErrorReason, ImplError, \
ErrorDomain
from .utils import UtilError
from . import Plugin, Consistency, Scope
from . import SandboxFlags
......@@ -1397,12 +1398,12 @@ class Element(Plugin):
with self.timed_activity("Staging local files at {}"
.format(workspace.get_absolute_path())):
workspace.stage(temp_staging_directory)
elif self._cached():
# We have a cached buildtree to use, instead
# Check if we have a cached buildtree to use
elif self.__cached_buildtree():
artifact_base, _ = self.__extract()
import_dir = os.path.join(artifact_base, 'buildtree')
else:
# No workspace, stage directly
# No workspace or cached buildtree, stage source directly
for source in self.sources():
source._stage(temp_staging_directory)
......@@ -1553,7 +1554,6 @@ class Element(Plugin):
self.__dynamic_public = _yaml.node_copy(self.__public)
# Call the abstract plugin methods
collect = None
try:
# Step 1 - Configure
self.configure_sandbox(sandbox)
......@@ -1564,7 +1564,7 @@ class Element(Plugin):
# Step 4 - Assemble
collect = self.assemble(sandbox) # pylint: disable=assignment-from-no-return
self.__set_build_result(success=True, description="succeeded")
except BstError as e:
except ElementError as e:
# Shelling into a sandbox is useful to debug this error
e.sandbox = True
......@@ -1586,104 +1586,105 @@ class Element(Plugin):
self.warn("Failed to preserve workspace state for failed build sysroot: {}"
.format(e))
if isinstance(e, ElementError):
collect = e.collect # pylint: disable=no-member
self.__set_build_result(success=False, description=str(e), detail=e.detail)
self._cache_artifact(rootdir, sandbox, e.collect)
raise
else:
return self._cache_artifact(rootdir, sandbox, collect)
finally:
if collect is not None:
try:
sandbox_vroot = sandbox.get_virtual_directory()
collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
except VirtualDirectoryError:
# No collect directory existed
collectvdir = None
# Create artifact directory structure
assembledir = os.path.join(rootdir, 'artifact')
filesdir = os.path.join(assembledir, 'files')
logsdir = os.path.join(assembledir, 'logs')
metadir = os.path.join(assembledir, 'meta')
buildtreedir = os.path.join(assembledir, 'buildtree')
os.mkdir(assembledir)
if collect is not None and collectvdir is not None:
os.mkdir(filesdir)
os.mkdir(logsdir)
os.mkdir(metadir)
os.mkdir(buildtreedir)
# Hard link files from collect dir to files directory
if collect is not None and collectvdir is not None:
collectvdir.export_files(filesdir, can_link=True)
try:
sandbox_vroot = sandbox.get_virtual_directory()
sandbox_build_dir = sandbox_vroot.descend(
self.get_variable('build-root').lstrip(os.sep).split(os.sep))
# Hard link files from build-root dir to buildtreedir directory
sandbox_build_dir.export_files(buildtreedir)
except VirtualDirectoryError:
# Directory could not be found. Pre-virtual
# directory behaviour was to continue silently
# if the directory could not be found.
pass
# Copy build log
log_filename = context.get_log_filename()
self._build_log_path = os.path.join(logsdir, 'build.log')
if log_filename:
shutil.copyfile(log_filename, self._build_log_path)
# Store public data
_yaml.dump(_yaml.node_sanitize(self.__dynamic_public), os.path.join(metadir, 'public.yaml'))
# Store result
build_result_dict = {"success": self.__build_result[0], "description": self.__build_result[1]}
if self.__build_result[2] is not None:
build_result_dict["detail"] = self.__build_result[2]
_yaml.dump(build_result_dict, os.path.join(metadir, 'build-result.yaml'))
# ensure we have cache keys
self._assemble_done()
# Store keys.yaml
_yaml.dump(_yaml.node_sanitize({
'strong': self._get_cache_key(),
'weak': self._get_cache_key(_KeyStrength.WEAK),
}), os.path.join(metadir, 'keys.yaml'))
# Store dependencies.yaml
_yaml.dump(_yaml.node_sanitize({
e.name: e._get_cache_key() for e in self.dependencies(Scope.BUILD)
}), os.path.join(metadir, 'dependencies.yaml'))
# Store workspaced.yaml
_yaml.dump(_yaml.node_sanitize({
'workspaced': True if self._get_workspace() else False
}), os.path.join(metadir, 'workspaced.yaml'))
# Store workspaced-dependencies.yaml
_yaml.dump(_yaml.node_sanitize({
'workspaced-dependencies': [
e.name for e in self.dependencies(Scope.BUILD)
if e._get_workspace()
]
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
with self.timed_activity("Caching artifact"):
artifact_size = utils._get_dir_size(assembledir)
self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
if collect is not None and collectvdir is None:
raise ElementError(
"Directory '{}' was not found inside the sandbox, "
"unable to collect artifact contents"
.format(collect))
# Finally cleanup the build dir
cleanup_rootdir()
def _cache_artifact(self, rootdir, sandbox, collect):
if collect is not None:
try:
sandbox_vroot = sandbox.get_virtual_directory()
collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
except VirtualDirectoryError:
# No collect directory existed
collectvdir = None
# Create artifact directory structure
assembledir = os.path.join(rootdir, 'artifact')
filesdir = os.path.join(assembledir, 'files')
logsdir = os.path.join(assembledir, 'logs')
metadir = os.path.join(assembledir, 'meta')
buildtreedir = os.path.join(assembledir, 'buildtree')
os.mkdir(assembledir)
if collect is not None and collectvdir is not None:
os.mkdir(filesdir)
os.mkdir(logsdir)
os.mkdir(metadir)
os.mkdir(buildtreedir)
# Hard link files from collect dir to files directory
if collect is not None and collectvdir is not None:
collectvdir.export_files(filesdir, can_link=True)
try:
sandbox_vroot = sandbox.get_virtual_directory()
sandbox_build_dir = sandbox_vroot.descend(
self.get_variable('build-root').lstrip(os.sep).split(os.sep))
# Hard link files from build-root dir to buildtreedir directory
sandbox_build_dir.export_files(buildtreedir)
except VirtualDirectoryError:
# Directory could not be found. Pre-virtual
# directory behaviour was to continue silently
# if the directory could not be found.
pass
# Copy build log
log_filename = self._get_context().get_log_filename()
self._build_log_path = os.path.join(logsdir, 'build.log')
if log_filename:
shutil.copyfile(log_filename, self._build_log_path)
# Store public data
_yaml.dump(_yaml.node_sanitize(self.__dynamic_public), os.path.join(metadir, 'public.yaml'))
# Store result
build_result_dict = {"success": self.__build_result[0], "description": self.__build_result[1]}
if self.__build_result[2] is not None:
build_result_dict["detail"] = self.__build_result[2]
_yaml.dump(build_result_dict, os.path.join(metadir, 'build-result.yaml'))
# ensure we have cache keys
self._assemble_done()
# Store keys.yaml
_yaml.dump(_yaml.node_sanitize({
'strong': self._get_cache_key(),
'weak': self._get_cache_key(_KeyStrength.WEAK),
}), os.path.join(metadir, 'keys.yaml'))
# Store dependencies.yaml
_yaml.dump(_yaml.node_sanitize({
e.name: e._get_cache_key() for e in self.dependencies(Scope.BUILD)
}), os.path.join(metadir, 'dependencies.yaml'))
# Store workspaced.yaml
_yaml.dump(_yaml.node_sanitize({
'workspaced': True if self._get_workspace() else False
}), os.path.join(metadir, 'workspaced.yaml'))
# Store workspaced-dependencies.yaml
_yaml.dump(_yaml.node_sanitize({
'workspaced-dependencies': [
e.name for e in self.dependencies(Scope.BUILD)
if e._get_workspace()
]
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
with self.timed_activity("Caching artifact"):
artifact_size = utils._get_dir_size(assembledir)
self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
if collect is not None and collectvdir is None:
raise ElementError(
"Directory '{}' was not found inside the sandbox, "
"unable to collect artifact contents"
.format(collect))
return artifact_size
def _get_build_log(self):
......@@ -1691,7 +1692,9 @@ class Element(Plugin):
# _pull_pending()
#
# Check whether the artifact will be pulled.
# Check whether the artifact will be pulled. If the pull operation is to
# include a specific subdir of the element artifact (from the CLI or user
# configuration), then the local cache is queried for that subdir's existence.
#
# Returns:
# (bool): Whether a pull operation is pending
......@@ -1701,8 +1704,15 @@ class Element(Plugin):
# Workspace builds are never pushed to artifact servers
return False
if self.__strong_cached:
# Artifact already in local cache
# Check whether the pull has been invoked with a specific subdir requested
# in user context, so as to complete a partial artifact
subdir, _ = self.__pull_directories()
if self.__strong_cached and subdir:
# If we've specified a subdir, check if the subdir is cached locally
if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
return False
elif self.__strong_cached:
return False
# Pull is pending if artifact remote server available
......@@ -1724,33 +1734,6 @@ class Element(Plugin):
self._update_state()
def _pull_strong(self, *, progress=None):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
key = self.__strict_cache_key
if not self.__artifacts.pull(self, key, progress=progress):
return False
# update weak ref by pointing it to this newly fetched artifact
self.__artifacts.link_key(self, key, weak_key)
return True
def _pull_weak(self, *, progress=None):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
if not self.__artifacts.pull(self, weak_key, progress=progress):
return False
# extract strong cache key from this newly fetched artifact
self._pull_done()
# create tag for strong cache key
key = self._get_cache_key(strength=_KeyStrength.STRONG)
self.__artifacts.link_key(self, weak_key, key)
return True
# _pull():
#
# Pull artifact from remote artifact repository into local artifact cache.
......@@ -1763,11 +1746,15 @@ class Element(Plugin):
def progress(percent, message):
self.status(message)
# Get optional specific subdir to pull and optional list to not pull
# based off of user context
subdir, excluded_subdirs = self.__pull_directories()
# Attempt to pull artifact without knowing whether it's available
pulled = self._pull_strong(progress=progress)
pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
if not pulled and not self._cached() and not context.get_strict():
pulled = self._pull_weak(progress=progress)
pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
if not pulled:
return False
......@@ -1787,10 +1774,12 @@ class Element(Plugin):
# No push remotes for this element's project
return True
if not self._cached():
# Do not push elements that aren't cached, or that are cached with a dangling buildtree
# artifact, unless the element type is expected to have an empty buildtree directory
if not self.__cached_buildtree():
return True
# Do not push tained artifact
# Do not push tainted artifact
if self.__get_tainted():
return True
......@@ -2674,6 +2663,106 @@ class Element(Plugin):
return utils._deduplicate(keys)
# __pull_strong():
#
# Attempt pulling given element from configured artifact caches with
# the strict cache key
#
# Args:
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): Whether or not the pull was successful
#
def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
key = self.__strict_cache_key
if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir,
excluded_subdirs=excluded_subdirs):
return False
# update weak ref by pointing it to this newly fetched artifact
self.__artifacts.link_key(self, key, weak_key)
return True
# __pull_weak():
#
# Attempt pulling given element from configured artifact caches with
# the weak cache key
#
# Args:
# progress (callable): The progress callback, if any
# subdir (str): The optional specific subdir to pull
# excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): Whether or not the pull was successful
#
def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
excluded_subdirs=excluded_subdirs):
return False
# extract strong cache key from this newly fetched artifact
self._pull_done()
# create tag for strong cache key
key = self._get_cache_key(strength=_KeyStrength.STRONG)
self.__artifacts.link_key(self, weak_key, key)
return True
# __cached_buildtree():
#
# Check if cached element artifact contains expected buildtree
#
# Returns:
# (bool): True if the artifact is cached with its buildtree, False if
# the element is not cached or the expected buildtree is missing
#
def __cached_buildtree(self):
context = self._get_context()
if not self._cached():
return False
elif context.get_strict():
if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
return False
elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
return False
return True
# __pull_directories():
#
# Which directories to include or exclude given the current
# context
#
# Returns:
# subdir (str): The optional specific subdir to include, based
# on user context
# excluded_subdirs (list): The optional list of subdirs to not
# pull, referenced against subdir value
#
def __pull_directories(self):
context = self._get_context()
# Current default exclusions on pull
excluded_subdirs = ["buildtree"]
subdir = ''
# If buildtrees are to be pulled, remove the value from exclusion list
# and set specific subdir
if context.pull_buildtrees:
subdir = "buildtree"
excluded_subdirs.remove(subdir)
return (subdir, excluded_subdirs)
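# Worked example of the return value: with context.pull_buildtrees enabled
# this returns ("buildtree", []), so the buildtree subdir is requested in
# addition to the rest of the artifact; with it disabled it returns
# ("", ["buildtree"]), i.e. pull everything except the buildtree.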
def _overlap_error_detail(f, forbidden_overlap_elements, elements):
if forbidden_overlap_elements:
......
......@@ -19,7 +19,7 @@ variables:
cmake-args: |
-DCMAKE_INSTALL_PREFIX:PATH="%{prefix}" \
-DCMAKE_INSTALL_LIBDIR=%{lib} %{cmake-extra} %{cmake-global} %{cmake-local}
-DCMAKE_INSTALL_LIBDIR:PATH="%{lib}" %{cmake-extra} %{cmake-global} %{cmake-local}
cmake: |
......
......@@ -86,7 +86,6 @@ This plugin also utilises the following configurable core plugin warnings:
"""
import os
import errno
import re
import shutil
from collections.abc import Mapping
......@@ -97,6 +96,7 @@ from configparser import RawConfigParser
from buildstream import Source, SourceError, Consistency, SourceFetcher
from buildstream import utils
from buildstream.plugin import CoreWarnings
from buildstream.utils import move_atomic, DirectoryExistsError
GIT_MODULES = '.gitmodules'
......@@ -141,21 +141,16 @@ class GitMirror(SourceFetcher):
fail="Failed to clone git repository {}".format(url),
fail_temporarily=True)
# Attempt atomic rename into destination, this will fail if
# another process beat us to the punch
try:
os.rename(tmpdir, self.mirror)
move_atomic(tmpdir, self.mirror)
except DirectoryExistsError:
# Another process was quicker to download this repository.
# Let's discard our own
self.source.status("{}: Discarding duplicate clone of {}"
.format(self.source, url))
except OSError as e:
# When renaming and the destination repo already exists, os.rename()
# will fail with ENOTEMPTY, since an empty directory will be silently
# replaced
if e.errno == errno.ENOTEMPTY:
self.source.status("{}: Discarding duplicate clone of {}"
.format(self.source, url))
else:
raise SourceError("{}: Failed to move cloned git repository {} from '{}' to '{}': {}"
.format(self.source, url, tmpdir, self.mirror, e)) from e
raise SourceError("{}: Failed to move cloned git repository {} from '{}' to '{}': {}"
.format(self.source, url, tmpdir, self.mirror, e)) from e
def _fetch(self, alias_override=None):
url = self.source.translate_url(self.url,
......
......@@ -68,7 +68,6 @@ details on common configuration options for sources.
The ``pip`` plugin is available since :ref:`format version 16 <project_format_version>`
"""
import errno
import hashlib
import os
import re
......@@ -80,6 +79,7 @@ _PYPI_INDEX_URL = 'https://pypi.org/simple/'
# Used only for finding pip command
_PYTHON_VERSIONS = [
'python', # when running in a venv, we might not have the exact version
'python2.7',
'python3.0',
'python3.1',
......@@ -192,13 +192,14 @@ class PipSource(Source):
# process has fetched the sources before us and ensure that we do
# not raise an error in that case.
try:
os.makedirs(self._mirror)
os.rename(package_dir, self._mirror)
except FileExistsError:
return
utils.move_atomic(package_dir, self._mirror)
except utils.DirectoryExistsError:
# Another process has beaten us and has fetched the sources
# before us.
pass
except OSError as e:
if e.errno != errno.ENOTEMPTY:
raise
raise SourceError("{}: Failed to move downloaded pip packages from '{}' to '{}': {}"
.format(self, package_dir, self._mirror, e)) from e
def stage(self, directory):
with self.timed_activity("Staging Python packages", silent_nested=True):
......
......@@ -20,15 +20,18 @@
import os
from urllib.parse import urlparse
from functools import partial
import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import SandboxError
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
# SandboxRemote()
......@@ -51,6 +54,7 @@ class SandboxRemote(Sandbox):
"Only plain HTTP is currently supported (no HTTPS).")
self.server_url = '{}:{}'.format(url.hostname, url.port)
self.operation_name = None
def run_remote_command(self, command, input_root_digest, working_directory, environment):
# Sends an execution request to the remote execution server.
......@@ -102,10 +106,13 @@ class SandboxRemote(Sandbox):
operation_iterator = stub.WaitExecution(request)
for operation in operation_iterator:
if not self.operation_name:
self.operation_name = operation.name
if operation.done:
return operation
else:
last_operation = operation
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
......@@ -125,19 +132,39 @@ class SandboxRemote(Sandbox):
return last_operation
# Set up signal handler to trigger cancel_operation on SIGTERM
operation = None
with self._get_context().timed_activity("Waiting for the remote build to complete"):
with self._get_context().timed_activity("Waiting for the remote build to complete"), \
_signals.terminator(partial(self.cancel_operation, channel)):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
elif operation.done:
return operation
while operation is not None and not operation.done:
operation = __run_remote_command(stub, running_operation=operation)
return operation
def cancel_operation(self, channel):
# If we don't have the operation name, we can't send the request.
if self.operation_name is None:
return
stub = operations_pb2_grpc.OperationsStub(channel)
request = operations_pb2.CancelOperationRequest(
name=str(self.operation_name))
try:
stub.CancelOperation(request)
except grpc.RpcError as e:
if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
e.code() == grpc.StatusCode.INVALID_ARGUMENT):
pass
else:
raise SandboxError("Failed trying to send CancelOperation request: "
"{} ({})".format(e.details(), e.code().name))
def process_job_output(self, output_directories, output_files):
# Reads the remote execution server response to an execution request.
#
......@@ -182,7 +209,7 @@ class SandboxRemote(Sandbox):
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
# from another hash will be interesting, though...
new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
self._set_virtual_directory(new_dir)
def run(self, command, flags, *, cwd=None, env=None):
......@@ -191,7 +218,7 @@ class SandboxRemote(Sandbox):
if isinstance(upload_vdir, FileBasedDirectory):
# Make a new temporary directory to put source in
upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
upload_vdir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=None)
upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
upload_vdir.recalculate_hash()
......
......@@ -156,7 +156,7 @@ class Sandbox():
"""
if self._vdir is None or self._never_cache_vdirs:
if 'BST_CAS_DIRECTORIES' in os.environ:
self._vdir = CasBasedDirectory(self.__context, ref=None)
self._vdir = CasBasedDirectory(self.__context.artifactcache.cas, ref=None)
else:
self._vdir = FileBasedDirectory(self._root)
return self._vdir
......
......@@ -249,13 +249,11 @@ class CasBasedDirectory(Directory):
_pb2_path_sep = "/"
_pb2_absolute_path_prefix = "/"
def __init__(self, context, ref=None, parent=None, common_name="untitled", filename=None):
self.context = context
self.cas_directory = os.path.join(context.artifactdir, 'cas')
def __init__(self, cas_cache, ref=None, parent=None, common_name="untitled", filename=None):
self.filename = filename
self.common_name = common_name
self.pb2_directory = remote_execution_pb2.Directory()
self.cas_cache = context.artifactcache.cas
self.cas_cache = cas_cache
if ref:
with open(self.cas_cache.objpath(ref), 'rb') as f:
self.pb2_directory.ParseFromString(f.read())
......@@ -270,7 +268,7 @@ class CasBasedDirectory(Directory):
if self._directory_read:
return
for entry in self.pb2_directory.directories:
buildStreamDirectory = CasBasedDirectory(self.context, ref=entry.digest,
buildStreamDirectory = CasBasedDirectory(self.cas_cache, ref=entry.digest,
parent=self, filename=entry.name)
self.index[entry.name] = IndexEntry(entry, buildstream_object=buildStreamDirectory)
for entry in self.pb2_directory.files:
......@@ -333,7 +331,7 @@ class CasBasedDirectory(Directory):
.format(name, str(self), type(newdir)))
dirnode = self._find_pb2_entry(name)
else:
newdir = CasBasedDirectory(self.context, parent=self, filename=name)
newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name)
dirnode = self.pb2_directory.directories.add()
dirnode.name = name
......
......@@ -72,6 +72,11 @@ class ProgramNotFoundError(BstError):
super().__init__(message, domain=ErrorDomain.PROG_NOT_FOUND, reason=reason)
class DirectoryExistsError(OSError):
"""Raised when an `os.rename` is attempted but the destination is an existing directory.
"""
class FileListResult():
"""An object which stores the result of one of the operations
which run on a list of files.
......@@ -500,6 +505,44 @@ def get_bst_version():
.format(__version__))
def move_atomic(source, destination, *, ensure_parents=True):
"""Move the source to the destination using atomic primitives.
This uses `os.rename` to move a file or directory to a new destination.
It wraps some `OSError` thrown errors to ensure their handling is correct.
The main reason for this function to exist is that rename can throw different errors
for the same symptom (https://www.unix.com/man-page/POSIX/3posix/rename/)
when we are moving a directory.
We are especially interested here in the case when the destination already
exists, is a directory and is not empty. In this case, either EEXIST or
ENOTEMPTY can be thrown.
In order to ensure consistent handling of these exceptions, this function
should be used instead of `os.rename`.
Args:
source (str or Path): source to rename
destination (str or Path): destination to which to move the source
ensure_parents (bool): Whether or not to create the parent's directories
of the destination (default: True)
Raises:
DirectoryExistsError: if the destination directory already exists and is
not empty
OSError: if another filesystem-level error occurred
"""
if ensure_parents:
os.makedirs(os.path.dirname(str(destination)), exist_ok=True)
try:
os.rename(str(source), str(destination))
except OSError as exc:
if exc.errno in (errno.EEXIST, errno.ENOTEMPTY):
raise DirectoryExistsError(*exc.args) from exc
raise
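# Illustrative usage sketch (hypothetical paths), mirroring how the git and
# pip sources in this change race to populate a shared mirror directory:
#
#     try:
#         move_atomic(tmpdir, mirror_dir)
#     except DirectoryExistsError:
#         # Another process already moved its copy into place; ours can be
#         # discarded.
#         pass
#     except OSError as e:
#         raise SourceError("Failed to move '{}' to '{}': {}"
#                           .format(tmpdir, mirror_dir, e)) from e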
@contextmanager
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
errors=None, newline=None, closefd=True, opener=None, tempdir=None):
......