
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (48)
Showing changes with 505 additions and 116 deletions
......@@ -145,7 +145,8 @@ docs:
stage: test
script:
- export BST_SOURCE_CACHE="$(pwd)/cache/integration-cache/sources"
- pip3 install sphinx
# Currently sphinx_rtd_theme does not support Sphinx >1.8; this breaks search functionality
- pip3 install sphinx==1.7.9
- pip3 install sphinx-click
- pip3 install sphinx_rtd_theme
- cd dist && ./unpack.sh && cd buildstream
......@@ -161,14 +162,14 @@ docs:
.overnight-tests: &overnight-tests-template
stage: test
variables:
bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
before_script:
- (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
- pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
- pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
- git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
- git -C freedesktop-sdk checkout ${fd_sdk_ref}
- git -C freedesktop-sdk checkout ${FD_SDK_REF}
only:
- schedules
......
......@@ -506,7 +506,7 @@ class CASCache(ArtifactCache):
def set_ref(self, ref, tree):
refpath = self._refpath(ref)
os.makedirs(os.path.dirname(refpath), exist_ok=True)
with utils.save_file_atomic(refpath, 'wb') as f:
with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
f.write(tree.SerializeToString())
# resolve_ref():
......@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
missing_blobs[d.hash] = d
# Upload any blobs missing on the server
for blob_digest in missing_blobs.values():
with open(self.objpath(blob_digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
self._send_blob(remote, blob_digest, f, u_uid=u_uid)
self._send_blobs(remote, missing_blobs.values(), u_uid)
def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_update_supported):
# Too large for batch request, upload in independent request.
self._send_blob(remote, digest, f, u_uid=u_uid)
else:
if not batch.add(digest, f):
# Not enough space left in batch request.
# Complete pending batch first.
batch.send()
batch = _CASBatchUpdate(remote)
batch.add(digest, f)
# Send final batch
batch.send()
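The new upload path above makes a per-blob decision: blobs that are too large for a batch request, or destined for a server without BatchUpdateBlobs support, are streamed individually, while everything else is packed into batches that are flushed whenever the size limit would be exceeded. A minimal, self-contained sketch of that accumulate-and-flush pattern (the helper names here are illustrative, not BuildStream API):

```python
# Minimal sketch of the accumulate-and-flush batching pattern used above.
# `blobs` is a list of (size, payload) tuples; `max_total`, `upload_single`
# and `upload_batch` are hypothetical stand-ins, not BuildStream API.

def send_blobs(blobs, max_total, upload_single, upload_batch):
    batch = []          # payloads queued for the next batch request
    batch_size = 0      # running total of payload sizes in the batch

    for size, payload in blobs:
        if size >= max_total:
            # Too large to ever fit in a batch: upload independently.
            upload_single(payload)
            continue
        if batch_size + size > max_total:
            # Not enough room left: flush the pending batch first.
            upload_batch(batch)
            batch, batch_size = [], 0
        batch.append(payload)
        batch_size += size

    if batch:
        upload_batch(batch)  # send the final, partially filled batch
```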
# Represents a single remote CAS cache.
......@@ -1126,6 +1145,17 @@ class _CASRemote():
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise
# Check whether the server supports BatchUpdateBlobs()
self.batch_update_supported = False
try:
request = remote_execution_pb2.BatchUpdateBlobsRequest()
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
e.code() != grpc.StatusCode.PERMISSION_DENIED):
raise
self._initialized = True
......@@ -1173,6 +1203,46 @@ class _CASBatchRead():
yield (response.digest, response.data)
# Represents a batch of blobs queued for upload.
#
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
self._size = 0
self._sent = False
def add(self, digest, stream):
assert not self._sent
new_batch_size = self._size + digest.size_bytes
if new_batch_size > self._max_total_size_bytes:
# Not enough space left in current batch
return False
blob_request = self._request.requests.add()
blob_request.digest.hash = digest.hash
blob_request.digest.size_bytes = digest.size_bytes
blob_request.data = stream.read(digest.size_bytes)
self._size = new_batch_size
return True
def send(self):
assert not self._sent
self._sent = True
if len(self._request.requests) == 0:
return
batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]:
raise ArtifactError("Failed to upload blob {}: {}".format(
response.digest.hash, response.status.code))
def _grouper(iterable, n):
while True:
try:
......
......@@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(artifactcache), server)
_ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
......@@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
def __init__(self, cas):
def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
......@@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
return response
def BatchUpdateBlobs(self, request, context):
response = remote_execution_pb2.BatchUpdateBlobsResponse()
if not self.enable_push:
context.set_code(grpc.StatusCode.PERMISSION_DENIED)
return response
batch_size = 0
for blob_request in request.requests:
digest = blob_request.digest
batch_size += digest.size_bytes
if batch_size > _MAX_PAYLOAD_BYTES:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return response
blob_response = response.responses.add()
blob_response.digest.hash = digest.hash
blob_response.digest.size_bytes = digest.size_bytes
if len(blob_request.data) != digest.size_bytes:
blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
continue
try:
_clean_up_cache(self.cas, digest.size_bytes)
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
out.write(blob_request.data)
out.flush()
server_digest = self.cas.add_object(path=out.name)
if server_digest.hash != digest.hash:
blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
except ArtifactTooLargeException:
blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
return response
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
def GetCapabilities(self, request, context):
......
......@@ -34,6 +34,9 @@ class Darwin(Platform):
super().__init__()
def create_sandbox(self, *args, **kwargs):
kwargs['dummy_reason'] = \
"OSXFUSE is not supported and there are no supported sandbox" + \
"technologies for OSX at this time"
return SandboxDummy(*args, **kwargs)
def check_sandbox_config(self, config):
......@@ -41,10 +44,11 @@ class Darwin(Platform):
return True
def get_cpu_count(self, cap=None):
if cap < os.cpu_count():
return cap
cpu_count = os.cpu_count()
if cap is None:
return cpu_count
else:
return os.cpu_count()
return min(cpu_count, cap)
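The previous code compared `cap` against the CPU count even when no cap was given, which fails on Python 3 because comparing `None` with an integer raises `TypeError`. The corrected semantics, shown standalone:

```python
import os

def get_cpu_count(cap=None):
    # Mirrors the fixed method: no cap means "use every available CPU",
    # otherwise clamp the detected count to the cap.
    cpu_count = os.cpu_count()
    if cap is None:
        return cpu_count
    return min(cpu_count, cap)

print(get_cpu_count())       # e.g. 8 on an eight-core machine
print(get_cpu_count(cap=4))  # 4
```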
def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
super().set_resource_limits(soft_limit)
......@@ -37,24 +37,30 @@ class Linux(Platform):
self._uid = os.geteuid()
self._gid = os.getegid()
self._have_fuse = os.path.exists("/dev/fuse")
self._bwrap_exists = _site.check_bwrap_version(0, 0, 0)
self._have_good_bwrap = _site.check_bwrap_version(0, 1, 2)
self._local_sandbox_available = self._have_fuse and self._have_good_bwrap
self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8)
if self._local_sandbox_available():
if self._local_sandbox_available:
self._user_ns_available = self._check_user_ns_available()
else:
self._user_ns_available = False
def create_sandbox(self, *args, **kwargs):
if not self._local_sandbox_available():
return SandboxDummy(*args, **kwargs)
if not self._local_sandbox_available:
return self._create_dummy_sandbox(*args, **kwargs)
else:
from ..sandbox._sandboxbwrap import SandboxBwrap
# Inform the bubblewrap sandbox as to whether it can use user namespaces or not
kwargs['user_ns_available'] = self._user_ns_available
kwargs['die_with_parent_available'] = self._die_with_parent_available
return SandboxBwrap(*args, **kwargs)
return self._create_bwrap_sandbox(*args, **kwargs)
def check_sandbox_config(self, config):
if not self._local_sandbox_available:
# Accept all sandbox configs as it's irrelevant with the dummy sandbox (no Sandbox.run).
return True
if self._user_ns_available:
# User namespace support allows arbitrary build UID/GID settings.
return True
......@@ -66,11 +72,26 @@ class Linux(Platform):
################################################
# Private Methods #
################################################
def _local_sandbox_available(self):
try:
return os.path.exists(utils.get_host_tool('bwrap')) and os.path.exists('/dev/fuse')
except utils.ProgramNotFoundError:
return False
def _create_dummy_sandbox(self, *args, **kwargs):
reasons = []
if not self._have_fuse:
reasons.append("FUSE is unavailable")
if not self._have_good_bwrap:
if self._bwrap_exists:
reasons.append("`bwrap` is too old (bst needs at least 0.1.2)")
else:
reasons.append("`bwrap` executable not found")
kwargs['dummy_reason'] = " and ".join(reasons)
return SandboxDummy(*args, **kwargs)
def _create_bwrap_sandbox(self, *args, **kwargs):
from ..sandbox._sandboxbwrap import SandboxBwrap
# Inform the bubblewrap sandbox as to whether it can use user namespaces or not
kwargs['user_ns_available'] = self._user_ns_available
kwargs['die_with_parent_available'] = self._die_with_parent_available
return SandboxBwrap(*args, **kwargs)
def _check_user_ns_available(self):
# Here, let's check if bwrap is able to create user namespaces,
......
......@@ -67,7 +67,11 @@ class Platform():
return cls._instance
def get_cpu_count(self, cap=None):
return min(len(os.sched_getaffinity(0)), cap)
cpu_count = len(os.sched_getaffinity(0))
if cap is None:
return cpu_count
else:
return min(cpu_count, cap)
##################################################################
# Sandbox functions #
......
......@@ -119,6 +119,8 @@ class Job():
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
self._skipped_flag = False # Indicate whether the job was skipped.
self._terminated = False # Whether this job has been explicitly terminated
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
#
self._retry_flag = True
......@@ -190,6 +192,8 @@ class Job():
# Terminate the process using multiprocessing API pathway
self._process.terminate()
self._terminated = True
# terminate_wait()
#
# Wait for terminated jobs to complete
......@@ -273,18 +277,22 @@ class Job():
# running the integration commands).
#
# Args:
# (int): The plugin identifier for this task
# task_id (int): The plugin identifier for this task
#
def set_task_id(self, task_id):
self._task_id = task_id
# skipped
#
# This will evaluate to True if the job was skipped
# during processing, or if it was forcefully terminated.
#
# Returns:
# bool: True if the job was skipped while processing.
# (bool): Whether the job should appear as skipped
#
@property
def skipped(self):
return self._skipped_flag
return self._skipped_flag or self._terminated
#######################################################
# Abstract Methods #
......
......@@ -326,16 +326,20 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
# No exception occurred, handle the success/failure state in the normal way
#
# No exception occurred in post processing
#
# All jobs get placed on the done queue for later processing.
self._done_queue.append(job)
if success:
if not job.skipped:
self.processed_elements.append(element)
else:
self.skipped_elements.append(element)
# A Job can be skipped whether or not it has failed;
# we only want to bookkeep them as processed or failed
# if they are not skipped.
if job.skipped:
self.skipped_elements.append(element)
elif success:
self.processed_elements.append(element)
else:
self.failed_elements.append(element)
......
......@@ -387,6 +387,15 @@ class Scheduler():
# A loop registered event callback for keyboard interrupts
#
def _interrupt_event(self):
# FIXME: This should not be needed, but for some reason we receive an
# additional SIGINT event when the user hits ^C a second time
# to inform us that they really intend to terminate; even though
# we have disconnected our handlers at this time.
#
if self.terminated:
return
# Leave this to the frontend to decide; if no
# interrupt callback was specified, then just terminate.
if self._interrupt_callback:
......
......@@ -78,18 +78,12 @@ def check_bwrap_version(major, minor, patch):
if not bwrap_path:
return False
cmd = [bwrap_path, "--version"]
version = str(subprocess.check_output(cmd).split()[1], "utf-8")
try:
version = str(subprocess.check_output(cmd).split()[1], "utf-8")
except subprocess.CalledProcessError:
# Failure trying to run bubblewrap
return False
_bwrap_major, _bwrap_minor, _bwrap_patch = map(int, version.split("."))
# Check whether the installed version meets the requirements
if _bwrap_major > major:
return True
elif _bwrap_major < major:
return False
else:
if _bwrap_minor > minor:
return True
elif _bwrap_minor < minor:
return False
else:
return _bwrap_patch >= patch
return (_bwrap_major, _bwrap_minor, _bwrap_patch) >= (major, minor, patch)
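The rewritten check leans on Python's lexicographic tuple comparison, which replaces the nested major/minor/patch branches with a single expression:

```python
# Lexicographic tuple comparison, as used by check_bwrap_version() above.
print((0, 2, 0) >= (0, 1, 2))  # True:  a newer minor version wins regardless of patch
print((0, 1, 2) >= (0, 1, 2))  # True:  an exact match is accepted
print((0, 1, 1) >= (0, 1, 2))  # False: the patch level is too old
print((1, 0, 0) >= (0, 1, 2))  # True:  a higher major version wins
```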
......@@ -186,7 +186,9 @@ class BuildElement(Element):
with self.timed_activity("Running {}".format(command_name)):
for cmd in commands:
self.__run_command(sandbox, cmd, command_name)
self.__queue_command(sandbox, cmd, command_name)
sandbox.run_queue(SandboxFlags.ROOT_READ_ONLY)
# %{install-root}/%{build-root} should normally not be written
# to - if an element later attempts to stage to a location
......@@ -210,7 +212,7 @@ class BuildElement(Element):
if commands:
with self.timed_activity("Running configure-commands"):
for cmd in commands:
self.__run_command(sandbox, cmd, 'configure-commands')
self.__queue_command(sandbox, cmd, 'configure-commands')
def generate_script(self):
script = ""
......@@ -235,14 +237,18 @@ class BuildElement(Element):
return commands
def __run_command(self, sandbox, cmd, cmd_name):
self.status("Running {}".format(cmd_name), detail=cmd)
def __queue_command(self, sandbox, cmd, cmd_name):
def start_cb():
self.status("Running {}".format(cmd_name), detail=cmd)
def complete_cb(exitcode):
if exitcode != 0:
raise ElementError("Command '{}' failed with exitcode {}".format(cmd, exitcode),
collect=self.get_variable('install-root'))
# Note the -e switch to 'sh' means to exit with an error
# if any untested command fails.
#
exitcode = sandbox.run(['sh', '-c', '-e', cmd + '\n'],
SandboxFlags.ROOT_READ_ONLY)
if exitcode != 0:
raise ElementError("Command '{}' failed with exitcode {}".format(cmd, exitcode),
collect=self.get_variable('install-root'))
sandbox.queue(['sh', '-c', '-e', cmd + '\n'],
start_callback=start_cb,
complete_callback=complete_cb)
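The queued commands are still wrapped in `sh -c -e`, so a multi-line command aborts at the first failing statement instead of running to the end and reporting only the last exit status. A small standalone check of that shell behaviour, using plain `subprocess` rather than the sandbox API:

```python
import subprocess

script = "false\necho 'still running'\n"

# Without -e the script keeps going and reports the last command's status.
print(subprocess.run(['sh', '-c', script]).returncode)        # prints "still running", then 0

# With -e the first failing command aborts the script with a non-zero status.
print(subprocess.run(['sh', '-c', '-e', script]).returncode)  # prints nothing, then 1
```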
......@@ -212,7 +212,7 @@ class Element(Plugin):
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
self.__tainted = None # Whether the artifact is tainted and should not be shared
self.__required = False # Whether the artifact is required in the current session
self.__build_result = None # The result of assembling this Element
self.__build_result = None # The result of assembling this Element (success, description, detail)
self._build_log_path = None # The path of the build log for this Element
# hash tables of loaded artifact metadata, hashed by key
......@@ -767,6 +767,8 @@ class Element(Plugin):
bstdata = self.get_public_data('bst')
environment = self.get_environment()
# TODO support command batching
if bstdata is not None:
commands = self.node_get_member(bstdata, list, 'integration-commands', [])
for i in range(len(commands)):
......@@ -1379,10 +1381,10 @@ class Element(Plugin):
if not vdirectory.is_empty():
raise ElementError("Staging directory '{}' is not empty".format(vdirectory))
# While mkdtemp is advertised as using the TMP environment variable, it
# doesn't, so this explicit extraction is necessary.
tmp_prefix = os.environ.get("TMP", None)
temp_staging_directory = tempfile.mkdtemp(prefix=tmp_prefix)
# It's advantageous to have this temporary directory on
# the same file system as the rest of our cache.
temp_staging_location = os.path.join(self._get_context().artifactdir, "staging_temp")
temp_staging_directory = tempfile.mkdtemp(prefix=temp_staging_location)
try:
workspace = self._get_workspace()
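Passing an absolute path as `prefix=` makes `tempfile.mkdtemp()` ignore its default directory and create the temporary directory alongside that prefix, which is what keeps the staging directory on the same file system as the artifact cache. A quick illustration with throwaway paths (the directories below are examples, not BuildStream's real layout):

```python
import os
import tempfile

base = tempfile.mkdtemp()                      # stand-in for context.artifactdir
prefix = os.path.join(base, "staging_temp")

staging = tempfile.mkdtemp(prefix=prefix)      # e.g. .../staging_tempab12cd34
print(staging.startswith(prefix))              # True: created next to the cache
os.rmdir(staging)
```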
......@@ -1479,11 +1481,13 @@ class Element(Plugin):
self._update_state()
if self._get_workspace() and self._cached():
if self._get_workspace() and self._cached_success():
assert utils._is_main_process(), \
"Attempted to save workspace configuration from child process"
#
# Note that this block can only happen in the
# main process, since `self._cached()` cannot
# be true when assembly is completed in the task.
# main process, since `self._cached_success()` cannot
# be true when assembly is successful in the task.
#
# For this reason, it is safe to update and
# save the workspaces configuration
......@@ -2083,7 +2087,15 @@ class Element(Plugin):
self.prepare(sandbox)
if workspace:
workspace.prepared = True
def mark_workspace_prepared():
workspace.prepared = True
if sandbox._has_commands_queued():
# Defer workspace.prepared setting until the queued
# prepare commands have been executed.
sandbox.queue(None, start_callback=mark_workspace_prepared)
else:
mark_workspace_prepared()
def __is_cached(self, keystrength):
if keystrength is None:
......
......@@ -184,10 +184,18 @@ class GitMirror(SourceFetcher):
cwd=self.mirror)
def fetch(self, alias_override=None):
self.ensure(alias_override)
if not self.has_ref():
self._fetch(alias_override)
self.assert_ref()
# Resolve the URL for the message
resolved_url = self.source.translate_url(self.url,
alias_override=alias_override,
primary=self.primary)
with self.source.timed_activity("Fetching from {}"
.format(resolved_url),
silent_nested=True):
self.ensure(alias_override)
if not self.has_ref():
self._fetch(alias_override)
self.assert_ref()
def has_ref(self):
if not self.ref:
......
......@@ -23,6 +23,7 @@ from . import Sandbox
class SandboxDummy(Sandbox):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._reason = kwargs.get("dummy_reason", "no reason given")
def run(self, command, flags, *, cwd=None, env=None):
......@@ -37,4 +38,4 @@ class SandboxDummy(Sandbox):
"'{}'".format(command[0]),
reason='missing-command')
raise SandboxError("This platform does not support local builds")
raise SandboxError("This platform does not support local builds: {}".format(self._reason))
......@@ -19,6 +19,7 @@
# Jim MacArthur <jim.macarthur@codethink.co.uk>
import os
import shlex
from urllib.parse import urlparse
import grpc
......@@ -160,6 +161,8 @@ class SandboxRemote(Sandbox):
self._set_virtual_directory(new_dir)
def run(self, command, flags, *, cwd=None, env=None):
stdout, stderr = self._get_output()
# Upload sources
upload_vdir = self.get_virtual_directory()
......@@ -177,15 +180,11 @@ class SandboxRemote(Sandbox):
if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
# Set up environment and working directory
if cwd is None:
cwd = self._get_work_directory()
if cwd is None:
cwd = '/'
if env is None:
env = self._get_environment()
# Fallback to the sandbox default settings for
# the cwd and env.
#
cwd = self._get_work_directory(cwd=cwd)
env = self._get_environment(cwd=cwd, env=env)
# We want command args as a list of strings
if isinstance(command, str):
......@@ -217,13 +216,54 @@ class SandboxRemote(Sandbox):
action_result = execution_response.result
if stdout:
if action_result.stdout_raw:
stdout.write(str(action_result.stdout_raw, 'utf-8', errors='ignore'))
if stderr:
if action_result.stderr_raw:
stderr.write(str(action_result.stderr_raw, 'utf-8', errors='ignore'))
if action_result.exit_code != 0:
# A normal error during the build: the remote execution system
# has worked correctly but the command failed.
# action_result.stdout and action_result.stderr also contain
# build command outputs which we ignore at the moment.
return action_result.exit_code
self.process_job_output(action_result.output_directories, action_result.output_files)
return 0
def run_queue(self, flags, *, cwd=None, env=None):
queue = self._queue
self._queue = []
script = ""
i = 0
for entry in queue:
if entry.command:
cmdline = ' '.join(shlex.quote(cmd) for cmd in entry.command)
script += "({})\n".format(cmdline)
script += "RETVAL=$?\n"
script += "if [ $RETVAL -ne 0 ] ; then\n"
# Report failing command and exit code to stderr (and then back to client)
script += " echo -e '\nbst-command-failure:' {} $RETVAL >&2\n".format(i)
script += " exit 1\n"
script += "fi\n"
i += 1
exit_code = self.run(['sh', '-c', script], flags, cwd=cwd, env=env)
if exit_code != 0:
# TODO get failed command and exit code from stderr
failed_command = 0
command_exit_code = 1
i = 0
for entry in queue:
entry.start_callback()
if exit_code == 0 or i < failed_command:
# Command succeeded
entry.complete_callback(0)
else:
# Command failed
entry.complete_callback(command_exit_code)
break
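For the remote sandbox the whole queue is flattened into one shell script so it can be submitted as a single remote execution action; each command runs in a subshell, and a failure echoes a `bst-command-failure:` marker to stderr before aborting. The following sketch reproduces that composition for two illustrative commands (the commands themselves are made up):

```python
import shlex

queue = [['./configure', '--prefix=/usr'], ['make', '-j4']]  # illustrative commands

script = ""
for i, command in enumerate(queue):
    cmdline = ' '.join(shlex.quote(c) for c in command)
    script += "({})\n".format(cmdline)
    script += "RETVAL=$?\n"
    script += "if [ $RETVAL -ne 0 ] ; then\n"
    # Report the failing command index and exit code to stderr, then abort.
    script += "  echo -e '\nbst-command-failure:' {} $RETVAL >&2\n".format(i)
    script += "  exit 1\n"
    script += "fi\n"

print(script)  # this single script is what run() submits as one remote action
```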
......@@ -29,6 +29,8 @@ See also: :ref:`sandboxing`.
"""
import os
from collections import namedtuple
from .._exceptions import ImplError, BstError
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
......@@ -114,6 +116,9 @@ class Sandbox():
# directory via get_directory.
self._never_cache_vdirs = False
# Queued commands
self._queue = []
def get_directory(self):
"""Fetches the sandbox root directory
......@@ -228,6 +233,56 @@ class Sandbox():
raise ImplError("Sandbox of type '{}' does not implement run()"
.format(type(self).__name__))
def queue(self, command, *, start_callback=None, complete_callback=None):
"""Queue a command to be run in the sandbox.
If the command fails, commands queued later will not be executed.
The callbacks are not guaranteed to be invoked in real time.
Args:
command (list): The command to run in the sandboxed environment, as a list
of strings starting with the binary to run.
start_callback (callable): Called when the command starts.
complete_callback (callable): Called when the command completes
with the exit code as argument.
"""
entry = namedtuple('QueueEntry', ['command', 'start_callback', 'complete_callback'])
entry.command = command
entry.start_callback = start_callback
entry.complete_callback = complete_callback
self._queue.append(entry)
def run_queue(self, flags, *, cwd=None, env=None):
"""Run a command in the sandbox.
Args:
flags (:class:`.SandboxFlags`): The flags for running this command.
cwd (str): The sandbox relative working directory in which to run the command.
env (dict): A dictionary of string key, value pairs to set as environment
variables inside the sandbox environment.
Raises:
(:class:`.ProgramNotFoundError`): If a host tool which the given sandbox
implementation requires is not found.
.. note::
The optional *cwd* argument will default to the value set with
:func:`~buildstream.sandbox.Sandbox.set_work_directory`
"""
queue = self._queue
self._queue = []
for entry in queue:
if entry.start_callback:
entry.start_callback()
if entry.command:
exit_code = self.run(entry.command, flags, cwd=cwd, env=env)
if entry.complete_callback:
entry.complete_callback(exit_code)
if exit_code != 0:
break
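The default `run_queue()` above simply drains the queue, running each entry through `run()` and firing its callbacks, and stops at the first non-zero exit code; `SandboxRemote` overrides it to execute the whole queue remotely in one go. A toy stand-in that reproduces those semantics end to end (no real sandbox; flags, cwd and env omitted):

```python
from collections import namedtuple

QueueEntry = namedtuple('QueueEntry', ['command', 'start_callback', 'complete_callback'])

class MiniSandbox:
    # Toy model of the default queue()/run_queue() behaviour described above.

    def __init__(self):
        self._queue = []

    def run(self, command):
        # Stand-in for Sandbox.run(): pretend commands containing 'bad' fail.
        return 1 if 'bad' in command else 0

    def queue(self, command, *, start_callback=None, complete_callback=None):
        self._queue.append(QueueEntry(command, start_callback, complete_callback))

    def run_queue(self):
        queue, self._queue = self._queue, []
        for entry in queue:
            if entry.start_callback:
                entry.start_callback()
            if entry.command:
                exit_code = self.run(entry.command)
                if entry.complete_callback:
                    entry.complete_callback(exit_code)
                if exit_code != 0:
                    break  # later entries never run

sandbox = MiniSandbox()
sandbox.queue(['good'], start_callback=lambda: print("running good"),
              complete_callback=lambda code: print("good exited with", code))
sandbox.queue(['bad'], complete_callback=lambda code: print("bad exited with", code))
sandbox.queue(['never-run'], start_callback=lambda: print("not reached"))
sandbox.run_queue()
```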
################################################
# Private methods #
################################################
......@@ -375,3 +430,12 @@ class Sandbox():
return True
return False
# _has_commands_queued()
#
# Returns whether the sandbox has a non-empty queue of pending commands.
#
# Returns:
# (bool): Whether the command queue is non-empty.
def _has_commands_queued(self):
return len(self._queue) > 0
......@@ -965,28 +965,48 @@ class Source(Plugin):
# Tries to call fetch for every mirror, stopping once it succeeds
def __do_fetch(self, **kwargs):
project = self._get_project()
source_fetchers = self.get_source_fetchers()
context = self._get_context()
# Silence the STATUS messages which might happen as a result
# of checking the source fetchers.
with context.silence():
source_fetchers = self.get_source_fetchers()
# Use the source fetchers if they are provided
#
if source_fetchers:
for fetcher in source_fetchers:
alias = fetcher._get_alias()
for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
try:
fetcher.fetch(uri)
# FIXME: Need to consider temporary vs. permanent failures,
# and how this works with retries.
except BstError as e:
last_error = e
continue
# No error, we're done with this fetcher
break
else:
# No break occurred, raise the last detected error
raise last_error
# Use a contorted loop here, this is to allow us to
# silence the messages which can result from consuming
# the items of source_fetchers, if it happens to be a generator.
#
source_fetchers = iter(source_fetchers)
try:
while True:
with context.silence():
fetcher = next(source_fetchers)
alias = fetcher._get_alias()
for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
try:
fetcher.fetch(uri)
# FIXME: Need to consider temporary vs. permanent failures,
# and how this works with retries.
except BstError as e:
last_error = e
continue
# No error, we're done with this fetcher
break
else:
# No break occurred, raise the last detected error
raise last_error
except StopIteration:
pass
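The explicit `iter()`/`next()` loop exists so that only obtaining the next fetcher is silenced, while the fetch itself still emits messages. A self-contained sketch of the pattern with dummy stand-ins for `context.silence()` and the fetcher generator:

```python
from contextlib import contextmanager

@contextmanager
def silence():
    # Stand-in for context.silence(); the real one suppresses status messages
    # emitted inside the block.
    yield

def make_fetchers():
    # Stand-in generator: producing each fetcher may itself emit messages.
    yield 'mirror-a'
    yield 'mirror-b'

fetchers = iter(make_fetchers())
try:
    while True:
        with silence():
            fetcher = next(fetchers)      # only this step is silenced
        print("fetching from", fetcher)   # messages here remain visible
except StopIteration:
    pass
```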
# Default codepath is to reinstantiate the Source
#
......
......@@ -502,7 +502,7 @@ def get_bst_version():
@contextmanager
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
errors=None, newline=None, closefd=True, opener=None):
errors=None, newline=None, closefd=True, opener=None, tempdir=None):
"""Save a file with a temporary name and rename it into place when ready.
This is a context manager which is meant for saving data to files.
......@@ -529,8 +529,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
# https://bugs.python.org/issue8604
assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
dirname = os.path.dirname(filename)
fd, tempname = tempfile.mkstemp(dir=dirname)
if tempdir is None:
tempdir = os.path.dirname(filename)
fd, tempname = tempfile.mkstemp(dir=tempdir)
os.close(fd)
f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
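With the new `tempdir=` argument the caller can place the temporary file on the same file system as the destination (the CAS cache passes its own `tmpdir`), so the final rename into place stays atomic. A usage sketch, assuming BuildStream is installed; the paths are examples only:

```python
import os
import tempfile

from buildstream import utils

cache_dir = tempfile.mkdtemp()                       # example cache directory
refpath = os.path.join(cache_dir, 'refs', 'foo')
os.makedirs(os.path.dirname(refpath), exist_ok=True)

# The temporary file is created inside tempdir= and renamed over refpath on
# exit, so readers never observe a half-written file.
with utils.save_file_atomic(refpath, 'wb', tempdir=cache_dir) as f:
    f.write(b'serialized ref data')
```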
......@@ -562,6 +563,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
#
# Get the disk usage of a given directory in bytes.
#
# This function assumes that files do not inadvertently
# disappear while this function is running.
#
# Arguments:
# (str) The path whose size to check.
#
......@@ -682,7 +686,7 @@ def _force_rmtree(rootpath, **kwargs):
try:
shutil.rmtree(rootpath, **kwargs)
except shutil.Error as e:
except OSError as e:
raise UtilError("Failed to remove cache directory '{}': {}"
.format(rootpath, e))
......
......@@ -54,12 +54,13 @@ REQUIRED_BWRAP_MINOR = 1
REQUIRED_BWRAP_PATCH = 2
def exit_bwrap(reason):
def warn_bwrap(reason):
print(reason +
"\nBuildStream requires Bubblewrap (bwrap) for"
" sandboxing the build environment. Install it using your package manager"
" (usually bwrap or bubblewrap)")
sys.exit(1)
"\nBuildStream requires Bubblewrap (bwrap {}.{}.{} or better),"
" during local builds, for"
" sandboxing the build environment.\nInstall it using your package manager"
" (usually bwrap or bubblewrap) otherwise you will be limited to"
" remote builds only.".format(REQUIRED_BWRAP_MAJOR, REQUIRED_BWRAP_MINOR, REQUIRED_BWRAP_PATCH))
def bwrap_too_old(major, minor, patch):
......@@ -76,18 +77,19 @@ def bwrap_too_old(major, minor, patch):
return False
def assert_bwrap():
def check_for_bwrap():
platform = os.environ.get('BST_FORCE_BACKEND', '') or sys.platform
if platform.startswith('linux'):
bwrap_path = shutil.which('bwrap')
if not bwrap_path:
exit_bwrap("Bubblewrap not found")
warn_bwrap("Bubblewrap not found")
return
version_bytes = subprocess.check_output([bwrap_path, "--version"]).split()[1]
version_string = str(version_bytes, "utf-8")
major, minor, patch = map(int, version_string.split("."))
if bwrap_too_old(major, minor, patch):
exit_bwrap("Bubblewrap too old")
warn_bwrap("Bubblewrap too old")
###########################################
......@@ -126,7 +128,7 @@ bst_install_entry_points = {
}
if not os.environ.get('BST_ARTIFACTS_ONLY', ''):
assert_bwrap()
check_for_bwrap()
bst_install_entry_points['console_scripts'] += [
'bst = buildstream._frontend:cli'
]
......
......@@ -139,6 +139,82 @@ def test_mirror_fetch(cli, tmpdir, datafiles, kind):
result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
@pytest.mark.parametrize("ref_storage", [("inline"), ("project.refs")])
@pytest.mark.parametrize("mirror", [("no-mirror"), ("mirror"), ("unrelated-mirror")])
def test_mirror_fetch_ref_storage(cli, tmpdir, datafiles, ref_storage, mirror):
bin_files_path = os.path.join(str(datafiles), 'files', 'bin-files', 'usr')
dev_files_path = os.path.join(str(datafiles), 'files', 'dev-files', 'usr')
upstream_repodir = os.path.join(str(tmpdir), 'upstream')
mirror_repodir = os.path.join(str(tmpdir), 'mirror')
project_dir = os.path.join(str(tmpdir), 'project')
os.makedirs(project_dir)
element_dir = os.path.join(project_dir, 'elements')
# Create repo objects of the upstream and mirror
upstream_repo = create_repo('tar', upstream_repodir)
upstream_ref = upstream_repo.create(bin_files_path)
mirror_repo = upstream_repo.copy(mirror_repodir)
mirror_ref = upstream_ref
upstream_ref = upstream_repo.create(dev_files_path)
element = {
'kind': 'import',
'sources': [
upstream_repo.source_config(ref=upstream_ref if ref_storage == 'inline' else None)
]
}
element_name = 'test.bst'
element_path = os.path.join(element_dir, element_name)
full_repo = element['sources'][0]['url']
upstream_map, repo_name = os.path.split(full_repo)
alias = 'foo'
aliased_repo = alias + ':' + repo_name
element['sources'][0]['url'] = aliased_repo
full_mirror = mirror_repo.source_config()['url']
mirror_map, _ = os.path.split(full_mirror)
os.makedirs(element_dir)
_yaml.dump(element, element_path)
if ref_storage == 'project.refs':
# Manually set project.refs to avoid caching the repo prematurely
project_refs = {'projects': {
'test': {
element_name: [
{'ref': upstream_ref}
]
}
}}
project_refs_path = os.path.join(project_dir, 'project.refs')
_yaml.dump(project_refs, project_refs_path)
project = {
'name': 'test',
'element-path': 'elements',
'aliases': {
alias: upstream_map + "/"
},
'ref-storage': ref_storage
}
if mirror != 'no-mirror':
mirror_data = [{
'name': 'middle-earth',
'aliases': {alias: [mirror_map + '/']}
}]
if mirror == 'unrelated-mirror':
mirror_data.insert(0, {
'name': 'narnia',
'aliases': {'frob': ['http://www.example.com/repo']}
})
project['mirrors'] = mirror_data
project_file = os.path.join(project_dir, 'project.conf')
_yaml.dump(project, project_file)
result = cli.run(project=project_dir, args=['fetch', element_name])
result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
@pytest.mark.parametrize("kind", [(kind) for kind in ALL_REPO_KINDS])
def test_mirror_fetch_upstream_absent(cli, tmpdir, datafiles, kind):
......