Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • willsalmon/buildstream
  • CumHoleZH/buildstream
  • tchaik/buildstream
  • DCotyPortfolio/buildstream
  • jesusoctavioas/buildstream
  • patrickmmartin/buildstream
  • franred/buildstream
  • tintou/buildstream
  • alatiera/buildstream
  • martinblanchard/buildstream
  • neverdie22042524/buildstream
  • Mattlk13/buildstream
  • PServers/buildstream
  • phamnghia610909/buildstream
  • chiaratolentino/buildstream
  • eysz7-x-x/buildstream
  • kerrick1/buildstream
  • matthew-yates/buildstream
  • twofeathers/buildstream
  • mhadjimichael/buildstream
  • pointswaves/buildstream
  • Mr.JackWilson/buildstream
  • Tw3akG33k/buildstream
  • AlexFazakas/buildstream
  • eruidfkiy/buildstream
  • clamotion2/buildstream
  • nanonyme/buildstream
  • wickyjaaa/buildstream
  • nmanchev/buildstream
  • bojorquez.ja/buildstream
  • mostynb/buildstream
  • highpit74/buildstream
  • Demo112/buildstream
  • ba2014sheer/buildstream
  • tonimadrino/buildstream
  • usuario2o/buildstream
  • Angelika123456/buildstream
  • neo355/buildstream
  • corentin-ferlay/buildstream
  • coldtom/buildstream
  • wifitvbox81/buildstream
  • 358253885/buildstream
  • seanborg/buildstream
  • SotK/buildstream
  • DouglasWinship/buildstream
  • karansthr97/buildstream
  • louib/buildstream
  • bwh-ct/buildstream
  • robjh/buildstream
  • we88c0de/buildstream
  • zhengxian5555/buildstream
