Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • willsalmon/buildstream
  • CumHoleZH/buildstream
  • tchaik/buildstream
  • DCotyPortfolio/buildstream
  • jesusoctavioas/buildstream
  • patrickmmartin/buildstream
  • franred/buildstream
  • tintou/buildstream
  • alatiera/buildstream
  • martinblanchard/buildstream
  • neverdie22042524/buildstream
  • Mattlk13/buildstream
  • PServers/buildstream
  • phamnghia610909/buildstream
  • chiaratolentino/buildstream
  • eysz7-x-x/buildstream
  • kerrick1/buildstream
  • matthew-yates/buildstream
  • twofeathers/buildstream
  • mhadjimichael/buildstream
  • pointswaves/buildstream
  • Mr.JackWilson/buildstream
  • Tw3akG33k/buildstream
  • AlexFazakas/buildstream
  • eruidfkiy/buildstream
  • clamotion2/buildstream
  • nanonyme/buildstream
  • wickyjaaa/buildstream
  • nmanchev/buildstream
  • bojorquez.ja/buildstream
  • mostynb/buildstream
  • highpit74/buildstream
  • Demo112/buildstream
  • ba2014sheer/buildstream
  • tonimadrino/buildstream
  • usuario2o/buildstream
  • Angelika123456/buildstream
  • neo355/buildstream
  • corentin-ferlay/buildstream
  • coldtom/buildstream
  • wifitvbox81/buildstream
  • 358253885/buildstream
  • seanborg/buildstream
  • SotK/buildstream
  • DouglasWinship/buildstream
  • karansthr97/buildstream
  • louib/buildstream
  • bwh-ct/buildstream
  • robjh/buildstream
  • we88c0de/buildstream
  • zhengxian5555/buildstream
