Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (14)

Showing with 321 additions and 195 deletions
...@@ -38,7 +38,7 @@
 from .._message import Message, MessageType, unconditional_messages
 from .._stream import Stream
 from .._versions import BST_FORMAT_VERSION
 from .. import _yaml
-from .._scheduler import ElementJob
+from .._scheduler import ElementJob, JobStatus

 # Import frontend assets
 from . import Profile, LogLine, Status
...@@ -515,13 +515,13 @@ class App():
         self._status.add_job(job)
         self._maybe_render_status()

-    def _job_completed(self, job, success):
+    def _job_completed(self, job, status):
         self._status.remove_job(job)
         self._maybe_render_status()

         # Don't attempt to handle a failure if the user has already opted to
         # terminate
-        if not success and not self.stream.terminated:
+        if status == JobStatus.FAIL and not self.stream.terminated:
             if isinstance(job, ElementJob):
                 element = job.element
......
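With a boolean, a skipped job was folded into success (`success = returncode in (RC_OK, RC_SKIPPED)`, further down in this diff), so any code that cared about skips had to consult a separate `job.skipped` flag; the tri-state `JobStatus` carries all three outcomes in one value. A minimal sketch of a status-aware completion callback (the `handle_failure()` and `log_skip()` helpers are hypothetical, not part of this patch):

    def _job_completed(self, job, status):
        # One value now distinguishes all three outcomes.
        if status == JobStatus.FAIL and not self.stream.terminated:
            handle_failure(job)    # hypothetical helper
        elif status == JobStatus.SKIPPED:
            log_skip(job)          # hypothetical helper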
...@@ -355,78 +355,6 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, track_cross_junctions):
                      build_all=all_)


-##################################################################
-#                        Pull Command                            #
-##################################################################
-@cli.command(short_help="Pull a built artifact")
-@click.option('--deps', '-d', default='none',
-              type=click.Choice(['none', 'all']),
-              help='The dependency artifacts to pull (default: none)')
-@click.option('--remote', '-r',
-              help="The URL of the remote cache (defaults to the first configured cache)")
-@click.argument('elements', nargs=-1,
-                type=click.Path(readable=False))
-@click.pass_obj
-def pull(app, elements, deps, remote):
-    """Pull a built artifact from the configured remote artifact cache.
-
-    By default the artifact will be pulled from one of the configured caches
-    if possible, following the usual priority order. If the `--remote` flag
-    is given, only the specified cache will be queried.
-
-    Specify `--deps` to control which artifacts to pull:
-
-    \b
-        none:  No dependencies, just the element itself
-        all:   All dependencies
-    """
-    with app.initialized(session_name="Pull"):
-        if not elements:
-            guessed_target = app.context.guess_element()
-            if guessed_target:
-                elements = (guessed_target,)
-
-        app.stream.pull(elements, selection=deps, remote=remote)
-
-
-##################################################################
-#                        Push Command                            #
-##################################################################
-@cli.command(short_help="Push a built artifact")
-@click.option('--deps', '-d', default='none',
-              type=click.Choice(['none', 'all']),
-              help='The dependencies to push (default: none)')
-@click.option('--remote', '-r', default=None,
-              help="The URL of the remote cache (defaults to the first configured cache)")
-@click.argument('elements', nargs=-1,
-                type=click.Path(readable=False))
-@click.pass_obj
-def push(app, elements, deps, remote):
-    """Push a built artifact to a remote artifact cache.
-
-    The default destination is the highest priority configured cache. You can
-    override this by passing a different cache URL with the `--remote` flag.
-
-    If bst has been configured to include build trees on artifact pulls,
-    an attempt will be made to pull any required build trees, to avoid
-    partial artifacts being skipped at push time.
-
-    Specify `--deps` to control which artifacts to push:
-
-    \b
-        none:  No dependencies, just the element itself
-        all:   All dependencies
-    """
-    with app.initialized(session_name="Push"):
-        if not elements:
-            guessed_target = app.context.guess_element()
-            if guessed_target:
-                elements = (guessed_target,)
-
-        app.stream.push(elements, selection=deps, remote=remote)
-
-
 ##################################################################
 #                        Show Command                            #
 ##################################################################
...@@ -973,36 +901,48 @@ def workspace_list(app):
 #############################################################
 #                     Artifact Commands                     #
 #############################################################
-def _classify_artifacts(names, cas, project_directory):
-    element_targets = []
-    artifact_refs = []
-    element_globs = []
-    artifact_globs = []
+def _classify_element_targets(names, project_directory):
+    globs = []
+    targets = []
+    unmatched = []

     for name in names:
         if name.endswith('.bst'):
             if any(c in "*?[" for c in name):
-                element_globs.append(name)
+                globs.append(name)
             else:
-                element_targets.append(name)
+                targets.append(name)
         else:
-            if any(c in "*?[" for c in name):
-                artifact_globs.append(name)
-            else:
-                artifact_refs.append(name)
+            unmatched.append(name)

-    if element_globs:
+    if globs:
         for dirpath, _, filenames in os.walk(project_directory):
             for filename in filenames:
-                element_path = os.path.join(dirpath, filename).lstrip(project_directory).lstrip('/')
-                if any(fnmatch(element_path, glob) for glob in element_globs):
-                    element_targets.append(element_path)
+                element_path = os.path.relpath(os.path.join(dirpath, filename), start=project_directory)
+                if any(fnmatch(element_path, glob) for glob in globs):
+                    targets.append(element_path)

-    if artifact_globs:
-        artifact_refs.extend(ref for ref in cas.list_refs()
-                             if any(fnmatch(ref, glob) for glob in artifact_globs))
+    return targets, unmatched

-    return element_targets, artifact_refs
+
+def _classify_artifact_refs(names, cas):
+    globs = []
+    refs = []
+
+    for name in names:
+        if any(c in "*?[" for c in name):
+            globs.append(name)
+        else:
+            refs.append(name)
+
+    if globs:
+        refs.extend(ref for ref in cas.list_refs()
+                    if any(fnmatch(ref, glob) for glob in globs))
+
+    return refs
+
+
+def _classify_artifacts(names, cas, project_directory):
+    targets, unmatched = _classify_element_targets(names, project_directory)
+    refs = _classify_artifact_refs(unmatched, cas)
+    return targets, refs


 @cli.group(short_help="Manipulate cached artifacts")
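Beyond splitting the classifier in two, this hunk fixes a real bug: `str.lstrip()` strips a *character set*, not a prefix, so trimming the project directory with `lstrip(project_directory)` could eat leading characters of the element path itself; `os.path.relpath()` does the intended thing. A self-contained illustration of the pitfall (hypothetical paths):

    import os

    project = '/src/proj'
    path = '/src/proj/project.bst'   # hypothetical element file

    # lstrip() removes any leading characters found in the set
    # {'/', 's', 'r', 'c', 'p', 'o', 'j'} -- including the start
    # of 'project.bst' itself:
    print(path.lstrip(project).lstrip('/'))       # -> 'ect.bst' (mangled)
    print(os.path.relpath(path, start=project))   # -> 'project.bst'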
...@@ -1010,6 +950,109 @@ def artifact():
     """Manipulate cached artifacts"""


+################################################################
+#                    Artifact Pull Command                     #
+################################################################
+@artifact.command(name="pull", short_help="Pull a built artifact")
+@click.option('--deps', '-d', default='none',
+              type=click.Choice(['none', 'all']),
+              help='The dependency artifacts to pull (default: none)')
+@click.option('--remote', '-r',
+              help="The URL of the remote cache (defaults to the first configured cache)")
+@click.argument('artifacts', type=click.Path(), nargs=-1)
+@click.pass_obj
+def artifact_pull(app, artifacts, deps, remote):
+    """Pull a built artifact from the configured remote artifact cache.
+
+    By default the artifact will be pulled from one of the configured caches
+    if possible, following the usual priority order. If the `--remote` flag
+    is given, only the specified cache will be queried.
+
+    Specify `--deps` to control which artifacts to pull:
+
+    \b
+        none:  No dependencies, just the element itself
+        all:   All dependencies
+    """
+    with app.initialized(session_name="Pull"):
+        cache = app.context.artifactcache
+
+        elements, artifacts = _classify_artifacts(artifacts, cache.cas,
+                                                  app.project.directory)
+
+        # Guess the element if we're in a workspace
+        if not elements and not artifacts:
+            guessed_target = app.context.guess_element()
+            if guessed_target:
+                elements = (guessed_target,)
+
+        if artifacts and deps != 'none':
+            # NOTE: I *think* we're good for multiple artifacts and --deps.
+            raise AppError("--deps may not be used with artifact refs")
+
+        if elements:
+            app.stream.pull(elements, selection=deps, remote=remote)
+
+        # FIXME: We can only obtain project/user config through the stream API,
+        #        which we need in order to determine the remote for pull to pull
+        #        from. We can't just go straight to artifactcache here. Thus
+        #        Stream.pull() will fail because it expects a list of element
+        #        names (.bst).
+        if artifacts:
+            app.stream.pull(artifacts, selection='none', remote=remote)
+
+
+##################################################################
+#                    Artifact Push Command                       #
+##################################################################
+@artifact.command(name="push", short_help="Push a built artifact")
+@click.option('--deps', '-d', default='none',
+              type=click.Choice(['none', 'all']),
+              help='The dependencies to push (default: none)')
+@click.option('--remote', '-r', default=None,
+              help="The URL of the remote cache (defaults to the first configured cache)")
+@click.argument('artifacts', type=click.Path(), nargs=-1)
+@click.pass_obj
+def artifact_push(app, artifacts, deps, remote):
+    """Push a built artifact to a remote artifact cache.
+
+    The default destination is the highest priority configured cache. You can
+    override this by passing a different cache URL with the `--remote` flag.
+
+    If bst has been configured to include build trees on artifact pulls,
+    an attempt will be made to pull any required build trees, to avoid
+    partial artifacts being skipped at push time.
+
+    Specify `--deps` to control which artifacts to push:
+
+    \b
+        none:  No dependencies, just the element itself
+        all:   All dependencies
+    """
+    with app.initialized(session_name="Push"):
+        cache = app.context.artifactcache
+
+        elements, artifacts = _classify_artifacts(artifacts, cache.cas,
+                                                  app.project.directory)
+
+        # Guess the element if we're in a workspace
+        if not elements:
+            guessed_target = app.context.guess_element()
+            if guessed_target:
+                elements = (guessed_target,)
+
+        if artifacts and deps != 'none':
+            raise AppError("--deps may not be used with artifact refs")
+
+        if elements:
+            app.stream.push(elements, selection=deps, remote=remote)
+
+        # FIXME: We can only obtain project/user config through the stream API,
+        #        which we need in order to determine the remote for push to push
+        #        to. We can't just go straight to artifactcache here. Thus
+        #        Stream.push() will fail because it expects a list of element
+        #        names (.bst).
+        if artifacts:
+            app.stream.push(artifacts, selection='none', remote=remote)


 ################################################################
 #                     Artifact Log Command                      #
 ################################################################
...@@ -1116,3 +1159,37 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
 def track(app, elements, deps, except_, cross_junctions):
     click.echo("This command is now obsolete. Use `bst source track` instead.", err=True)
     sys.exit(1)
+
+
+################################################################
+#                        Pull Command                          #
+################################################################
+@cli.command(short_help="Pull a built artifact", hidden=True)
+@click.option('--deps', '-d', default='none',
+              type=click.Choice(['none', 'all']),
+              help='The dependency artifacts to pull (default: none)')
+@click.option('--remote', '-r',
+              help="The URL of the remote cache (defaults to the first configured cache)")
+@click.argument('elements', nargs=-1,
+                type=click.Path(readable=False))
+@click.pass_obj
+def pull(app, elements, deps, remote):
+    click.echo("This command is now obsolete. Use `bst artifact pull` instead.", err=True)
+    sys.exit(1)
+
+
+##################################################################
+#                        Push Command                            #
+##################################################################
+@cli.command(short_help="Push a built artifact", hidden=True)
+@click.option('--deps', '-d', default='none',
+              type=click.Choice(['none', 'all']),
+              help='The dependencies to push (default: none)')
+@click.option('--remote', '-r', default=None,
+              help="The URL of the remote cache (defaults to the first configured cache)")
+@click.argument('elements', nargs=-1,
+                type=click.Path(readable=False))
+@click.pass_obj
+def push(app, elements, deps, remote):
+    click.echo("This command is now obsolete. Use `bst artifact push` instead.", err=True)
+    sys.exit(1)
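The deprecation pattern here is plain click: move the real commands under a group, and keep `hidden=True` stubs at the old top-level names so stale scripts fail with a pointer instead of a bare usage error. A self-contained sketch of the same shape (toy command bodies, not BuildStream's wiring):

    import sys
    import click

    @click.group()
    def cli():
        pass

    @cli.group(short_help="Manipulate cached artifacts")
    def artifact():
        pass

    @artifact.command(name="pull")
    def artifact_pull():
        click.echo("pulling...")

    # Hidden stub at the old name: excluded from --help and shell
    # completion, but still invocable so old usage fails loudly.
    @cli.command(hidden=True)
    def pull():
        click.echo("This command is now obsolete. Use `artifact pull` instead.", err=True)
        sys.exit(1)

    if __name__ == '__main__':
        cli()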
...@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
 from .queues.pullqueue import PullQueue
 from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob
+from .jobs import ElementJob, JobStatus
...@@ -20,3 +20,4 @@
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob
+from .job import JobStatus
...@@ -16,7 +16,7 @@
 # Author:
 #        Tristan Daniël Maat <tristan.maat@codethink.co.uk>
 #
-from .job import Job
+from .job import Job, JobStatus


 class CacheSizeJob(Job):
...@@ -30,8 +30,8 @@ class CacheSizeJob(Job):
     def child_process(self):
         return self._artifacts.compute_cache_size()

-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)

         if self._complete_cb:
......
...@@ -16,7 +16,7 @@
 # Author:
 #        Tristan Daniël Maat <tristan.maat@codethink.co.uk>
 #
-from .job import Job
+from .job import Job, JobStatus


 class CleanupJob(Job):
...@@ -29,6 +29,6 @@ class CleanupJob(Job):
     def child_process(self):
         return self._artifacts.clean()

-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)
...@@ -60,7 +60,7 @@ from .job import Job
 #    Args:
 #       job (Job): The job object which completed
 #       element (Element): The element passed to the Job() constructor
-#       success (bool): True if the action_cb did not raise an exception
+#       status (JobStatus): The job completion status
 #       result (object): The deserialized object returned by the `action_cb`, or None
 #          if `success` is False
 #
...@@ -93,8 +93,8 @@ class ElementJob(Job):
         # Run the action
         return self._action_cb(self._element)

-    def parent_complete(self, success, result):
-        self._complete_cb(self, self._element, success, self._result)
+    def parent_complete(self, status, result):
+        self._complete_cb(self, self._element, status, self._result)

     def message(self, message_type, message, **kwargs):
         args = dict(kwargs)
......
...@@ -28,8 +28,6 @@ import traceback
 import asyncio
 import multiprocessing

-import psutil
-
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
...@@ -43,6 +41,22 @@ RC_PERM_FAIL = 2
 RC_SKIPPED = 3


+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+    # Job succeeded
+    OK = 0
+
+    # A temporary BstError was raised
+    FAIL = 1
+
+    # A SkipJob was raised
+    SKIPPED = 3
+
+
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
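`JobStatus` is a plain namespace class of integer constants rather than an `enum`; every consumer in this diff compares with `==`, and the gap in the numbering (no value 2) appears to shadow the child return codes above, where `RC_PERM_FAIL = 2` gets no distinct outward status. For comparison only, the same shape with the standard library's `enum` — a sketch of an alternative, not what the patch does:

    from enum import IntEnum

    class JobStatus(IntEnum):
        OK = 0        # Job succeeded
        FAIL = 1      # A temporary BstError was raised
        SKIPPED = 3   # A SkipJob was raised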
...@@ -118,7 +132,6 @@ class Job():
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
-        self._skipped_flag = False             # Indicate whether the job was skipped.
         self._terminated = False               # Whether this job has been explicitly terminated

         # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
...@@ -215,17 +228,10 @@ class Job():
     # Forcefully kill the process, and any children it might have.
     #
     def kill(self):
-
         # Force kill
         self.message(MessageType.WARN,
                      "{} did not terminate gracefully, killing".format(self.action_name))
-
-        try:
-            utils._kill_process_tree(self._process.pid)
-        # This can happen if the process died of its own accord before
-        # we try to kill it
-        except psutil.NoSuchProcess:
-            return
+        utils._kill_process_tree(self._process.pid)

     # suspend()
     #
...@@ -282,18 +288,6 @@ class Job():
     def set_task_id(self, task_id):
         self._task_id = task_id

-    # skipped
-    #
-    # This will evaluate to True if the job was skipped
-    # during processing, or if it was forcefully terminated.
-    #
-    # Returns:
-    #    (bool): Whether the job should appear as skipped
-    #
-    @property
-    def skipped(self):
-        return self._skipped_flag or self._terminated
-
     #######################################################
     #                  Abstract Methods                   #
     #######################################################
...@@ -304,10 +298,10 @@ class Job():
     # pass the result to the main thread.
     #
     # Args:
-    #    success (bool): Whether the job was successful.
+    #    status (JobStatus): The job exit status
     #    result (any): The result returned by child_process().
     #
-    def parent_complete(self, success, result):
+    def parent_complete(self, status, result):
         raise ImplError("Job '{kind}' does not implement parent_complete()"
                         .format(kind=type(self).__name__))
...@@ -571,16 +565,23 @@ class Job():
         #
         self._retry_flag = returncode == RC_FAIL

-        # Set the flag to alert Queue that this job skipped.
-        self._skipped_flag = returncode == RC_SKIPPED
-
         if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
             self.spawn()
             return

-        success = returncode in (RC_OK, RC_SKIPPED)
-        self.parent_complete(success, self._result)
-        self._scheduler.job_completed(self, success)
+        # Resolve the outward facing overall job completion status
+        #
+        if returncode == RC_OK:
+            status = JobStatus.OK
+        elif returncode == RC_SKIPPED:
+            status = JobStatus.SKIPPED
+        elif returncode in (RC_FAIL, RC_PERM_FAIL):
+            status = JobStatus.FAIL
+        else:
+            status = JobStatus.FAIL
+
+        self.parent_complete(status, self._result)
+        self._scheduler.job_completed(self, status)

         # Force the deletion of the queue and process objects to try and clean up FDs
         self._queue = self._process = None
......
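Both `RC_FAIL` and `RC_PERM_FAIL` collapse to `JobStatus.FAIL` here; the difference between them was already consumed above, where only `RC_FAIL` sets `_retry_flag`, and unknown return codes fall through to `FAIL` as well. A table-driven sketch equivalent to the if/elif chain, shown only to make the collapse explicit (an illustration, not the patch's code):

    # Hypothetical equivalent of the chain above; unknown return
    # codes conservatively map to FAIL, matching the else branch.
    _RC_TO_STATUS = {
        RC_OK: JobStatus.OK,
        RC_SKIPPED: JobStatus.SKIPPED,
        RC_FAIL: JobStatus.FAIL,
        RC_PERM_FAIL: JobStatus.FAIL,
    }
    status = _RC_TO_STATUS.get(returncode, JobStatus.FAIL)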
...@@ -21,7 +21,7 @@
 from datetime import timedelta

 from . import Queue, QueueStatus
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 from ..._message import MessageType
...@@ -104,7 +104,7 @@ class BuildQueue(Queue):
         if artifacts.has_quota_exceeded():
             self._scheduler.check_cache_size()

-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):

         # Inform element in main process that assembly is done
         element._assemble_done()
...@@ -117,5 +117,5 @@ class BuildQueue(Queue):
         # artifact cache size for a successful build even though we know a
         # failed build also grows the artifact cache size.
         #
-        if success:
+        if status == JobStatus.OK:
             self._check_cache_size(job, element, result)
...@@ -24,6 +24,7 @@ from ... import Consistency

 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus


 # A queue which fetches element sources
...@@ -66,9 +67,9 @@ class FetchQueue(Queue):
         return QueueStatus.READY

-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):

-        if not success:
+        if status == JobStatus.FAIL:
             return

         element._update_state()
......
...@@ -21,6 +21,7 @@

 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 from ..._exceptions import SkipJob
...@@ -54,9 +55,9 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP

-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):

-        if not success:
+        if status == JobStatus.FAIL:
             return

         element._pull_done()
...@@ -64,4 +65,5 @@ class PullQueue(Queue):
         # Build jobs will check the "approximate" size first. Since we
         # do not get an artifact size from pull jobs, we have to
         # actually check the cache size.
-        self._scheduler.check_cache_size()
+        if status == JobStatus.OK:
+            self._scheduler.check_cache_size()
...@@ -25,7 +25,7 @@ from enum import Enum
 import traceback

 # Local imports
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType

 # BuildStream toplevel imports
...@@ -133,10 +133,9 @@ class Queue():
     #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
-    #    success (bool): True if the process() implementation did not
-    #       raise any exception
+    #    status (JobStatus): The return status of the Job
     #
-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):
         pass

     #####################################################
...@@ -291,7 +290,7 @@ class Queue():
     #
     # See the Job object for an explanation of the call signature
     #
-    def _job_done(self, job, element, success, result):
+    def _job_done(self, job, element, status, result):

         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
...@@ -301,7 +300,7 @@ class Queue():
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            self.done(job, element, result, success)
+            self.done(job, element, result, status)
         except BstError as e:

             # Report error and mark as failed
...@@ -332,12 +331,10 @@ class Queue():
         # All jobs get placed on the done queue for later processing.
         self._done_queue.append(job)

-        # A Job can be skipped whether or not it has failed,
-        # we want to only bookkeep them as processed or failed
-        # if they are not skipped.
-        if job.skipped:
+        # These lists are for bookkeeping purposes for the UI and logging.
+        if status == JobStatus.SKIPPED:
             self.skipped_elements.append(element)
-        elif success:
+        elif status == JobStatus.OK:
             self.processed_elements.append(element)
         else:
             self.failed_elements.append(element)
......
...@@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup

 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus


 # A queue which tracks sources
...@@ -47,9 +48,9 @@ class TrackQueue(Queue):
         return QueueStatus.READY

-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):

-        if not success:
+        if status == JobStatus.FAIL:
             return

         # Set the new refs in the main process one by one as they complete
......
...@@ -38,6 +38,16 @@ class SchedStatus():
     TERMINATED = 1


+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
+# which we launch dynamically: they are meaningless to queue
+# if one is already queued, and it also doesn't make sense
+# to run them in parallel.
+#
+_ACTION_NAME_CLEANUP = 'cleanup'
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
+
+
 # Scheduler()
 #
 # The scheduler operates on a list of queues, each of which is meant to accomplish
...@@ -94,6 +104,15 @@ class Scheduler():
         self._suspendtime = None
         self._queue_jobs = True                  # Whether we should continue to queue jobs

+        # Whether our exclusive jobs, like 'cleanup', are currently already
+        # waiting or active.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
+
         self._resources = Resources(context.sched_builders,
                                     context.sched_fetchers,
                                     context.sched_pushers)
...@@ -211,19 +230,6 @@ class Scheduler():
                 starttime = timenow
         return timenow - starttime

-    # schedule_jobs()
-    #
-    # Args:
-    #    jobs ([Job]): A list of jobs to schedule
-    #
-    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
-    # run as soon as any other queueing jobs finish, provided sufficient
-    # resources are available for them to run.
-    #
-    def schedule_jobs(self, jobs):
-        for job in jobs:
-            self.waiting_jobs.append(job)
-
     # job_completed():
     #
     # Called when a Job completes
...@@ -231,12 +237,14 @@ class Scheduler():
     # Args:
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
+    #    status (JobStatus): The status of the completed job
     #
-    def job_completed(self, job, success):
+    def job_completed(self, job, status):
         self._resources.clear_job_resources(job)
         self.active_jobs.remove(job)
-        self._job_complete_callback(job, success)
+        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+            self._exclusive_active.remove(job.action_name)
+        self._job_complete_callback(job, status)
         self._schedule_queue_jobs()
         self._sched()
...@@ -246,18 +254,13 @@ class Scheduler():
     # size is calculated, a cleanup job will be run automatically
     # if needed.
     #
-    # FIXME: This should ensure that only one cache size job
-    #        is ever pending at a given time. If a cache size
-    #        job is already running, it is correct to queue
-    #        a new one; it is incorrect to have more than one
-    #        of these jobs pending at a given time, though.
-    #
     def check_cache_size(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+                           'cache_size/cache_size',
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self._schedule_jobs([job])

     #######################################################
     #                  Local Private Methods              #
...@@ -276,10 +279,19 @@ class Scheduler():
             if not self._resources.reserve_job_resources(job):
                 continue

+            # Postpone these jobs if one is already running
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
+               job.action_name in self._exclusive_active:
+                continue
+
             job.spawn()
+
             self.waiting_jobs.remove(job)
             self.active_jobs.append(job)

+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+                self._exclusive_waiting.remove(job.action_name)
+                self._exclusive_active.add(job.action_name)
+
             if self._job_start_callback:
                 self._job_start_callback(job)
...@@ -287,6 +299,33 @@ class Scheduler():
         if not self.active_jobs and not self.waiting_jobs:
             self.loop.stop()

+    # _schedule_jobs()
+    #
+    # The main entry point for jobs to be scheduled.
+    #
+    # This is called either as a result of scanning the queues
+    # in _schedule_queue_jobs(), or directly by the Scheduler
+    # to insert special jobs like cleanups.
+    #
+    # Args:
+    #    jobs ([Job]): A list of jobs to schedule
+    #
+    def _schedule_jobs(self, jobs):
+        for job in jobs:
+
+            # Special treatment of our redundant exclusive jobs
+            #
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+
+                # Drop the job if one is already queued
+                if job.action_name in self._exclusive_waiting:
+                    continue
+
+                # Mark this action type as queued
+                self._exclusive_waiting.add(job.action_name)
+
+            self.waiting_jobs.append(job)
+
     # _schedule_queue_jobs()
     #
     # Ask the queues what jobs they want to schedule and schedule
...@@ -331,7 +370,7 @@ class Scheduler():
         # the next queue and process them.
         process_queues = any(q.dequeue_ready() for q in self.queues)

-        self.schedule_jobs(ready)
+        self._schedule_jobs(ready)
         self._sched()

     # _run_cleanup()
...@@ -353,11 +392,11 @@ class Scheduler():
         if not artifacts.has_quota_exceeded():
             return

-        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
+        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          exclusive_resources=[ResourceType.CACHE])
-        self.schedule_jobs([job])
+        self._schedule_jobs([job])

     # _suspend_jobs()
     #
......
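The two sets encode a small state machine per exclusive action name: `_schedule_jobs()` moves a name into `_exclusive_waiting` (dropping duplicates), `_sched()` moves it to `_exclusive_active` when spawned (postponing it if one is already active), and `job_completed()` clears it. A self-contained toy that exercises just this bookkeeping (standalone sketch with toy names, not the real `Scheduler`):

    EXCLUSIVE = {'cleanup', 'cache_size'}

    class ToyScheduler:
        def __init__(self):
            self.waiting, self.active = [], []
            self._excl_waiting, self._excl_active = set(), set()

        def schedule(self, name):
            if name in EXCLUSIVE:
                if name in self._excl_waiting:
                    return  # drop: one is already queued
                self._excl_waiting.add(name)
            self.waiting.append(name)

        def sched(self):
            for name in list(self.waiting):
                if name in EXCLUSIVE and name in self._excl_active:
                    continue  # postpone: one is already running
                self.waiting.remove(name)
                self.active.append(name)
                if name in EXCLUSIVE:
                    self._excl_waiting.remove(name)
                    self._excl_active.add(name)

        def completed(self, name):
            self.active.remove(name)
            if name in EXCLUSIVE:
                self._excl_active.discard(name)

    s = ToyScheduler()
    s.schedule('cleanup'); s.schedule('cleanup')   # second one is dropped
    s.sched()
    assert s.active == ['cleanup']
    s.schedule('cleanup')                          # queued while one runs
    s.sched()
    assert s.waiting == ['cleanup']                # postponed, not spawned
    s.completed('cleanup'); s.sched()
    assert s.active == ['cleanup']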
...@@ -65,7 +65,7 @@ Miscellaneous abstract methods also exist:
 * :func:`Element.generate_script() <buildstream.element.Element.generate_script>`

-  For the purpose of ``bst source bundle``, an Element may optionally implement this.
+  For the purpose of ``bst source checkout --include-build-scripts``, an Element may optionally implement this.

 Class Reference
......
...@@ -592,7 +592,7 @@ class _SandboxBatch():
         if command.label:
             context = self.sandbox._get_context()
             message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
-                              'Running {}'.format(command.label))
+                              'Running command', detail=command.label)
             context.message(message)

         exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
......
...@@ -1050,6 +1050,11 @@ def _kill_process_tree(pid):
             # Ignore this error, it can happen with
             # some setuid bwrap processes.
             pass
+        except psutil.NoSuchProcess:
+            # It is certain that this has already been sent
+            # SIGTERM, so there is a window where the process
+            # could have exited already.
+            pass

     # Bloody Murder
     for child in children:
......
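Both halves of this change address the same race from opposite ends: a process can exit on its own between being enumerated and being signalled. Instead of every caller of `utils._kill_process_tree()` guarding against `psutil.NoSuchProcess` — as the removed `try`/`except` in `Job.kill()` did — the helper now absorbs the exception at the point the signal is sent. The general pattern, as a standalone sketch:

    import psutil

    def _kill_quietly(proc):
        # The process can die of its own accord between enumeration
        # and the kill; treat "already gone" as success.
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass

    def kill_process_tree(pid):
        try:
            parent = psutil.Process(pid)
        except psutil.NoSuchProcess:
            return  # already gone
        for child in parent.children(recursive=True):
            _kill_quietly(child)
        _kill_quietly(parent)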
...@@ -138,5 +138,5 @@ def test_missing_certs(cli, datafiles, config_key, config_value):
     # Use `pull` here to ensure we try to initialize the remotes, triggering the error
     #
     # This does not happen for a simple `bst show`.
-    result = cli.run(project=project, args=['pull', 'element.bst'])
+    result = cli.run(project=project, args=['artifact', 'pull', 'element.bst'])
     result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
...@@ -58,7 +58,7 @@ def test_push_pull(cli, tmpdir, datafiles):
     project_set_artifacts(base_project, base_share.repo)

     # Now try bst push
-    result = cli.run(project=project, args=['push', '--deps', 'all', 'target.bst'])
+    result = cli.run(project=project, args=['artifact', 'push', '--deps', 'all', 'target.bst'])
     assert result.exit_code == 0

     # And finally assert that the artifacts are in the right shares
...@@ -78,7 +78,7 @@ def test_push_pull(cli, tmpdir, datafiles):
         assert state != 'cached'

     # Now try bst pull
-    result = cli.run(project=project, args=['pull', '--deps', 'all', 'target.bst'])
+    result = cli.run(project=project, args=['artifact', 'pull', '--deps', 'all', 'target.bst'])
     assert result.exit_code == 0

     # And assert that they are again in the local cache, without having built
......
...@@ -11,8 +11,6 @@ MAIN_COMMANDS = [
     'checkout ',
     'help ',
     'init ',
-    'pull ',
-    'push ',
     'shell ',
     'show ',
     'source ',
...@@ -54,6 +52,12 @@ SOURCE_COMMANDS = [
     'track ',
 ]

+ARTIFACT_COMMANDS = [
+    'push ',
+    'pull ',
+    'log ',
+]
+
 WORKSPACE_COMMANDS = [
     'close ',
     'list ',
...@@ -117,8 +121,7 @@ def assert_completion_failed(cli, cmd, word_idx, expected, cwd=None):
 @pytest.mark.parametrize("cmd,word_idx,expected", [
     ('bst', 0, []),
     ('bst ', 1, MAIN_COMMANDS),
-    ('bst pu', 1, ['pull ', 'push ']),
-    ('bst pul', 1, ['pull ']),
+    ('bst artifact ', 2, ARTIFACT_COMMANDS),
     ('bst source ', 2, SOURCE_COMMANDS),
     ('bst w ', 1, ['workspace ']),
     ('bst workspace ', 2, WORKSPACE_COMMANDS),
...@@ -272,9 +275,8 @@ def test_argument_element_invalid(datafiles, cli, project, cmd, word_idx, expected):
 @pytest.mark.parametrize("cmd,word_idx,expected", [
     ('bst he', 1, ['help ']),
     ('bst help ', 2, MAIN_COMMANDS),
+    ('bst help artifact ', 3, ARTIFACT_COMMANDS),
     ('bst help in', 2, ['init ']),
-    ('bst help p', 2, ['pull ', 'push ']),
-    ('bst help p', 2, ['pull ', 'push ']),
     ('bst help source ', 3, SOURCE_COMMANDS),
     ('bst help w', 2, ['workspace ']),
     ('bst help workspace ', 3, WORKSPACE_COMMANDS),
......