51 results
Show changes
Commits on Source (5)
Showing with 133 additions and 82 deletions
......@@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
from .._scheduler import ElementJob
from .._scheduler import ElementJob, JobStatus
# Import frontend assets
from . import Profile, LogLine, Status
......@@ -515,13 +515,13 @@ class App():
self._status.add_job(job)
self._maybe_render_status()
def _job_completed(self, job, success):
def _job_completed(self, job, status):
self._status.remove_job(job)
self._maybe_render_status()
# Don't attempt to handle a failure if the user has already opted to
# terminate
if not success and not self.stream.terminated:
if status == JobStatus.FAIL and not self.stream.terminated:
if isinstance(job, ElementJob):
element = job.element
......
......@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
from .queues.pullqueue import PullQueue
from .scheduler import Scheduler, SchedStatus
from .jobs import ElementJob
from .jobs import ElementJob, JobStatus
......@@ -20,3 +20,4 @@
from .elementjob import ElementJob
from .cachesizejob import CacheSizeJob
from .cleanupjob import CleanupJob
from .job import JobStatus
......@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job
from .job import Job, JobStatus
class CacheSizeJob(Job):
......@@ -30,8 +30,8 @@ class CacheSizeJob(Job):
def child_process(self):
return self._artifacts.compute_cache_size()
def parent_complete(self, success, result):
if success:
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
......
......@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job
from .job import Job, JobStatus
class CleanupJob(Job):
......@@ -29,6 +29,6 @@ class CleanupJob(Job):
def child_process(self):
return self._artifacts.clean()
def parent_complete(self, success, result):
if success:
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
......@@ -60,7 +60,7 @@ from .job import Job
# Args:
# job (Job): The job object which completed
# element (Element): The element passed to the Job() constructor
# success (bool): True if the action_cb did not raise an exception
# status (JobStatus): The completion status of the job
# result (object): The deserialized object returned by the `action_cb`, or None
# if the job did not complete successfully
#
......@@ -93,8 +93,8 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
def parent_complete(self, success, result):
self._complete_cb(self, self._element, success, self._result)
def parent_complete(self, status, result):
self._complete_cb(self, self._element, status, self._result)
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
......
......@@ -41,6 +41,22 @@ RC_PERM_FAIL = 2
RC_SKIPPED = 3
# JobStatus:
#
# The job completion status, passed back through the
# complete callbacks.
#
class JobStatus():
# Job succeeded
OK = 0
# The job failed, either temporarily or permanently (e.g. a BstError was raised)
FAIL = 1
# A SkipJob was raised
SKIPPED = 3
# Used to distinguish between status messages and return values
class Envelope():
def __init__(self, message_type, message):
......@@ -116,7 +132,6 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
self._skipped_flag = False # Indicate whether the job was skipped.
self._terminated = False # Whether this job has been explicitly terminated
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
......@@ -273,18 +288,6 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
# skipped
#
# This will evaluate to True if the job was skipped
# during processing, or if it was forcefully terminated.
#
# Returns:
# (bool): Whether the job should appear as skipped
#
@property
def skipped(self):
return self._skipped_flag or self._terminated
#######################################################
# Abstract Methods #
#######################################################
......@@ -295,10 +298,10 @@ class Job():
# pass the result to the main thread.
#
# Args:
# success (bool): Whether the job was successful.
# status (JobStatus): The job exit status
# result (any): The result returned by child_process().
#
def parent_complete(self, success, result):
def parent_complete(self, status, result):
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
......@@ -562,16 +565,23 @@ class Job():
#
self._retry_flag = returncode == RC_FAIL
# Set the flag to alert Queue that this job skipped.
self._skipped_flag = returncode == RC_SKIPPED
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
success = returncode in (RC_OK, RC_SKIPPED)
self.parent_complete(success, self._result)
self._scheduler.job_completed(self, success)
# Resolve the outward facing overall job completion status
#
if returncode == RC_OK:
status = JobStatus.OK
elif returncode == RC_SKIPPED:
status = JobStatus.SKIPPED
elif returncode in (RC_FAIL, RC_PERM_FAIL):
status = JobStatus.FAIL
else:
status = JobStatus.FAIL
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
......
......@@ -21,7 +21,7 @@
from datetime import timedelta
from . import Queue, QueueStatus
from ..jobs import ElementJob
from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
from ..._message import MessageType
......@@ -104,7 +104,7 @@ class BuildQueue(Queue):
if artifacts.has_quota_exceeded():
self._scheduler.check_cache_size()
def done(self, job, element, result, success):
def done(self, job, element, result, status):
# Inform element in main process that assembly is done
element._assemble_done()
......@@ -117,5 +117,5 @@ class BuildQueue(Queue):
# artifact cache size for a successful build even though we know a
# failed build also grows the artifact cache size.
#
if success:
if status == JobStatus.OK:
self._check_cache_size(job, element, result)
......@@ -24,6 +24,7 @@ from ... import Consistency
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
from ..jobs import JobStatus
# A queue which fetches element sources
......@@ -66,9 +67,9 @@ class FetchQueue(Queue):
return QueueStatus.READY
def done(self, _, element, result, success):
def done(self, _, element, result, status):
if not success:
if status == JobStatus.FAIL:
return
element._update_state()
......
......@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
from ..jobs import JobStatus
from ..._exceptions import SkipJob
......@@ -54,9 +55,9 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
def done(self, _, element, result, success):
def done(self, _, element, result, status):
if not success:
if status == JobStatus.FAIL:
return
element._pull_done()
......@@ -64,4 +65,5 @@ class PullQueue(Queue):
# Build jobs will check the "approximate" size first. Since we
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler.check_cache_size()
if status == JobStatus.OK:
self._scheduler.check_cache_size()
......@@ -25,7 +25,7 @@ from enum import Enum
import traceback
# Local imports
from ..jobs import ElementJob
from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
......@@ -133,10 +133,9 @@ class Queue():
# job (Job): The job which completed processing
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
# success (bool): True if the process() implementation did not
# raise any exception
# status (JobStatus): The return status of the Job
#
def done(self, job, element, result, success):
def done(self, job, element, result, status):
pass
#####################################################
......@@ -291,7 +290,7 @@ class Queue():
#
# See the Job object for an explanation of the call signature
#
def _job_done(self, job, element, success, result):
def _job_done(self, job, element, status, result):
# Update values that need to be synchronized in the main task
# before calling any queue implementation
......@@ -301,7 +300,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
self.done(job, element, result, success)
self.done(job, element, result, status)
except BstError as e:
# Report error and mark as failed
......@@ -332,12 +331,10 @@ class Queue():
# All jobs get placed on the done queue for later processing.
self._done_queue.append(job)
# A Job can be skipped whether or not it has failed,
# we want to only bookkeep them as processed or failed
# if they are not skipped.
if job.skipped:
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED:
self.skipped_elements.append(element)
elif success:
elif status == JobStatus.OK:
self.processed_elements.append(element)
else:
self.failed_elements.append(element)
......
......@@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
from ..jobs import JobStatus
# A queue which tracks sources
......@@ -47,9 +48,9 @@ class TrackQueue(Queue):
return QueueStatus.READY
def done(self, _, element, result, success):
def done(self, _, element, result, status):
if not success:
if status == JobStatus.FAIL:
return
# Set the new refs in the main process one by one as they complete
......
......@@ -38,6 +38,16 @@ class SchedStatus():
TERMINATED = 1
# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
# which we launch dynamically. They have the property of being
# meaningless to queue if one is already queued, and it also
# doesn't make sense to run them in parallel.
#
_ACTION_NAME_CLEANUP = 'cleanup'
_ACTION_NAME_CACHE_SIZE = 'cache_size'
_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
# Scheduler()
#
# The scheduler operates on a list of queues, each of which is meant to accomplish
......@@ -94,6 +104,15 @@ class Scheduler():
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
# Whether our exclusive jobs, like 'cleanup', are already
# waiting or active.
#
# This is just a bit quicker than scanning the wait queue and active
# queue and comparing job action names.
#
self._exclusive_waiting = set()
self._exclusive_active = set()
self._resources = Resources(context.sched_builders,
context.sched_fetchers,
context.sched_pushers)
......@@ -211,19 +230,6 @@ class Scheduler():
starttime = timenow
return timenow - starttime
# schedule_jobs()
#
# Args:
# jobs ([Job]): A list of jobs to schedule
#
# Schedule 'Job's for the scheduler to run. Jobs scheduled will be
# run as soon as any other queued jobs finish, provided sufficient
# resources are available for them to run
#
def schedule_jobs(self, jobs):
for job in jobs:
self.waiting_jobs.append(job)
# job_completed():
#
# Called when a Job completes
......@@ -231,12 +237,14 @@ class Scheduler():
# Args:
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# success (bool): Whether the Job completed with a success status
# status (JobStatus): The status of the completed job
#
def job_completed(self, job, success):
def job_completed(self, job, status):
self._resources.clear_job_resources(job)
self.active_jobs.remove(job)
self._job_complete_callback(job, success)
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
self._exclusive_active.remove(job.action_name)
self._job_complete_callback(job, status)
self._schedule_queue_jobs()
self._sched()
......@@ -246,18 +254,13 @@ class Scheduler():
# size is calculated, a cleanup job will be run automatically
# if needed.
#
# FIXME: This should ensure that only one cache size job
# is ever pending at a given time. If a cache size
# job is already running, it is correct to queue
# a new one, it is incorrect to have more than one
# of these jobs pending at a given time, though.
#
def check_cache_size(self):
job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
'cache_size/cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
complete_cb=self._run_cleanup)
self.schedule_jobs([job])
self._schedule_jobs([job])
#######################################################
# Local Private Methods #
......@@ -276,10 +279,19 @@ class Scheduler():
if not self._resources.reserve_job_resources(job):
continue
# Postpone these jobs if one is already running
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
job.action_name in self._exclusive_active:
continue
job.spawn()
self.waiting_jobs.remove(job)
self.active_jobs.append(job)
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
self._exclusive_waiting.remove(job.action_name)
self._exclusive_active.add(job.action_name)
if self._job_start_callback:
self._job_start_callback(job)
......@@ -287,6 +299,33 @@ class Scheduler():
if not self.active_jobs and not self.waiting_jobs:
self.loop.stop()
# _schedule_jobs()
#
# The main entry point for jobs to be scheduled.
#
# This is called either as a result of scanning the queues
# in _schedule_queue_jobs(), or directly by the Scheduler
# to insert special jobs like cleanups.
#
# Args:
# jobs ([Job]): A list of jobs to schedule
#
def _schedule_jobs(self, jobs):
for job in jobs:
# Special treatment of our redundant exclusive jobs
#
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
# Drop the job if one is already queued
if job.action_name in self._exclusive_waiting:
continue
# Mark this action type as queued
self._exclusive_waiting.add(job.action_name)
self.waiting_jobs.append(job)
# _schedule_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
......@@ -331,7 +370,7 @@ class Scheduler():
# the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
self.schedule_jobs(ready)
self._schedule_jobs(ready)
self._sched()
# _run_cleanup()
......@@ -353,11 +392,11 @@ class Scheduler():
if not artifacts.has_quota_exceeded():
return
job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE])
self.schedule_jobs([job])
self._schedule_jobs([job])
# _suspend_jobs()
#
......