51 results
Show changes
Commits on Source (26)
Showing
with 230 additions and 105 deletions
......@@ -54,7 +54,6 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
#
# Args:
# context (Context): The BuildStream context
# enable_push (bool): Whether pushing is allowed by the platform
#
# Pushing is explicitly disabled by the platform in some cases,
# like when we are falling back to functioning without using
......@@ -62,7 +61,7 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
#
class CASCache(ArtifactCache):
def __init__(self, context, *, enable_push=True):
def __init__(self, context):
super().__init__(context)
self.casdir = os.path.join(context.artifactdir, 'cas')
......@@ -71,8 +70,6 @@ class CASCache(ArtifactCache):
self._calculate_cache_quota()
self._enable_push = enable_push
# Per-project list of _CASRemote instances.
self._remotes = {}
......@@ -214,7 +211,7 @@ class CASCache(ArtifactCache):
return bool(remotes_for_project)
def has_push_remotes(self, *, element=None):
if not self._has_push_remotes or not self._enable_push:
if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
......
......@@ -35,8 +35,6 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import ArtifactError
from .._context import Context
from .cascache import CASCache
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
......@@ -60,7 +58,7 @@ def create_server(repo, *, enable_push):
context = Context()
context.artifactdir = os.path.abspath(repo)
artifactcache = CASCache(context)
artifactcache = context.artifactcache
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
......
......@@ -30,6 +30,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache
from ._artifactcache.cascache import CASCache
from ._workspaces import Workspaces
from .plugin import _plugin_lookup
......@@ -113,6 +114,7 @@ class Context():
self._cache_key = None
self._message_handler = None
self._message_depth = deque()
self._artifactcache = None
self._projects = []
self._project_overrides = {}
self._workspaces = None
......@@ -227,6 +229,13 @@ class Context():
"{}: on-error should be one of: {}".format(
provenance, ", ".join(valid_actions)))
@property
def artifactcache(self):
if not self._artifactcache:
self._artifactcache = CASCache(self)
return self._artifactcache
# add_project():
#
# Add a project to the context.
......
......@@ -115,14 +115,6 @@ class App():
else:
self.colors = False
# Increase the soft limit for open file descriptors to the maximum.
# SafeHardlinks FUSE needs to hold file descriptors for all processes in the sandbox.
# Avoid hitting the limit too quickly.
limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if limits[0] != limits[1]:
# Set soft limit to hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (limits[1], limits[1]))
# create()
#
# Should be used instead of the regular constructor.
......@@ -198,7 +190,7 @@ class App():
if option_value is not None:
setattr(self.context, context_attr, option_value)
try:
Platform.create_instance(self.context)
Platform.get_platform()
except BstError as e:
self._error_exit(e, "Error instantiating platform")
......
......@@ -28,7 +28,6 @@ from .. import Consistency
from .. import _yaml
from ..element import Element
from .._profile import Topics, profile_start, profile_end
from .._platform import Platform
from .._includes import Includes
from .types import Symbol, Dependency
......@@ -518,8 +517,7 @@ class Loader():
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
platform = Platform.get_platform()
element = Element._new_from_meta(meta_element, platform.artifactcache)
element = Element._new_from_meta(meta_element, self._context.artifactcache)
element._preflight()
sources = list(element.sources())
......
#
# Copyright (C) 2017 Codethink Limited
# Copyright (C) 2018 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
import resource
from .._exceptions import PlatformError
from ..sandbox import SandboxDummy
from . import Platform
class Darwin(Platform):
# This value comes from OPEN_MAX in syslimits.h
OPEN_MAX = 10240
def __init__(self, context):
super().__init__(context)
def create_sandbox(self, *args, **kwargs):
return SandboxDummy(*args, **kwargs)
def check_sandbox_config(self, config):
# Accept all sandbox configs as it's irrelevant with the dummy sandbox (no Sandbox.run).
return True
def get_cpu_count(self, cap=None):
if cap < os.cpu_count():
return cap
else:
return os.cpu_count()
def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
super().set_resource_limits(soft_limit)
......@@ -17,42 +17,62 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
import os
import subprocess
from .. import _site
from .. import utils
from .._artifactcache.cascache import CASCache
from .._message import Message, MessageType
from ..sandbox import SandboxBwrap
from ..sandbox import SandboxDummy
from . import Platform
class Linux(Platform):
def __init__(self, context):
def __init__(self):
super().__init__(context)
super().__init__()
self._uid = os.geteuid()
self._gid = os.getegid()
self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8)
self._user_ns_available = self._check_user_ns_available(context)
self._artifact_cache = CASCache(context, enable_push=self._user_ns_available)
@property
def artifactcache(self):
return self._artifact_cache
if self._local_sandbox_available():
self._user_ns_available = self._check_user_ns_available()
else:
self._user_ns_available = False
def create_sandbox(self, *args, **kwargs):
# Inform the bubblewrap sandbox as to whether it can use user namespaces or not
kwargs['user_ns_available'] = self._user_ns_available
kwargs['die_with_parent_available'] = self._die_with_parent_available
return SandboxBwrap(*args, **kwargs)
if not self._local_sandbox_available():
return SandboxDummy(*args, **kwargs)
else:
from ..sandbox._sandboxbwrap import SandboxBwrap
# Inform the bubblewrap sandbox as to whether it can use user namespaces or not
kwargs['user_ns_available'] = self._user_ns_available
kwargs['die_with_parent_available'] = self._die_with_parent_available
return SandboxBwrap(*args, **kwargs)
def check_sandbox_config(self, config):
if self._user_ns_available:
# User namespace support allows arbitrary build UID/GID settings.
return True
else:
# Without user namespace support, the UID/GID in the sandbox
# will match the host UID/GID.
return config.build_uid == self._uid and config.build_gid == self._gid
################################################
# Private Methods #
################################################
def _check_user_ns_available(self, context):
def _local_sandbox_available(self):
try:
return os.path.exists(utils.get_host_tool('bwrap')) and os.path.exists('/dev/fuse')
except utils.ProgramNotFoundError:
return False
def _check_user_ns_available(self):
# Here, lets check if bwrap is able to create user namespaces,
# issue a warning if it's not available, and save the state
# locally so that we can inform the sandbox to not try it
......@@ -75,9 +95,4 @@ class Linux(Platform):
return True
else:
context.message(
Message(None, MessageType.WARN,
"Unable to create user namespaces with bubblewrap, resorting to fallback",
detail="Some builds may not function due to lack of uid / gid 0, " +
"artifacts created will not be trusted for push purposes."))
return False
......@@ -19,6 +19,7 @@
import os
import sys
import resource
from .._exceptions import PlatformError, ImplError
......@@ -29,50 +30,44 @@ class Platform():
# Platform()
#
# A class to manage platform-specific details. Currently holds the
# sandbox factory, the artifact cache and staging operations, as
# well as platform helpers.
# sandbox factory as well as platform helpers.
#
# Args:
# context (context): The project context
#
def __init__(self, context):
self.context = context
def __init__(self):
self.set_resource_limits()
@classmethod
def create_instance(cls, *args, **kwargs):
if sys.platform.startswith('linux'):
backend = 'linux'
else:
backend = 'unix'
def _create_instance(cls):
# Meant for testing purposes and therefore hidden in the
# deepest corners of the source code. Try not to abuse this,
# please?
if os.getenv('BST_FORCE_BACKEND'):
backend = os.getenv('BST_FORCE_BACKEND')
elif sys.platform.startswith('linux'):
backend = 'linux'
elif sys.platform.startswith('darwin'):
backend = 'darwin'
else:
backend = 'unix'
if backend == 'linux':
from .linux import Linux as PlatformImpl
elif backend == 'darwin':
from .darwin import Darwin as PlatformImpl
elif backend == 'unix':
from .unix import Unix as PlatformImpl
else:
raise PlatformError("No such platform: '{}'".format(backend))
cls._instance = PlatformImpl(*args, **kwargs)
cls._instance = PlatformImpl()
@classmethod
def get_platform(cls):
if not cls._instance:
raise PlatformError("Platform needs to be initialized first")
cls._create_instance()
return cls._instance
##################################################################
# Platform properties #
##################################################################
@property
def artifactcache(self):
raise ImplError("Platform {platform} does not implement an artifactcache"
.format(platform=type(self).__name__))
def get_cpu_count(self, cap=None):
return min(len(os.sched_getaffinity(0)), cap)
##################################################################
# Sandbox functions #
......@@ -92,3 +87,19 @@ class Platform():
def create_sandbox(self, *args, **kwargs):
raise ImplError("Platform {platform} does not implement create_sandbox()"
.format(platform=type(self).__name__))
def check_sandbox_config(self, config):
raise ImplError("Platform {platform} does not implement check_sandbox_config()"
.format(platform=type(self).__name__))
def set_resource_limits(self, soft_limit=None, hard_limit=None):
# Need to set resources for _frontend/app.py as this is dependent on the platform
# SafeHardlinks FUSE needs to hold file descriptors for all processes in the sandbox.
# Avoid hitting the limit too quickly.
limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if limits[0] != limits[1]:
if soft_limit is None:
soft_limit = limits[1]
if hard_limit is None:
hard_limit = limits[1]
resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, hard_limit))
......@@ -19,27 +19,29 @@
import os
from .._artifactcache.cascache import CASCache
from .._exceptions import PlatformError
from ..sandbox import SandboxChroot
from . import Platform
class Unix(Platform):
def __init__(self, context):
def __init__(self):
super().__init__(context)
self._artifact_cache = CASCache(context)
super().__init__()
self._uid = os.geteuid()
self._gid = os.getegid()
# Not necessarily 100% reliable, but we want to fail early.
if os.geteuid() != 0:
if self._uid != 0:
raise PlatformError("Root privileges are required to run without bubblewrap.")
@property
def artifactcache(self):
return self._artifact_cache
def create_sandbox(self, *args, **kwargs):
from ..sandbox._sandboxchroot import SandboxChroot
return SandboxChroot(*args, **kwargs)
def check_sandbox_config(self, config):
# With the chroot sandbox, the UID/GID in the sandbox
# will match the host UID/GID (typically 0/0).
return config.build_uid == self._uid and config.build_gid == self._gid
......@@ -38,6 +38,7 @@ from ._loader import Loader
from .element import Element
from ._message import Message, MessageType
from ._includes import Includes
from ._platform import Platform
# Project Configuration file
......@@ -617,7 +618,8 @@ class Project():
# Based on some testing (mainly on AWS), maximum effective
# max-jobs value seems to be around 8-10 if we have enough cores
# users should set values based on workload and build infrastructure
output.base_variables['max-jobs'] = str(min(len(os.sched_getaffinity(0)), 8))
platform = Platform.get_platform()
output.base_variables['max-jobs'] = str(platform.get_cpu_count(8))
# Export options into variables, if that was requested
output.options.export_variables(output.base_variables)
......
......@@ -17,7 +17,6 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job
from ..._platform import Platform
class CacheSizeJob(Job):
......@@ -25,8 +24,8 @@ class CacheSizeJob(Job):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
platform = Platform.get_platform()
self._artifacts = platform.artifactcache
context = self._scheduler.context
self._artifacts = context.artifactcache
def child_process(self):
return self._artifacts.compute_cache_size()
......
......@@ -17,15 +17,14 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job
from ..._platform import Platform
class CleanupJob(Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
platform = Platform.get_platform()
self._artifacts = platform.artifactcache
context = self._scheduler.context
self._artifacts = context.artifactcache
def child_process(self):
return self._artifacts.clean()
......
......@@ -24,7 +24,6 @@ from . import Queue, QueueStatus
from ..jobs import ElementJob
from ..resources import ResourceType
from ..._message import MessageType
from ..._platform import Platform
# A queue which assembles elements
......@@ -94,8 +93,8 @@ class BuildQueue(Queue):
# as returned from Element._assemble() to the estimated
# artifact cache size
#
platform = Platform.get_platform()
artifacts = platform.artifactcache
context = self._scheduler.context
artifacts = context.artifactcache
artifacts.add_artifact_size(artifact_size)
......
......@@ -29,7 +29,6 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
from .jobs import CacheSizeJob, CleanupJob
from .._platform import Platform
# A decent return code for Scheduler.run()
......@@ -348,8 +347,8 @@ class Scheduler():
# which will report the calculated cache size.
#
def _run_cleanup(self, cache_size):
platform = Platform.get_platform()
artifacts = platform.artifactcache
context = self.context
artifacts = context.artifactcache
if not artifacts.has_quota_exceeded():
return
......
......@@ -32,7 +32,6 @@ from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._platform import Platform
from . import utils, _yaml, _site
from . import Scope, Consistency
......@@ -71,8 +70,7 @@ class Stream():
#
# Private members
#
self._platform = Platform.get_platform()
self._artifacts = self._platform.artifactcache
self._artifacts = context.artifactcache
self._context = context
self._project = project
self._pipeline = Pipeline(context, project, self._artifacts)
......
......@@ -246,15 +246,23 @@ class Element(Plugin):
self.__config = self.__extract_config(meta)
self._configure(self.__config)
# Extract Sandbox config
self.__sandbox_config = self.__extract_sandbox_config(meta)
# Extract remote execution URL
if not self.__is_junction:
self.__remote_execution_url = project.remote_execution_url
else:
self.__remote_execution_url = None
# Extract Sandbox config
self.__sandbox_config = self.__extract_sandbox_config(meta)
self.__sandbox_config_supported = True
if not self.__use_remote_execution():
platform = Platform.get_platform()
if not platform.check_sandbox_config(self.__sandbox_config):
# Local sandbox does not fully support specified sandbox config.
# This will taint the artifact, disable pushing.
self.__sandbox_config_supported = False
def __lt__(self, other):
return self.name < other.name
......@@ -1521,6 +1529,11 @@ class Element(Plugin):
context = self._get_context()
with self._output_file() as output_file:
if not self.__sandbox_config_supported:
self.warn("Sandbox configuration is not supported by the platform.",
detail="Falling back to UID {} GID {}. Artifact will not be pushed."
.format(self.__sandbox_config.build_uid, self.__sandbox_config.build_gid))
# Explicitly clean it up, keep the build dir around if exceptions are raised
os.makedirs(context.builddir, exist_ok=True)
rootdir = tempfile.mkdtemp(prefix="{}-".format(self.normal_name), dir=context.builddir)
......@@ -2110,10 +2123,19 @@ class Element(Plugin):
workspaced_dependencies = self.__get_artifact_metadata_workspaced_dependencies()
# Other conditions should be or-ed
self.__tainted = workspaced or workspaced_dependencies
self.__tainted = (workspaced or workspaced_dependencies or
not self.__sandbox_config_supported)
return self.__tainted
# __use_remote_execution():
#
# Returns True if remote execution is configured and the element plugin
# supports it.
#
def __use_remote_execution(self):
return self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY
# __sandbox():
#
# A context manager to prepare a Sandbox object at the specified directory,
......@@ -2135,9 +2157,7 @@ class Element(Plugin):
project = self._get_project()
platform = Platform.get_platform()
if (directory is not None and
self.__remote_execution_url and
self.BST_VIRTUAL_DIRECTORY):
if directory is not None and self.__use_remote_execution():
self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory))
......
......@@ -18,6 +18,5 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
from .sandbox import Sandbox, SandboxFlags
from ._sandboxchroot import SandboxChroot
from ._sandboxbwrap import SandboxBwrap
from ._sandboxremote import SandboxRemote
from ._sandboxdummy import SandboxDummy
#
# Copyright (C) 2017 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
from .._exceptions import SandboxError
from . import Sandbox
class SandboxDummy(Sandbox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def run(self, command, flags, *, cwd=None, env=None):
# Fallback to the sandbox default settings for
# the cwd and env.
#
cwd = self._get_work_directory(cwd=cwd)
env = self._get_environment(cwd=cwd, env=env)
if not self._has_command(command[0], env):
raise SandboxError("Staged artifacts do not provide command "
"'{}'".format(command[0]),
reason='missing-command')
raise SandboxError("This platform does not support local builds")
......@@ -27,7 +27,6 @@ from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._platform import Platform
class SandboxError(Exception):
......@@ -71,8 +70,8 @@ class SandboxRemote(Sandbox):
output_files=[],
output_directories=[self._output_directory],
platform=None)
platform = Platform.get_platform()
cascache = platform.artifactcache
context = self._get_context()
cascache = context.artifactcache
# Upload the Command message to the remote CAS server
command_digest = cascache.push_message(self._get_project(), remote_command)
if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest):
......@@ -134,8 +133,8 @@ class SandboxRemote(Sandbox):
if tree_digest is None or not tree_digest.hash:
raise SandboxError("Output directory structure had no digest attached.")
platform = Platform.get_platform()
cascache = platform.artifactcache
context = self._get_context()
cascache = context.artifactcache
# Now do a pull to ensure we have the necessary parts.
dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
......@@ -170,8 +169,8 @@ class SandboxRemote(Sandbox):
upload_vdir.recalculate_hash()
platform = Platform.get_platform()
cascache = platform.artifactcache
context = self._get_context()
cascache = context.artifactcache
# Now, push that key (without necessarily needing a ref) to the remote.
cascache.push_directory(self._get_project(), upload_vdir)
if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
......
......@@ -38,7 +38,6 @@ from .._exceptions import BstError
from .directory import Directory, VirtualDirectoryError
from ._filebaseddirectory import FileBasedDirectory
from ..utils import FileListResult, safe_copy, list_relative_paths
from .._artifactcache.cascache import CASCache
class IndexEntry():
......@@ -80,7 +79,7 @@ class CasBasedDirectory(Directory):
self.filename = filename
self.common_name = common_name
self.pb2_directory = remote_execution_pb2.Directory()
self.cas_cache = CASCache(context)
self.cas_cache = context.artifactcache
if ref:
with open(self.cas_cache.objpath(ref), 'rb') as f:
self.pb2_directory.ParseFromString(f.read())
......