
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (135)
Showing changes with 1731 additions and 1994 deletions
......@@ -6,6 +6,8 @@ include =
omit =
# Omit profiling helper module
*/buildstream/_profile.py
# Omit generated code
*/buildstream/_protos/*
*/.eggs/*
[report]
......
image: buildstream/testsuite-debian:8-master-57-be5a863
image: buildstream/testsuite-debian:9-master-102-9067e269
cache:
key: "$CI_JOB_NAME-"
......@@ -88,22 +88,19 @@ source_dist:
paths:
- coverage-linux/
tests-debian-8:
<<: *linux-tests
tests-debian-9:
image: buildstream/buildstream-debian:master-81-caa5241
image: buildstream/testsuite-debian:9-master-102-9067e269
<<: *linux-tests
tests-fedora-27:
image: buildstream/buildstream-fedora:master-56-5d7ee17
image: buildstream/testsuite-fedora:27-master-102-9067e269
<<: *linux-tests
tests-unix:
# Use fedora here, to a) run a test on fedora and b) ensure that we
# can get rid of ostree - this is not possible with debian-8
image: buildstream/buildstream-fedora:master-56-5d7ee17
image: buildstream/testsuite-fedora:27-master-102-9067e269
stage: test
variables:
BST_FORCE_BACKEND: "unix"
......@@ -223,7 +220,6 @@ coverage:
- coverage combine --rcfile=../.coveragerc -a coverage.*
- coverage report --rcfile=../.coveragerc -m
dependencies:
- tests-debian-8
- tests-debian-9
- tests-fedora-27
- tests-unix
......
......@@ -11,7 +11,7 @@ ignore=CVS,tests,doc
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=
ignore-patterns=.*_pb2.py,.*_pb2_grpc.py
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
......@@ -190,7 +190,7 @@ ignored-classes=optparse.Values,thread._local,_thread._local,contextlib.closing,
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=pkg_resources,gi.repository
ignored-modules=pkg_resources,gi.repository,grpc,buildstream._protos.*
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
......
......@@ -23,26 +23,31 @@ a reasonable timeframe for identifying these.
Patch submissions
-----------------
Branches must be submitted as merge requests in gitlab and should usually
be associated to an issue report on gitlab.
Branches must be submitted as merge requests in gitlab. If the branch
fixes an issue or is related to any issues, these issues must be mentioned
in the merge request or preferably the commit messages themselves.
Commits in the branch which address specific issues must specify the
issue number in the commit message.
Branch names for merge requests should be prefixed with the submitter's
name or nickname, e.g. ``username/implement-flying-ponies``.
Merge requests that are not yet ready for review must be prefixed with the
``WIP:`` identifier. A merge request is not ready for review until the
submitter expects that the patch is ready to actually land.
You may open merge requests for the branches you create before you
are ready to have them reviewed upstream; as long as your merge request
is not yet ready for review, it must be prefixed with the ``WIP:``
identifier.
Submitted branches must not contain a history of the work done in the
feature branch. Please use git's interactive rebase feature in order to
compose a clean patch series suitable for submission.
We prefer that test case and documentation changes be submitted
in separate commits from the code changes which they test.
We prefer that documentation changes be submitted in separate commits from
the code changes which they document, and new test cases are also preferred
in separate commits.
Ideally every commit in the history of master passes its test cases. This
makes bisections more easy to perform, but is not always practical with
more complex branches.
If a commit in your branch modifies behavior such that a test must also
be changed to match the new behavior, then the tests should be updated
in the same commit. Ideally, every commit in the history of master passes
its test cases; this makes bisections easier to perform, but is not
always practical with more complex branches.
Commit messages
......@@ -54,9 +59,6 @@ the change.
The summary line must start with what changed, followed by a colon and
a very brief description of the change.
If there is an associated issue, it **must** be mentioned somewhere
in the commit message.
**Example**::
element.py: Added the frobnicator so that foos are properly frobbed.
......@@ -65,8 +67,6 @@ in the commit message.
the element. Elements that are not properly frobnicated raise
an error to inform the user of invalid frobnication rules.
This fixes issue #123
Coding style
------------
......@@ -294,7 +294,7 @@ committed with that.
To do this, first ensure you have ``click_man`` installed, possibly
with::
pip install --user click_man
pip3 install --user click_man
Then, in the toplevel directory of buildstream, run the following::
......@@ -409,6 +409,28 @@ regenerate them locally in order to build the docs.
command: build hello.bst
Protocol Buffers
----------------
BuildStream uses protobuf and gRPC for serialization and communication with
artifact cache servers. This requires ``.proto`` files and Python code
generated from the ``.proto`` files using protoc. All these files live in the
``buildstream/_protos`` directory. The generated files are included in the
git repository to avoid depending on grpcio-tools for user installations.
Regenerating code
~~~~~~~~~~~~~~~~~
When ``.proto`` files are modified, the corresponding Python code needs to
be regenerated. As a prerequisite for code generation you need to install
``grpcio-tools`` using pip or some other mechanism::
pip3 install --user grpcio-tools
To actually regenerate the code::
./setup.py build_grpc
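
The generated modules are regular Python packages under ``buildstream/_protos``.
As a minimal sketch of how they are used (mirroring the imports and the
``Digest`` message that appear elsewhere in this branch, with a placeholder
hash value), constructing a CAS digest looks roughly like this::

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Describe a blob by its content hash and size, as the CAS server expects
    digest = remote_execution_pb2.Digest()
    digest.hash = 'e3b0c44298fc...'  # placeholder: hex digest of the blob's content
    digest.size_bytes = 1024
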
Testing BuildStream
-------------------
BuildStream uses pytest for regression tests and testing out
......@@ -428,7 +450,7 @@ To run the tests, just type::
At the toplevel.
When debugging a test, it can be desirable to see the stdout
and stderr generated by a test, to do this use the --addopts
and stderr generated by a test; to do this, use the ``--addopts``
option to feed arguments to pytest, as such::
./setup.py test --addopts -s
......@@ -508,7 +530,7 @@ tool.
Python provides `cProfile <https://docs.python.org/3/library/profile.html>`_
which gives you a list of all functions called during execution and how much
time was spent in each function. Here is an example of running `bst --help`
time was spent in each function. Here is an example of running ``bst --help``
under cProfile:
python3 -m cProfile -o bst.cprofile -- $(which bst) --help
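
The profile written to ``bst.cprofile`` can then be inspected with the standard
library ``pstats`` module. For example, a small sketch that prints the ten most
expensive calls sorted by cumulative time::

    import pstats

    # Load the profile produced by the cProfile invocation above
    stats = pstats.Stats('bst.cprofile')
    stats.sort_stats('cumulative')

    # Print only the ten most expensive entries
    stats.print_stats(10)
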
......
......@@ -18,3 +18,6 @@ recursive-include tests *.bst
recursive-include tests *.conf
recursive-include tests *.sh
recursive-include tests *.expected
# Protocol Buffers
recursive-include buildstream/_protos *.proto
=================
buildstream 1.3.1
=================
o Add a `--tar` option to `bst checkout` which allows a tarball to be
created from the artifact contents.
=================
buildstream 1.1.4
=================
......@@ -9,6 +16,16 @@ buildstream 1.1.4
o Added new simple `make` element
o Switch to Remote Execution CAS-based artifact cache on all platforms.
Artifact servers need to be migrated.
o BuildStream now requires python version >= 3.5
o BuildStream will now automatically clean up old artifacts when it
runs out of space. The exact behavior is configurable in the user's
buildstream.conf.
=================
buildstream 1.1.3
......
......@@ -25,7 +25,7 @@ BuildStream offers the following advantages:
* **Declarative build instructions/definitions**
BuildStream provides a a flexible and extensible framework for the modelling
BuildStream provides a flexible and extensible framework for the modelling
of software build pipelines in a declarative YAML format, which allows you to
manipulate filesystem data in a controlled, reproducible sandboxed environment.
......@@ -61,25 +61,29 @@ How does BuildStream work?
==========================
BuildStream operates on a set of YAML files (.bst files), as follows:
* loads the YAML files which describe the target(s) and all dependencies
* evaluates the version information and build instructions to calculate a build
* Loads the YAML files which describe the target(s) and all dependencies.
* Evaluates the version information and build instructions to calculate a build
graph for the target(s) and all dependencies and unique cache-keys for each
element
* retrieves elements from cache if they are already built, or builds them in a
sandboxed environment using the instructions declared in the .bst files
* transforms/configures and/or deploys the resulting target(s) based on the
element.
* Retrieves previously built elements (artifacts) from a local/remote cache, or
builds the elements in a sandboxed environment using the instructions declared
in the .bst files.
* Transforms/configures and/or deploys the resulting target(s) based on the
instructions declared in the .bst files.
How can I get started?
======================
The easiest way to get started is to explore some existing .bst files, for example:
To start using BuildStream, first,
`install <https://buildstream.gitlab.io/buildstream/main_install.html>`_
BuildStream onto your machine and then follow our
`tutorial <https://buildstream.gitlab.io/buildstream/using_tutorial.html>`_.
We also recommend exploring some existing BuildStream projects:
* https://gitlab.gnome.org/GNOME/gnome-build-meta/
* https://gitlab.com/freedesktop-sdk/freedesktop-sdk
* https://gitlab.com/baserock/definitions
* https://gitlab.com/BuildStream/buildstream-examples/tree/master/build-x86image
* https://gitlab.com/BuildStream/buildstream-examples/tree/master/netsurf-flatpak
If you have any questions please ask on our `#buildstream <irc://irc.gnome.org/buildstream>`_ channel in `irc.gnome.org <irc://irc.gnome.org>`_
......@@ -21,7 +21,8 @@ import os
import string
from collections import Mapping, namedtuple
from .._exceptions import ImplError, LoadError, LoadErrorReason
from ..element_enums import KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import utils
from .. import _yaml
......@@ -35,22 +36,38 @@ from .. import _yaml
# push (bool): Whether we should attempt to push artifacts to this cache,
# in addition to pulling from it.
#
class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):
class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert client_key client_cert')):
# _new_from_config_node
#
# Creates an ArtifactCacheSpec() from a YAML loaded node
#
@staticmethod
def _new_from_config_node(spec_node):
_yaml.node_validate(spec_node, ['url', 'push'])
def _new_from_config_node(spec_node, basedir=None):
_yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
url = _yaml.node_get(spec_node, str, 'url')
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
if not url:
provenance = _yaml.node_get_provenance(spec_node)
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: empty artifact cache URL".format(provenance))
return ArtifactCacheSpec(url, push)
server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
if server_cert and basedir:
server_cert = os.path.join(basedir, server_cert)
client_key = _yaml.node_get(spec_node, str, 'client-key', default_value=None)
if client_key and basedir:
client_key = os.path.join(basedir, client_key)
client_cert = _yaml.node_get(spec_node, str, 'client-cert', default_value=None)
if client_cert and basedir:
client_cert = os.path.join(basedir, client_cert)
return ArtifactCacheSpec(url, push, server_cert, client_key, client_cert)
ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
# An ArtifactCache manages artifacts.
......@@ -61,11 +78,16 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):
class ArtifactCache():
def __init__(self, context):
self.context = context
self.required_artifacts = set()
self.extractdir = os.path.join(context.artifactdir, 'extract')
self.max_size = context.cache_quota
self.estimated_size = None
self.global_remote_specs = []
self.project_remote_specs = {}
self._local = False
self.cache_size = None
os.makedirs(context.artifactdir, exist_ok=True)
......@@ -138,6 +160,7 @@ class ArtifactCache():
#
# Args:
# config_node (dict): The config block, which may contain the 'artifacts' key
# basedir (str): The base directory for relative paths
#
# Returns:
# A list of ArtifactCacheSpec instances.
......@@ -146,15 +169,15 @@ class ArtifactCache():
# LoadError, if the config block contains invalid keys.
#
@staticmethod
def specs_from_config_node(config_node):
def specs_from_config_node(config_node, basedir=None):
cache_specs = []
artifacts = config_node.get('artifacts', [])
if isinstance(artifacts, Mapping):
cache_specs.append(ArtifactCacheSpec._new_from_config_node(artifacts))
cache_specs.append(ArtifactCacheSpec._new_from_config_node(artifacts, basedir))
elif isinstance(artifacts, list):
for spec_node in artifacts:
cache_specs.append(ArtifactCacheSpec._new_from_config_node(spec_node))
cache_specs.append(ArtifactCacheSpec._new_from_config_node(spec_node, basedir))
else:
provenance = _yaml.node_get_provenance(config_node, key='artifacts')
raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA,
......@@ -162,10 +185,119 @@ class ArtifactCache():
(str(provenance)))
return cache_specs
# append_required_artifacts():
#
# Append to the list of elements whose artifacts are required for
# the current run. Artifacts whose elements are in this list will
# be locked by the artifact cache and not touched for the duration
# of the current pipeline.
#
# Args:
# elements (iterable): A set of elements to mark as required
#
def append_required_artifacts(self, elements):
# We lock both strong and weak keys - deleting one but not the
# other won't save space in most cases anyway, but would be a
# user inconvenience.
for element in elements:
strong_key = element._get_cache_key(strength=KeyStrength.STRONG)
weak_key = element._get_cache_key(strength=KeyStrength.WEAK)
for key in (strong_key, weak_key):
if key and key not in self.required_artifacts:
self.required_artifacts.add(key)
# We also update the usage times of any artifacts
# we will be using, which helps prevent a
# buildstream process that runs in parallel with
# this one from removing artifacts that are in use.
try:
self.update_atime(key)
except ArtifactError:
pass
# clean():
#
# Clean the artifact cache as much as possible.
#
def clean(self):
artifacts = self.list_artifacts()
while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
try:
to_remove = artifacts.pop(0)
except IndexError:
# If too many artifacts are required, and we therefore
# can't remove them, we have to abort the build.
#
# FIXME: Asking the user what to do may be neater
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
detail = ("There is not enough space to build the given element.\n"
"Please increase the cache-quota in {}."
.format(self.context.config_origin or default_conf))
if self.calculate_cache_size() > self.context.cache_quota:
raise ArtifactError("Cache too full. Aborting.",
detail=detail,
reason="cache-too-full")
else:
break
key = to_remove.rpartition('/')[2]
if key not in self.required_artifacts:
size = self.remove(to_remove)
if size:
self.cache_size -= size
# This should be O(1) if implemented correctly
return self.calculate_cache_size()
# get_approximate_cache_size()
#
# A cheap method that aims to serve as an upper limit on the
# artifact cache size.
#
# The cache size reported by this function will normally be larger
# than the real cache size, since it is calculated using the
# pre-commit artifact size, but for very small artifacts in
# certain caches additional overhead could cause this to be
# smaller than, but close to, the actual size.
#
# Nonetheless, in practice this should be safe to use as an upper
# limit on the cache size.
#
# If the cache has built-in constant-time size reporting, please
# feel free to override this method with a more accurate
# implementation.
#
# Returns:
# (int) An approximation of the artifact cache size.
#
def get_approximate_cache_size(self):
# If we don't currently have an estimate, figure out the real
# cache size.
if self.estimated_size is None:
self.estimated_size = self.calculate_cache_size()
return self.estimated_size
################################################
# Abstract methods for subclasses to implement #
################################################
# update_atime()
#
# Update the atime of an artifact.
#
# Args:
# key (str): The key of the artifact.
#
def update_atime(self, key):
raise ImplError("Cache '{kind}' does not implement contains()"
.format(kind=type(self).__name__))
# initialize_remotes():
#
# This will contact each remote cache.
......@@ -191,6 +323,32 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement contains()"
.format(kind=type(self).__name__))
# list_artifacts():
#
# List artifacts in this cache in LRU order.
#
# Returns:
# ([str]) - A list of artifact names as generated by
# `ArtifactCache.get_artifact_fullname` in LRU order
#
def list_artifacts(self):
raise ImplError("Cache '{kind}' does not implement list_artifacts()"
.format(kind=type(self).__name__))
# remove():
#
# Removes the artifact for the specified ref from the local
# artifact cache.
#
# Args:
# ref (artifact_name): The name of the artifact to remove (as
# generated by
# `ArtifactCache.get_artifact_fullname`)
#
def remove(self, artifact_name):
raise ImplError("Cache '{kind}' does not implement remove()"
.format(kind=type(self).__name__))
# extract():
#
# Extract cached artifact for the specified Element if it hasn't
......@@ -225,15 +383,6 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement commit()"
.format(kind=type(self).__name__))
# can_diff():
#
# Whether this cache implementation can diff (unfortunately
# there's no way to tell if an implementation is going to throw
# ImplError without abc).
#
def can_diff(self):
return False
# diff():
#
# Return a list of files that have been added or modified between
......@@ -320,6 +469,20 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement link_key()"
.format(kind=type(self).__name__))
# calculate_cache_size()
#
# Return the real artifact cache size.
#
# Implementations should also use this to update estimated_size.
#
# Returns:
#
# (int) The size of the artifact cache.
#
def calculate_cache_size(self):
raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
.format(kind=type(self).__name__))
################################################
# Local Private Methods #
################################################
......@@ -356,11 +519,35 @@ class ArtifactCache():
#
def _initialize_remotes(self):
def remote_failed(url, error):
self._message(MessageType.WARN, "Failed to fetch remote refs from {}: {}".format(url, error))
self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
# _add_artifact_size()
#
# Since we cannot keep track of the cache size between threads,
# this method will be called by the main process every time a
# process that added something to the cache finishes.
#
# This will then add the reported size to
# ArtifactCache.estimated_size.
#
def _add_artifact_size(self, artifact_size):
if not self.estimated_size:
self.estimated_size = self.calculate_cache_size()
self.estimated_size += artifact_size
# _set_cache_size()
#
# Similarly to the above method, when we calculate the actual size
# in a child thread, we can't update it. We instead pass the value
# back to the main thread and update it there.
#
def _set_cache_size(self, cache_size):
self.estimated_size = cache_size
# _configured_remote_artifact_cache_specs():
#
......
This diff is collapsed.
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
from concurrent import futures
import logging
import os
import signal
import sys
import tempfile
import click
import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import ArtifactError
from .._context import Context
from .cascache import CASCache
# Trying to push an artifact that is too large
class ArtifactTooLargeException(Exception):
pass
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
#
# Args:
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
def create_server(repo, *, enable_push):
context = Context()
context.artifactdir = os.path.abspath(repo)
artifactcache = CASCache(context)
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(artifactcache), server)
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
_ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
return server
@click.command(short_help="CAS Artifact Server")
@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
server = create_server(repo, enable_push=enable_push)
use_tls = bool(server_key)
if bool(server_cert) != use_tls:
click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
sys.exit(-1)
if client_certs and not use_tls:
click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
sys.exit(-1)
if use_tls:
# Read public/private key pair
with open(server_key, 'rb') as f:
server_key_bytes = f.read()
with open(server_cert, 'rb') as f:
server_cert_bytes = f.read()
if client_certs:
with open(client_certs, 'rb') as f:
client_certs_bytes = f.read()
else:
client_certs_bytes = None
credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
root_certificates=client_certs_bytes,
require_client_auth=bool(client_certs))
server.add_secure_port('[::]:{}'.format(port), credentials)
else:
server.add_insecure_port('[::]:{}'.format(port))
# Run artifact server
server.start()
try:
while True:
signal.pause()
except KeyboardInterrupt:
server.stop(0)
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
def Read(self, request, context):
resource_name = request.resource_name
client_digest = _digest_from_resource_name(resource_name)
assert request.read_offset <= client_digest.size_bytes
try:
with open(self.cas.objpath(client_digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
if request.read_offset > 0:
f.seek(request.read_offset)
remaining = client_digest.size_bytes - request.read_offset
while remaining > 0:
chunk_size = min(remaining, 64 * 1024)
remaining -= chunk_size
response = bytestream_pb2.ReadResponse()
# max. 64 kB chunks
response.data = f.read(chunk_size)
yield response
except FileNotFoundError:
context.set_code(grpc.StatusCode.NOT_FOUND)
def Write(self, request_iterator, context):
response = bytestream_pb2.WriteResponse()
if not self.enable_push:
context.set_code(grpc.StatusCode.PERMISSION_DENIED)
return response
offset = 0
finished = False
resource_name = None
with tempfile.NamedTemporaryFile(dir=os.path.join(self.cas.casdir, 'tmp')) as out:
for request in request_iterator:
assert not finished
assert request.write_offset == offset
if resource_name is None:
# First request
resource_name = request.resource_name
client_digest = _digest_from_resource_name(resource_name)
try:
_clean_up_cache(self.cas, client_digest.size_bytes)
except ArtifactTooLargeException as e:
context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
context.set_details(str(e))
return response
elif request.resource_name:
# If it is set on subsequent calls, it **must** match the value of the first request.
assert request.resource_name == resource_name
out.write(request.data)
offset += len(request.data)
if request.finish_write:
assert client_digest.size_bytes == offset
out.flush()
digest = self.cas.add_object(path=out.name)
assert digest.hash == client_digest.hash
finished = True
assert finished
response.committed_size = offset
return response
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
def __init__(self, cas):
super().__init__()
self.cas = cas
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
for digest in request.blob_digests:
if not _has_object(self.cas, digest):
d = response.missing_blob_digests.add()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
return response
class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
def GetReference(self, request, context):
response = buildstream_pb2.GetReferenceResponse()
try:
tree = self.cas.resolve_ref(request.key, update_mtime=True)
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
except ArtifactError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
def UpdateReference(self, request, context):
response = buildstream_pb2.UpdateReferenceResponse()
if not self.enable_push:
context.set_code(grpc.StatusCode.PERMISSION_DENIED)
return response
for key in request.keys:
self.cas.set_ref(key, request.digest)
return response
def Status(self, request, context):
response = buildstream_pb2.StatusResponse()
response.allow_updates = self.enable_push
return response
def _digest_from_resource_name(resource_name):
parts = resource_name.split('/')
assert len(parts) == 2
digest = remote_execution_pb2.Digest()
digest.hash = parts[0]
digest.size_bytes = int(parts[1])
return digest
def _has_object(cas, digest):
objpath = cas.objpath(digest)
return os.path.exists(objpath)
# _clean_up_cache()
#
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
# is enough space for the incoming artifact
#
# Args:
# cas: CASCache object
# object_size: The size of the object being received in bytes
#
# Returns:
# int: The total bytes removed on the filesystem
#
def _clean_up_cache(cas, object_size):
# Determine the available disk space, in bytes, of the file system
# which mounts the repo
stats = os.statvfs(cas.casdir)
buffer_ = int(2e9) # Add a 2 GB buffer
free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
if object_size > total_disk_space:
raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
if object_size <= free_disk_space:
# No need to clean up
return 0
# obtain a list of LRP artifacts
LRP_artifacts = cas.list_artifacts()
removed_size = 0 # in bytes
while object_size - removed_size > free_disk_space:
try:
to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
except IndexError:
# This exception is caught if there are no more artifacts in the list
# LRP_artifacts. This means the artifact is too large for the filesystem,
# so we abort the process.
raise ArtifactTooLargeException("Artifact of size {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
removed_size += cas.remove(to_remove, defer_prune=False)
if removed_size > 0:
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
else:
logging.info("No artifacts were removed from the cache.")
return removed_size
#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import multiprocessing
import os
import signal
import tempfile
from .. import _ostree, _signals, utils
from .._exceptions import ArtifactError
from .._ostree import OSTreeError
from . import ArtifactCache
from .pushreceive import initialize_push_connection
from .pushreceive import push as push_artifact
from .pushreceive import PushException
# An OSTreeCache manages artifacts in an OSTree repository
#
# Args:
# context (Context): The BuildStream context
# project (Project): The BuildStream project
# enable_push (bool): Whether pushing is allowed by the platform
#
# Pushing is explicitly disabled by the platform in some cases,
# like when we are falling back to functioning without using
# user namespaces.
#
class OSTreeCache(ArtifactCache):
def __init__(self, context, *, enable_push):
super().__init__(context)
self.enable_push = enable_push
ostreedir = os.path.join(context.artifactdir, 'ostree')
self.repo = _ostree.ensure(ostreedir, False)
# Per-project list of OSTreeRemote instances.
self._remotes = {}
self._has_fetch_remotes = False
self._has_push_remotes = False
################################################
# Implementation of abstract methods #
################################################
def has_fetch_remotes(self, *, element=None):
if not self._has_fetch_remotes:
# No project has fetch remotes
return False
elif element is None:
# At least one (sub)project has fetch remotes
return True
else:
# Check whether the specified element's project has fetch remotes
remotes_for_project = self._remotes[element._get_project()]
return bool(remotes_for_project)
def has_push_remotes(self, *, element=None):
if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
# At least one (sub)project has push remotes
return True
else:
# Check whether the specified element's project has push remotes
remotes_for_project = self._remotes[element._get_project()]
return any(remote.spec.push for remote in remotes_for_project)
def contains(self, element, key):
ref = self.get_artifact_fullname(element, key)
return _ostree.exists(self.repo, ref)
def extract(self, element, key):
ref = self.get_artifact_fullname(element, key)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, ref)
# Extracting a nonexistent artifact is a bug
assert rev, "Artifact missing for {}".format(ref)
dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
if os.path.isdir(dest):
# artifact has already been extracted
return dest
os.makedirs(self.extractdir, exist_ok=True)
with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
_ostree.checkout(self.repo, checkoutdir, rev, user=True)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
os.rename(checkoutdir, dest)
except OSError as e:
# With rename, it's possible to get either ENOTEMPTY or EEXIST
# in the case that the destination path is a non-empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [os.errno.ENOTEMPTY, os.errno.EEXIST]:
raise ArtifactError("Failed to extract artifact for ref '{}': {}"
.format(ref, e)) from e
return dest
def commit(self, element, content, keys):
refs = [self.get_artifact_fullname(element, key) for key in keys]
try:
_ostree.commit(self.repo, content, refs)
except OSTreeError as e:
raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
def can_diff(self):
return True
def diff(self, element, key_a, key_b, *, subdir=None):
_, a, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_a))
_, b, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_b))
if subdir:
a = a.get_child(subdir)
b = b.get_child(subdir)
subpath = a.get_path()
else:
subpath = '/'
modified, removed, added = _ostree.diff_dirs(a, b)
modified = [os.path.relpath(item.target.get_path(), subpath) for item in modified]
removed = [os.path.relpath(item.get_path(), subpath) for item in removed]
added = [os.path.relpath(item.get_path(), subpath) for item in added]
return modified, removed, added
def pull(self, element, key, *, progress=None):
project = element._get_project()
ref = self.get_artifact_fullname(element, key)
for remote in self._remotes[project]:
try:
# fetch the artifact from highest priority remote using the specified cache key
remote_name = self._ensure_remote(self.repo, remote.pull_url)
_ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
return True
except OSTreeError:
# Try next remote
continue
return False
def link_key(self, element, oldkey, newkey):
oldref = self.get_artifact_fullname(element, oldkey)
newref = self.get_artifact_fullname(element, newkey)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, oldref)
# create additional ref for the same checksum
_ostree.set_ref(self.repo, newref, rev)
def push(self, element, keys):
any_pushed = False
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
if not push_remotes:
raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.")
refs = [self.get_artifact_fullname(element, key) for key in keys]
for remote in push_remotes:
any_pushed |= self._push_to_remote(remote, element, refs)
return any_pushed
def initialize_remotes(self, *, on_failure=None):
remote_specs = self.global_remote_specs.copy()
for project in self.project_remote_specs:
remote_specs.extend(self.project_remote_specs[project])
remote_specs = list(utils._deduplicate(remote_specs))
remote_results = {}
# Callback to initialize one remote in a 'multiprocessing' subprocess.
#
# We cannot do this in the main process because of the way the tasks
# run by the main scheduler calls into libostree using
# fork()-without-exec() subprocesses. OSTree fetch operations in
# subprocesses hang if fetch operations were previously done in the
# main process.
#
def child_action(url, q):
try:
push_url, pull_url = self._initialize_remote(url)
q.put((None, push_url, pull_url))
except Exception as e: # pylint: disable=broad-except
# Whatever happens, we need to return it to the calling process
#
q.put((str(e), None, None))
# Kick off all the initialization jobs one by one.
#
# Note that we cannot use multiprocessing.Pool here because it's not
# possible to pickle local functions such as child_action().
#
q = multiprocessing.Queue()
for remote_spec in remote_specs:
p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q))
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
p.start()
error, push_url, pull_url = q.get()
p.join()
except KeyboardInterrupt:
utils._kill_process_tree(p.pid)
raise
if error and on_failure:
on_failure(remote_spec.url, error)
elif error:
raise ArtifactError(error)
else:
if remote_spec.push and push_url:
self._has_push_remotes = True
if pull_url:
self._has_fetch_remotes = True
remote_results[remote_spec.url] = (push_url, pull_url)
# Prepare push_urls and pull_urls for each project
for project in self.context.get_projects():
remote_specs = self.global_remote_specs
if project in self.project_remote_specs:
remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
remotes = []
for remote_spec in remote_specs:
# Errors are already handled in the loop above,
# skip unreachable remotes here.
if remote_spec.url not in remote_results:
continue
push_url, pull_url = remote_results[remote_spec.url]
if remote_spec.push and not push_url:
raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url))
remote = _OSTreeRemote(remote_spec, pull_url, push_url)
remotes.append(remote)
self._remotes[project] = remotes
################################################
# Local Private Methods #
################################################
# _initialize_remote():
#
# Do protocol-specific initialization necessary to use a given OSTree
# remote.
#
# The SSH protocol that we use only supports pushing so initializing these
# involves contacting the remote to find out the corresponding pull URL.
#
# Args:
# url (str): URL of the remote
#
# Returns:
# (str, str): the pull URL and push URL for the remote
#
# Raises:
# ArtifactError: if there was an error
def _initialize_remote(self, url):
if url.startswith('ssh://'):
try:
push_url = url
pull_url = initialize_push_connection(url)
except PushException as e:
raise ArtifactError(e) from e
elif url.startswith('/'):
push_url = pull_url = 'file://' + url
elif url.startswith('file://'):
push_url = pull_url = url
elif url.startswith('http://') or url.startswith('https://'):
push_url = None
pull_url = url
else:
raise ArtifactError("Unsupported URL: {}".format(url))
return push_url, pull_url
# _ensure_remote():
#
# Ensure that our OSTree repo has a remote configured for the given URL.
# Note that SSH access to remotes is not handled by libostree itself.
#
# Args:
# repo (OSTree.Repo): an OSTree repository
# pull_url (str): the URL where libostree can pull from the remote
#
# Returns:
# (str): the name of the remote, which can be passed to various other
# operations implemented by the _ostree module.
#
# Raises:
# OSTreeError: if there was a problem reported by libostree
def _ensure_remote(self, repo, pull_url):
remote_name = utils.url_directory_name(pull_url)
_ostree.configure_remote(repo, remote_name, pull_url)
return remote_name
def _push_to_remote(self, remote, element, refs):
with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
with element.timed_activity("Preparing compressed archive"):
# First create a temporary archive-z2 repository, we can
# only use ostree-push with archive-z2 local repo.
temp_repo = _ostree.ensure(temp_repo_dir, True)
# Now push the ref we want to push into our temporary archive-z2 repo
for ref in refs:
_ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
with element.timed_activity("Sending artifact"), \
element._output_file() as output_file:
try:
pushed = push_artifact(temp_repo.get_path().get_path(),
remote.push_url,
refs, output_file)
except PushException as e:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e)) from e
return pushed
# Represents a single remote OSTree cache.
#
class _OSTreeRemote():
def __init__(self, spec, pull_url, push_url):
self.spec = spec
self.pull_url = pull_url
self.push_url = push_url
This diff is collapsed.
#
# Copyright (C) 2017 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
import os
import shutil
import tarfile
import subprocess
from .. import utils, ProgramNotFoundError
from .._exceptions import ArtifactError
from . import ArtifactCache
class TarCache(ArtifactCache):
def __init__(self, context):
super().__init__(context)
self.tardir = os.path.join(context.artifactdir, 'tar')
os.makedirs(self.tardir, exist_ok=True)
################################################
# Implementation of abstract methods #
################################################
def contains(self, element, key):
path = os.path.join(self.tardir, _tarpath(element, key))
return os.path.isfile(path)
def commit(self, element, content, keys):
os.makedirs(os.path.join(self.tardir, element._get_project().name, element.normal_name), exist_ok=True)
with utils._tempdir() as temp:
for key in keys:
ref = _tarpath(element, key)
refdir = os.path.join(temp, key)
shutil.copytree(content, refdir, symlinks=True)
_Tar.archive(os.path.join(self.tardir, ref), key, temp)
def extract(self, element, key):
fullname = self.get_artifact_fullname(element, key)
path = _tarpath(element, key)
# Extracting a nonexistent artifact is a bug
assert os.path.isfile(os.path.join(self.tardir, path)), "Artifact missing for {}".format(fullname)
# If the destination already exists, the artifact has been extracted
dest = os.path.join(self.extractdir, fullname)
if os.path.isdir(dest):
return dest
os.makedirs(self.extractdir, exist_ok=True)
with utils._tempdir(dir=self.extractdir) as tmpdir:
_Tar.extract(os.path.join(self.tardir, path), tmpdir)
os.makedirs(os.path.join(self.extractdir, element._get_project().name, element.normal_name),
exist_ok=True)
try:
os.rename(os.path.join(tmpdir, key), dest)
except OSError as e:
# With rename, it's possible to get either ENOTEMPTY or EEXIST
# in the case that the destination path is a non-empty directory.
#
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [os.errno.ENOTEMPTY, os.errno.EEXIST]:
raise ArtifactError("Failed to extract artifact '{}': {}"
.format(fullname, e)) from e
return dest
# _tarpath()
#
# Generate a relative tarball path for a given element and its cache key
#
# Args:
# element (Element): The Element object
# key (str): The element's cache key
#
# Returns:
# (str): The relative path to use for storing tarballs
#
def _tarpath(element, key):
project = element._get_project()
return os.path.join(project.name, element.normal_name, key + '.tar.bz2')
# A helper class that contains tar archive/extract functions
class _Tar():
# archive()
#
# Attempt to archive the given tarfile with the `tar` command,
# falling back to python's `tarfile` if this fails.
#
# Args:
# location (str): The path to the tar to create
# content (str): The path to the content to archive
# cwd (str): The cwd
#
# This is done since AIX tar does not support 2G+ files.
#
@classmethod
def archive(cls, location, content, cwd=os.getcwd()):
try:
cls._archive_with_tar(location, content, cwd)
return
except tarfile.TarError:
pass
except ProgramNotFoundError:
pass
# If the former did not complete successfully, we try with
# python's tar implementation (since it's hard to detect
# specific issues with specific tar implementations - a
# fallback).
try:
cls._archive_with_python(location, content, cwd)
except tarfile.TarError as e:
raise ArtifactError("Failed to archive {}: {}"
.format(location, e)) from e
# extract()
#
# Attempt to extract the given tarfile with the `tar` command,
# falling back to python's `tarfile` if this fails.
#
# Args:
# location (str): The path to the tar to extract
# dest (str): The destination path to extract to
#
# This is done since python tarfile extraction is horrendously
# slow (2 hrs+ for base images).
#
@classmethod
def extract(cls, location, dest):
try:
cls._extract_with_tar(location, dest)
return
except tarfile.TarError:
pass
except ProgramNotFoundError:
pass
try:
cls._extract_with_python(location, dest)
except tarfile.TarError as e:
raise ArtifactError("Failed to extract {}: {}"
.format(location, e)) from e
# _get_host_tar()
#
# Get the host tar command.
#
# Raises:
# ProgramNotFoundError: If the tar executable cannot be
# located
#
@classmethod
def _get_host_tar(cls):
tar_cmd = None
for potential_tar_cmd in ['gtar', 'tar']:
try:
tar_cmd = utils.get_host_tool(potential_tar_cmd)
break
except ProgramNotFoundError:
continue
# If we still couldn't find tar, raise the ProgramNotFoundError
if tar_cmd is None:
raise ProgramNotFoundError("Did not find tar in PATH: {}"
.format(os.environ.get('PATH')))
return tar_cmd
# _archive_with_tar()
#
# Archive with an implementation of the `tar` command
#
# Args:
# location (str): The path to the tar to create
# content (str): The path to the content to archive
# cwd (str): The cwd
#
# Raises:
# TarError: If an error occurs during extraction
# ProgramNotFoundError: If the tar executable cannot be
# located
#
@classmethod
def _archive_with_tar(cls, location, content, cwd):
tar_cmd = cls._get_host_tar()
process = subprocess.Popen(
[tar_cmd, 'jcaf', location, content],
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
_, err = process.communicate()
if process.poll() != 0:
# Clean up in case the command failed in a broken state
try:
os.remove(location)
except FileNotFoundError:
pass
raise tarfile.TarError("Failed to archive '{}': {}"
.format(content, err.decode('utf8')))
# _archive_with_python()
#
# Archive with the python `tarfile` module
#
# Args:
# location (str): The path to the tar to create
# content (str): The path to the content to archive
# cwd (str): The cwd
#
# Raises:
# TarError: If an error occurs during extraction
#
@classmethod
def _archive_with_python(cls, location, content, cwd):
with tarfile.open(location, mode='w:bz2') as tar:
tar.add(os.path.join(cwd, content), arcname=content)
# _extract_with_tar()
#
# Extract with an implementation of the `tar` command
#
# Args:
# location (str): The path to the tar to extract
# dest (str): The destination path to extract to
#
# Raises:
# TarError: If an error occurs during extraction
#
@classmethod
def _extract_with_tar(cls, location, dest):
tar_cmd = cls._get_host_tar()
# Some tar implementations do not support '-C'
process = subprocess.Popen(
[tar_cmd, 'jxf', location],
cwd=dest,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
_, err = process.communicate()
if process.poll() != 0:
raise tarfile.TarError("Failed to extract '{}': {}"
.format(location, err.decode('utf8')))
# _extract_with_python()
#
# Extract with the python `tarfile` module
#
# Args:
# location (str): The path to the tar to extract
# dest (str): The destination path to extract to
#
# Raises:
# TarError: If an error occurs during extraction
#
@classmethod
def _extract_with_python(cls, location, dest):
with tarfile.open(location) as tar:
tar.extractall(path=dest)
......@@ -21,6 +21,7 @@ import os
import datetime
from collections import deque, Mapping
from contextlib import contextmanager
from . import utils
from . import _cachekey
from . import _signals
from . import _site
......@@ -30,6 +31,7 @@ from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache
from ._workspaces import Workspaces
from .plugin import _plugin_lookup
# Context()
......@@ -62,6 +64,12 @@ class Context():
# The locations from which to push and pull prebuilt artifacts
self.artifact_cache_specs = []
# The artifact cache quota
self.cache_quota = None
# The lower threshold to which we aim to reduce the cache size
self.cache_lower_threshold = None
# The directory to store build logs
self.logdir = None
......@@ -114,6 +122,8 @@ class Context():
self._projects = []
self._project_overrides = {}
self._workspaces = None
self._log_handle = None
self._log_filename = None
# load()
#
......@@ -153,6 +163,7 @@ class Context():
_yaml.node_validate(defaults, [
'sourcedir', 'builddir', 'artifactdir', 'logdir',
'scheduler', 'artifacts', 'logging', 'projects',
'cache'
])
for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir']:
......@@ -165,6 +176,53 @@ class Context():
path = os.path.normpath(path)
setattr(self, directory, path)
# Load quota configuration
# We need to find the first existing directory in the path of
# our artifactdir - the artifactdir may not have been created
# yet.
cache = _yaml.node_get(defaults, Mapping, 'cache')
_yaml.node_validate(cache, ['quota'])
artifactdir_volume = self.artifactdir
while not os.path.exists(artifactdir_volume):
artifactdir_volume = os.path.dirname(artifactdir_volume)
# We read and parse the cache quota as specified by the user
cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
try:
cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
except utils.UtilError as e:
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}\nPlease specify the value in bytes or as a % of full disk space.\n"
"\nValid values are, for example: 800M 10G 1T 50%\n"
.format(str(e))) from e
# If we are asked not to set a quota, we set it to the maximum
# disk space available minus a headroom of 2GB, such that we
# at least try to avoid raising Exceptions.
#
# Of course, we might still end up running out during a build
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
if cache_quota is None:
stat = os.statvfs(artifactdir_volume)
# Again, the artifact directory may not yet have been
# created
if not os.path.exists(self.artifactdir):
cache_size = 0
else:
cache_size = utils._get_dir_size(self.artifactdir)
cache_quota = cache_size + stat.f_bsize * stat.f_bavail
if 'BST_TEST_SUITE' in os.environ:
headroom = 0
else:
headroom = 2e9
self.cache_quota = cache_quota - headroom
self.cache_lower_threshold = self.cache_quota / 2
# Load artifact share configuration
self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
......@@ -330,8 +388,11 @@ class Context():
if message.depth is None:
message.depth = len(list(self._message_depth))
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will log and propagate
# or it can be the child task which will propagate
# to the frontend)
assert self._message_handler
......@@ -401,6 +462,137 @@ class Context():
self._pop_message_depth()
self.message(message)
# recorded_messages()
#
# Records all messages in a log file while the context manager
# is active.
#
# In addition to automatically writing all messages to the
# specified logging file, an open file handle for process stdout
# and stderr will be available via the Context.get_log_handle() API,
# and the full logfile path will be available via the
# Context.get_log_filename() API.
#
# Args:
# filename (str): A logging directory relative filename,
# the pid and .log extension will be automatically
# appended
#
# Yields:
# (str): The fully qualified log filename
#
@contextmanager
def recorded_messages(self, filename):
# We don't allow recursing in this context manager, and
# we also do not allow it in the main process.
assert self._log_handle is None
assert self._log_filename is None
assert not utils._is_main_process()
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
self._log_filename = os.path.join(self.logdir,
'{}.{}.log'.format(filename, os.getpid()))
# Ensure the directory exists first
directory = os.path.dirname(self._log_filename)
os.makedirs(directory, exist_ok=True)
with open(self._log_filename, 'a') as logfile:
# Write one last line to the log and flush it to disk
def flush_log():
# If the process currently has something happening in the I/O stack,
# then trying to reenter the I/O stack will fire a runtime error.
#
# So just try to flush as well as we can at SIGTERM time
try:
logfile.write('\n\nForcefully terminated\n')
logfile.flush()
except RuntimeError:
os.fsync(logfile.fileno())
self._log_handle = logfile
with _signals.terminator(flush_log):
yield self._log_filename
self._log_handle = None
self._log_filename = None
# get_log_handle()
#
# Fetches the active log handle. This will return the active
# log file handle while the Context.recorded_messages() context
# manager is active
#
# Returns:
# (file): The active logging file handle, or None
#
def get_log_handle(self):
return self._log_handle
# get_log_filename()
#
# Fetches the active log filename. This will return the active
# log filename while the Context.recorded_messages() context
# manager is active
#
# Returns:
# (str): The active logging filename, or None
#
def get_log_filename(self):
return self._log_filename
# _record_message()
#
# Records the message if recording is enabled
#
# Args:
# message (Message): The message to record
#
def _record_message(self, message):
if self._log_handle is None:
return
INDENT = " "
EMPTYTIME = "--:--:--"
template = "[{timecode: <8}] {type: <7}"
# If this message is associated with a plugin, print what
# we know about the plugin.
plugin_name = ""
if message.unique_id:
template += " {plugin}"
plugin = _plugin_lookup(message.unique_id)
plugin_name = plugin.name
template += ": {message}"
detail = ''
if message.detail is not None:
template += "\n\n{detail}"
detail = message.detail.rstrip('\n')
detail = INDENT + INDENT.join(detail.splitlines(True))
timecode = EMPTYTIME
if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
minutes, seconds = divmod(remainder, 60)
timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
text = template.format(timecode=timecode,
plugin=plugin_name,
type=message.message_type.upper(),
message=message.message,
detail=detail)
# Write to the open log file
self._log_handle.write('{}\n'.format(text))
self._log_handle.flush()
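# For illustration, a SUCCESS message recorded for an element would be
# rendered roughly as follows (element name and timing are invented):
#
#   [00:01:23] SUCCESS base/alpine.bst: Build complete
#
# with any detail text appended below, indented under the message.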
# _push_message_depth() / _pop_message_depth()
#
# For status messages, send the depth of timed
......
......@@ -88,6 +88,7 @@ class ErrorDomain(Enum):
ELEMENT = 11
APP = 12
STREAM = 13
VIRTUAL_FS = 14
# BstError is an internal base exception class for BuildSream
......@@ -99,7 +100,7 @@ class ErrorDomain(Enum):
#
class BstError(Exception):
def __init__(self, message, *, detail=None, domain=None, reason=None):
def __init__(self, message, *, detail=None, domain=None, reason=None, temporary=False):
global _last_exception
super().__init__(message)
......@@ -114,6 +115,11 @@ class BstError(Exception):
#
self.sandbox = None
# When this exception occurs during the handling of a job, this
# indicates whether or not it is worth retrying the job.
#
self.temporary = temporary
# Error domain and reason
#
self.domain = domain
......@@ -131,8 +137,8 @@ class BstError(Exception):
# or by the base :class:`.Plugin` element itself.
#
class PluginError(BstError):
def __init__(self, message, reason=None):
super().__init__(message, domain=ErrorDomain.PLUGIN, reason=reason)
def __init__(self, message, reason=None, temporary=False):
super().__init__(message, domain=ErrorDomain.PLUGIN, reason=reason, temporary=temporary)
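# Illustrative sketch (not part of this module): a plugin hitting a
# transient condition can mark the error as temporary so that the
# scheduler knows retrying the job may succeed. The URL-fetching
# scenario below is invented for the example; PluginError is the
# class defined above.
#
import urllib.request

def fetch_with_retry_hint(url):
    try:
        with urllib.request.urlopen(url) as response:
            return response.read()
    except OSError as e:
        # A network failure is usually worth retrying
        raise PluginError("Failed to fetch {}: {}".format(url, e),
                          reason="fetch-failed", temporary=True) from e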
# LoadErrorReason
......@@ -249,8 +255,8 @@ class SandboxError(BstError):
# Raised when errors are encountered in the artifact caches
#
class ArtifactError(BstError):
def __init__(self, message, reason=None):
super().__init__(message, domain=ErrorDomain.ARTIFACT, reason=reason)
def __init__(self, message, *, detail=None, reason=None, temporary=False):
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
# PipelineError
......
......@@ -17,17 +17,16 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
from contextlib import contextmanager
import os
import sys
import resource
import traceback
import datetime
from textwrap import TextWrapper
from contextlib import contextmanager
from blessings import Terminal
import click
from click import UsageError
from blessings import Terminal
# Import buildstream public symbols
from .. import Scope
......@@ -40,6 +39,7 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
from .._scheduler import ElementJob
# Import frontend assets
from . import Profile, LogLine, Status
......@@ -270,6 +270,10 @@ class App():
# Exit with the error
self._error_exit(e)
except RecursionError:
click.echo("RecursionError: Depency depth is too large. Maximum recursion depth exceeded.",
err=True)
sys.exit(-1)
else:
# No exceptions occurred, print session time and summary
......@@ -492,30 +496,37 @@ class App():
def _tick(self, elapsed):
self._maybe_render_status()
def _job_started(self, element, action_name):
self._status.add_job(element, action_name)
def _job_started(self, job):
self._status.add_job(job)
self._maybe_render_status()
def _job_completed(self, element, queue, action_name, success):
self._status.remove_job(element, action_name)
def _job_completed(self, job, success):
self._status.remove_job(job)
self._maybe_render_status()
# Don't attempt to handle a failure if the user has already opted to
# terminate
if not success and not self.stream.terminated:
# Get the last failure message for additional context
failure = self._fail_messages.get(element._get_unique_id())
if isinstance(job, ElementJob):
element = job.element
queue = job.queue
# Get the last failure message for additional context
failure = self._fail_messages.get(element._get_unique_id())
# XXX This is dangerous, sometimes we get the job completed *before*
# the failure message reaches us ??
if not failure:
self._status.clear()
click.echo("\n\n\nBUG: Message handling out of sync, " +
"unable to retrieve failure message for element {}\n\n\n\n\n"
.format(element), err=True)
# XXX This is dangerous, sometimes we get the job completed *before*
# the failure message reaches us ??
if not failure:
self._status.clear()
click.echo("\n\n\nBUG: Message handling out of sync, " +
"unable to retrieve failure message for element {}\n\n\n\n\n"
.format(element), err=True)
else:
self._handle_failure(element, queue, failure)
else:
self._handle_failure(element, queue, failure)
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
def _handle_failure(self, element, queue, failure):
......
......@@ -408,6 +408,10 @@ def track(app, elements, deps, except_, cross_junctions):
all: All dependencies of all specified elements
"""
with app.initialized(session_name="Track"):
# Substitute 'redirect' for 'none' so that element redirections
# will be followed
if deps == 'none':
deps = 'redirect'
app.stream.track(elements,
selection=deps,
except_targets=except_,
......@@ -622,24 +626,38 @@ def shell(app, element, sysroot, mount, isolate, build_, command):
##################################################################
@cli.command(short_help="Checkout a built artifact")
@click.option('--force', '-f', default=False, is_flag=True,
help="Overwrite files existing in checkout directory")
help="Allow files to be overwritten")
@click.option('--deps', '-d', default='run',
type=click.Choice(['run', 'none']),
help='The dependencies to checkout (default: run)')
@click.option('--integrate/--no-integrate', default=True, is_flag=True,
help="Whether to run integration commands")
@click.option('--hardlinks', default=False, is_flag=True,
help="Checkout hardlinks instead of copies (handle with care)")
@click.option('--tar', default=False, is_flag=True,
help="Create a tarball from the artifact contents instead "
"of a file tree. If LOCATION is '-', the tarball "
"will be dumped to the standard output.")
@click.argument('element',
type=click.Path(readable=False))
@click.argument('directory', type=click.Path(file_okay=False))
@click.argument('location', type=click.Path())
@click.pass_obj
def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
def checkout(app, element, location, force, deps, integrate, hardlinks, tar):
"""Checkout a built artifact to the specified location
"""
if hardlinks and tar:
click.echo("ERROR: options --hardlinks and --tar conflict", err=True)
sys.exit(-1)
with app.initialized():
app.stream.checkout(element,
directory=directory,
location=location,
force=force,
deps=deps,
integrate=integrate,
hardlinks=hardlinks)
hardlinks=hardlinks,
tar=tar)
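# Example invocations (illustrative; element name and paths are invented):
#
#   bst checkout hello.bst ~/checkout
#   bst checkout --tar hello.bst - | tar -tf -
#
# The first checks the artifact out as a file tree; the second streams
# it as a tarball to stdout.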
##################################################################
......@@ -807,4 +825,5 @@ def source_bundle(app, element, force, directory,
app.stream.source_bundle(element, directory,
track_first=track_,
force=force,
compression=compression)
compression=compression,
except_targets=except_)
......@@ -21,6 +21,7 @@ from blessings import Terminal
# Import a widget internal for formatting time codes
from .widget import TimeCode
from .._scheduler import ElementJob
# Status()
......@@ -77,9 +78,9 @@ class Status():
# job (Job): The job to track
#
def add_job(self, element, action_name):
def add_job(self, job):
elapsed = self._stream.elapsed_time
job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
......@@ -91,7 +92,13 @@ class Status():
# job (Job): The job to remove
#
def remove_job(self, element, action_name):
def remove_job(self, job):
action_name = job.action_name
# Only element jobs have an associated element to match against
element = job.element if isinstance(job, ElementJob) else None
self._jobs = [
job for job in self._jobs
if not (job.element is element and
......@@ -358,15 +365,19 @@ class _StatusHeader():
#
# Args:
# context (Context): The Context
# element (Element): The element being processed
# action_name (str): The name of the action
# job (Job): The job being processed
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
def __init__(self, context, element, action_name, content_profile, format_profile, elapsed):
def __init__(self, context, job, content_profile, format_profile, elapsed):
action_name = job.action_name
# Jobs which are not element jobs have no element; the action
# name is used as the display name in that case (see full_name below)
element = job.element if isinstance(job, ElementJob) else None
#
# Public members
......@@ -374,6 +385,7 @@ class _StatusJob():
self.element = element # The Element
self.action_name = action_name # The action name
self.size = None # The number of characters required to render
self.full_name = element._get_full_name() if element else action_name
#
# Private members
......@@ -386,7 +398,7 @@ class _StatusJob():
# Calculate the size needed to display
self.size = 10 # Size of time code with brackets
self.size += len(action_name)
self.size += len(element._get_full_name())
self.size += len(self.full_name)
self.size += 3 # '[' + ':' + ']'
# render()
......@@ -403,7 +415,7 @@ class _StatusJob():
self._format_profile.fmt(']')
# Add padding after the display name, before terminating ']'
name = self.element._get_full_name() + (' ' * padding)
name = self.full_name + (' ' * padding)
text += self._format_profile.fmt('[') + \
self._content_profile.fmt(self.action_name) + \
self._format_profile.fmt(':') + \
......
......@@ -26,7 +26,6 @@
# pylint: disable=bad-exception-context,catching-non-exception
import os
from collections import namedtuple
import gi
from gi.repository.GLib import Variant, VariantDict
......@@ -116,80 +115,6 @@ def checkout(repo, path, commit_, user=False):
raise OSTreeError("Failed to checkout commit '{}': {}".format(commit_, e.message)) from e
# commit():
#
# Commit built artifact to cache.
#
# Files are all recorded with uid/gid 0
#
# Args:
# repo (OSTree.Repo): The repo
# dir_ (str): The source directory to commit to the repo
# refs (list): A list of symbolic references (tag) for the commit
#
def commit(repo, dir_, refs):
def commit_filter(repo, path, file_info):
# For now, just set everything in the repo as uid/gid 0
#
# In the future we'll want to extract virtualized file
# attributes from a fuse layer and use that.
#
file_info.set_attribute_uint32('unix::uid', 0)
file_info.set_attribute_uint32('unix::gid', 0)
return OSTree.RepoCommitFilterResult.ALLOW
commit_modifier = OSTree.RepoCommitModifier.new(
OSTree.RepoCommitModifierFlags.NONE, commit_filter)
repo.prepare_transaction()
try:
# add tree to repository
mtree = OSTree.MutableTree.new()
repo.write_directory_to_mtree(Gio.File.new_for_path(dir_),
mtree, commit_modifier)
_, root = repo.write_mtree(mtree)
# create root commit object, no parent, no branch
_, rev = repo.write_commit(None, None, None, None, root)
# create refs
for ref in refs:
repo.transaction_set_ref(None, ref, rev)
# complete repo transaction
repo.commit_transaction(None)
except GLib.GError as e:
# Reraise any error as a buildstream error
repo.abort_transaction()
raise OSTreeError(e.message) from e
# set_ref():
#
# Set symbolic reference to specified revision.
#
# Args:
# repo (OSTree.Repo): The repo
# ref (str): A symbolic reference (tag) for the commit
# rev (str): Commit checksum
#
def set_ref(repo, ref, rev):
repo.prepare_transaction()
try:
repo.transaction_set_ref(None, ref, rev)
# complete repo transaction
repo.commit_transaction(None)
except:
repo.abort_transaction()
raise
# exists():
#
# Checks whether a given commit or symbolic ref exists and
......@@ -224,42 +149,6 @@ def exists(repo, ref):
return has_object
# remove():
#
# Removes the given commit or symbolic ref from the repo.
#
# Args:
# repo (OSTree.Repo): The repo
# ref (str): A commit checksum or symbolic ref
# defer_prune (bool): Whether to defer pruning to the caller. NOTE:
# The space won't be freed until you manually
# call repo.prune.
#
# Returns:
# (int|None) The amount of space pruned from the repository in
# Bytes, or None if defer_prune is True
#
def remove(repo, ref, *, defer_prune=False):
# Get the commit checksum; this will:
#
# o Return a commit checksum if ref is a symbolic branch
# o Return the same commit checksum if ref is a valid commit checksum
# o Return None if the ostree repo doesn't know this ref.
#
check = checksum(repo, ref)
if check is None:
raise OSTreeError("Could not find artifact for ref '{}'".format(ref))
repo.set_ref_immediate(None, ref, None)
if not defer_prune:
_, _, _, pruned = repo.prune(OSTree.RepoPruneFlags.REFS_ONLY, -1)
return pruned
return None
# checksum():
#
# Returns the commit checksum for a given symbolic ref,
......@@ -279,172 +168,6 @@ def checksum(repo, ref):
return checksum_
OSTREE_GIO_FAST_QUERYINFO = ("standard::name,standard::type,standard::size,"
"standard::is-symlink,standard::symlink-target,"
"unix::device,unix::inode,unix::mode,unix::uid,"
"unix::gid,unix::rdev")
DiffItem = namedtuple('DiffItem', ['src', 'src_info',
'target', 'target_info',
'src_checksum', 'target_checksum'])
# diff_dirs():
#
# Compute the difference between directories a and b as 3 separate sets
# of OSTree.DiffItem.
#
# This is more-or-less a direct port of OSTree.diff_dirs (which cannot
# be used via PyGobject), but does not support options.
#
# Args:
# a (Gio.File): The first directory for the comparison.
# b (Gio.File): The second directory for the comparison.
#
# Returns:
# (modified, removed, added)
#
def diff_dirs(a, b):
# get_file_checksum():
#
# Helper to compute the checksum of an arbitrary file (different
# objects have different methods to compute these).
#
def get_file_checksum(f, f_info):
if isinstance(f, OSTree.RepoFile):
return f.get_checksum()
else:
contents = None
if f_info.get_file_type() == Gio.FileType.REGULAR:
contents = f.read()
csum = OSTree.checksum_file_from_input(f_info, None, contents,
OSTree.ObjectType.FILE)
return OSTree.checksum_from_bytes(csum)
# diff_files():
#
# Helper to compute a diff between two files.
#
def diff_files(a, a_info, b, b_info):
checksum_a = get_file_checksum(a, a_info)
checksum_b = get_file_checksum(b, b_info)
if checksum_a != checksum_b:
return DiffItem(a, a_info, b, b_info, checksum_a, checksum_b)
return None
# diff_add_dir_recurse():
#
# Helper to collect all files in a directory recursively.
#
def diff_add_dir_recurse(d):
added = []
dir_enum = d.enumerate_children(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
for child_info in dir_enum:
name = child_info.get_name()
child = d.get_child(name)
added.append(child)
if child_info.get_file_type() == Gio.FileType.DIRECTORY:
added.extend(diff_add_dir_recurse(child))
return added
modified = []
removed = []
added = []
child_a_info = a.query_info(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
child_b_info = b.query_info(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
# If both are directories and have the same checksum, we know that
# none of the underlying files changed, so we can save time.
if (child_a_info.get_file_type() == Gio.FileType.DIRECTORY and
child_b_info.get_file_type() == Gio.FileType.DIRECTORY and
isinstance(a, OSTree.RepoFileClass) and
isinstance(b, OSTree.RepoFileClass)):
if a.tree_get_contents_checksum() == b.tree_get_contents_checksum():
return modified, removed, added
# We walk through 'a' first
dir_enum = a.enumerate_children(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
for child_a_info in dir_enum:
name = child_a_info.get_name()
child_a = a.get_child(name)
child_a_type = child_a_info.get_file_type()
try:
child_b = b.get_child(name)
child_b_info = child_b.query_info(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
except GLib.Error as e:
# If the file does not exist in b, it has been removed
if e.matches(Gio.io_error_quark(), Gio.IOErrorEnum.NOT_FOUND):
removed.append(child_a)
continue
else:
raise
# If the files differ but are of different types, we report a
# modification, saving a bit of time because we won't need a
# checksum
child_b_type = child_b_info.get_file_type()
if child_a_type != child_b_type:
diff_item = DiffItem(child_a, child_a_info,
child_b, child_b_info,
None, None)
modified.append(diff_item)
# Finally, we compute checksums and compare the file contents directly
else:
diff_item = diff_files(child_a, child_a_info, child_b, child_b_info)
if diff_item:
modified.append(diff_item)
# If the files are both directories, we recursively use
# this function to find differences - saving time if they
# are equal.
if child_a_type == Gio.FileType.DIRECTORY:
subdir = diff_dirs(child_a, child_b)
modified.extend(subdir[0])
removed.extend(subdir[1])
added.extend(subdir[2])
# Now we walk through 'b' to find any files that were added
dir_enum = b.enumerate_children(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
for child_b_info in dir_enum:
name = child_b_info.get_name()
child_b = b.get_child(name)
try:
child_a = a.get_child(name)
child_a_info = child_a.query_info(OSTREE_GIO_FAST_QUERYINFO,
Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS)
except GLib.Error as e:
# If the file does not exist in 'a', it was added.
if e.matches(Gio.io_error_quark(), Gio.IOErrorEnum.NOT_FOUND):
added.append(child_b)
if child_b_info.get_file_type() == Gio.FileType.DIRECTORY:
added.extend(diff_add_dir_recurse(child_b))
continue
else:
raise
return modified, removed, added
# fetch()
#
# Fetch new objects from a remote, if configured
......@@ -551,47 +274,3 @@ def configure_remote(repo, remote, url, key_url=None):
repo.remote_gpg_import(remote, stream, None, 0, None)
except GLib.GError as e:
raise OSTreeError("Failed to add gpg key from url '{}': {}".format(key_url, e.message)) from e
# list_artifacts():
#
# List cached artifacts in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of refs in LRM order
#
def list_artifacts(repo):
# string of: /path/to/repo/refs/heads
ref_heads = os.path.join(repo.get_path().get_path(), 'refs', 'heads')
# obtain list of <project>/<element>/<key>
refs = _list_all_refs(repo).keys()
mtimes = []
for ref in refs:
ref_path = os.path.join(ref_heads, ref)
if os.path.exists(ref_path):
# Obtain the mtime (the time a file was last modified)
mtimes.append(os.path.getmtime(ref_path))
# NOTE: Sorted will sort from earliest to latest, thus the
# first element of this list will be the file modified earliest.
return [ref for _, ref in sorted(zip(mtimes, refs))]
# _list_all_refs():
#
# Create a list of all refs.
#
# Args:
# repo (OSTree.Repo): The repo
#
# Returns:
# (dict): A dict of refs to checksums.
#
def _list_all_refs(repo):
try:
_, refs = repo.list_refs(None)
return refs
except GLib.GError as e:
raise OSTreeError(message=e.message) from e
......@@ -44,6 +44,9 @@ class PipelineSelection():
# Select only the target elements in the associated targets
NONE = 'none'
# As NONE, but redirect elements that are capable of it
REDIRECT = 'redirect'
# Select elements which must be built for the associated targets to be built
PLAN = 'plan'
......@@ -215,6 +218,8 @@ class Pipeline():
elements = None
if mode == PipelineSelection.NONE:
elements = targets
elif mode == PipelineSelection.REDIRECT:
# Redirect and log if permitted
elements = []
for t in targets:
......