Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • willsalmon/buildstream
  • CumHoleZH/buildstream
  • tchaik/buildstream
  • DCotyPortfolio/buildstream
  • jesusoctavioas/buildstream
  • patrickmmartin/buildstream
  • franred/buildstream
  • tintou/buildstream
  • alatiera/buildstream
  • martinblanchard/buildstream
  • neverdie22042524/buildstream
  • Mattlk13/buildstream
  • PServers/buildstream
  • phamnghia610909/buildstream
  • chiaratolentino/buildstream
  • eysz7-x-x/buildstream
  • kerrick1/buildstream
  • matthew-yates/buildstream
  • twofeathers/buildstream
  • mhadjimichael/buildstream
  • pointswaves/buildstream
  • Mr.JackWilson/buildstream
  • Tw3akG33k/buildstream
  • AlexFazakas/buildstream
  • eruidfkiy/buildstream
  • clamotion2/buildstream
  • nanonyme/buildstream
  • wickyjaaa/buildstream
  • nmanchev/buildstream
  • bojorquez.ja/buildstream
  • mostynb/buildstream
  • highpit74/buildstream
  • Demo112/buildstream
  • ba2014sheer/buildstream
  • tonimadrino/buildstream
  • usuario2o/buildstream
  • Angelika123456/buildstream
  • neo355/buildstream
  • corentin-ferlay/buildstream
  • coldtom/buildstream
  • wifitvbox81/buildstream
  • 358253885/buildstream
  • seanborg/buildstream
  • SotK/buildstream
  • DouglasWinship/buildstream
  • karansthr97/buildstream
  • louib/buildstream
  • bwh-ct/buildstream
  • robjh/buildstream
  • we88c0de/buildstream
  • zhengxian5555/buildstream
51 results
Show changes
Commits on Source (5)
  • Tristan Van Berkom's avatar
    _scheduler: Refactor of queues and resources. · 6991d6d6
    Tristan Van Berkom authored
    This branch makes the following changes:
    
      * jobs/job.py: No longer stores any interested resource list
    
        Jobs are ephemeral again, they only ever exist while they
        are running.
    
      * queues/queue.py: Revert to only handling lists of elements
    
        Elements pass through the queues, Queue.harvest_jobs()
        replaces Queue.pop_ready_jobs() and now the Queue stops
        creating jobs as soon as there are not enough resources
        for the job.
    
        Also removed unused `prepare()` abstract method.
    
      * queues/buildqueue.py: Adapt the part where we launch a job
    
        This part needs to be reworked anyway, just touch it up for
        now so that it doesnt break with the surrounding changes.
    
      * jobs/{cachesize,cleanup}job.py: Expose uniform complete callback
    
        Allows the scheduler to manage resource deallocation for these
        two job completions as a custom thing, at the same phase
        that the Queues take care of their own resource deallocation.
    
      * resources.py: No longer has knowledge of the job
    
        Since jobs are ephemeral, they are not a suitable place
        to store the resource identifiers, these must be provided
        by the callers wherever needed.
    
        Now the main Resources object is owned by the Scheduler
        but shared with Queues, each take care of managing the
        resources of the jobs they create through the same
        resource API.
    
      * scheduler.py: Reverted to only creating jobs on demand
    
        This changes the flow of the scheduler such that whenever
        jobs complete, the queues are interrogated for as many
        jobs which can run at the moment but not more; and this
        completely removes the waiting list.
    
        For the internal cache management jobs, we handle this
        with a little state instead of having a waiting list
        and only launch when the resources permit it.
    
    By abolishing the scheduler waiting list and creating jobs
    on demand, we fix the order of element processing and consequently
    fix issue #712.
    6991d6d6
  • Tristan Van Berkom's avatar
    tests/frontend/order.py: Enable the test for build and fix the fetch tests · 04ba59bc
    Tristan Van Berkom authored
    With the scheduler changes, fetch jobs get automatically skipped
    so the output is changed, using a separate repo for each element
    fixes the test such that every fetch job gets a job launched.
    04ba59bc
  • Tristan Van Berkom's avatar
    _artifactcache/artifactcache.py: Rephrase failure message · 32bdded8
    Tristan Van Berkom authored
    It was saying "There is not enough space to build the given element.",
    this makes me think the error is associated to a specific element, but
    this does not make sense to show up in a cleanup task.
    
    Instead say "There is not enough space to complete the build.", which
    should be more clear that even after cleaning up there is not enough
    space.
    32bdded8
  • Tristan Van Berkom's avatar
    Merge branch 'tristan/element-processing-order' into 'master' · 17f6e5a8
    Tristan Van Berkom authored
    Scheduler refactor, fix processing order
    
    Closes #712
    
    See merge request !1067
    17f6e5a8
  • Tristan Van Berkom's avatar
    sandbox/sandbox.py: Display failed commands in the detail string · 0e72ef06
    Tristan Van Berkom authored
    We should only display commands in detail strings, not in the
    message texts.
    0e72ef06
......@@ -247,7 +247,7 @@ class ArtifactCache():
# FIXME: Asking the user what to do may be neater
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
detail = ("There is not enough space to build the given element.\n"
detail = ("There is not enough space to complete the build.\n"
"Please increase the cache-quota in {}."
.format(self.context.config_origin or default_conf))
......
......@@ -262,8 +262,8 @@ class PlatformError(BstError):
# Raised when errors are encountered by the sandbox implementation
#
class SandboxError(BstError):
def __init__(self, message, reason=None):
super().__init__(message, domain=ErrorDomain.SANDBOX, reason=reason)
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason)
# ArtifactError
......
......@@ -34,8 +34,8 @@ class CacheSizeJob(Job):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb(result)
if self._complete_cb:
self._complete_cb(status, result)
def child_process_data(self):
return {}
......@@ -20,8 +20,9 @@ from .job import Job, JobStatus
class CleanupJob(Job):
def __init__(self, *args, **kwargs):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
context = self._scheduler.context
self._artifacts = context.artifactcache
......@@ -32,3 +33,6 @@ class CleanupJob(Job):
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
......@@ -85,28 +85,11 @@ class Process(multiprocessing.Process):
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
# resources (iter(ResourceType)) - A set of resources this job
# wants to use.
# exclusive_resources (iter(ResourceType)) - A set of resources
# this job wants to use
# exclusively.
# max_retries (int): The maximum number of retries
#
class Job():
def __init__(self, scheduler, action_name, logfile, *,
resources=None, exclusive_resources=None, max_retries=0):
if resources is None:
resources = set()
else:
resources = set(resources)
if exclusive_resources is None:
exclusive_resources = set()
else:
exclusive_resources = set(resources)
assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
......@@ -114,12 +97,6 @@ class Job():
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
# The resources this job wants to access
self.resources = resources
# Resources this job needs to access exclusively, i.e., no
# other job should be allowed to access them
self.exclusive_resources = exclusive_resources
#
# Private members
#
......
......@@ -57,11 +57,10 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
resources=self.resources,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(job)
self._done_queue.append(element)
self.failed_elements.append(element)
self._scheduler._job_complete_callback(job, False)
......
......@@ -72,8 +72,9 @@ class Queue():
# Private members
#
self._scheduler = scheduler
self._wait_queue = deque()
self._done_queue = deque()
self._resources = scheduler.resources # Shared resource pool
self._wait_queue = deque() # Ready / Waiting elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
# Assert the subclass has setup class data
......@@ -115,16 +116,6 @@ class Queue():
def status(self, element):
return QueueStatus.READY
# prepare()
#
# Abstract method for handling job preparation in the main process.
#
# Args:
# element (Element): The element which is scheduled
#
def prepare(self, element):
pass
# done()
#
# Abstract method for handling a successful job completion.
......@@ -153,26 +144,18 @@ class Queue():
if not elts:
return
# Note: The internal lists work with jobs. This is not
# reflected in any external methods (except
# pop/peek_ready_jobs).
def create_job(element):
logfile = self._element_log_path(element)
return ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
resources=self.resources,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
# Place skipped elements directly on the done queue
jobs = [create_job(elt) for elt in elts]
skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
wait = [job for job in jobs if job not in skip]
self.skipped_elements.extend([job.element for job in skip])
self._wait_queue.extend(wait)
self._done_queue.extend(skip)
# Place skipped elements on the done queue right away.
#
# The remaining ready and waiting elements must remain in the
# same queue, and ready status must be determined at the moment
# which the scheduler is asking for the next job.
#
skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
wait = [elt for elt in elts if elt not in skip]
self.skipped_elements.extend(skip) # Public record of skipped elements
self._done_queue.extend(skip) # Elements to be processed
self._wait_queue.extend(wait) # Elements eligible to be dequeued
# dequeue()
#
......@@ -184,69 +167,59 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
yield self._done_queue.popleft().element
yield self._done_queue.popleft()
# dequeue_ready()
#
# Reports whether there are any elements to dequeue
# Reports whether any elements can be promoted to other queues
#
# Returns:
# (bool): Whether there are elements to dequeue
# (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
# pop_ready_jobs()
#
# Returns:
# ([Job]): A list of jobs to run
# harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
# into the dequeue pool, and processing them if necessary.
#
# This will have different results for elements depending
# on the Queue.status() implementation.
#
# o Elements which are QueueStatus.WAIT will not be affected
# into the dequeue pool, and creating as many jobs for which resources
# can be reserved.
#
# o Elements which are QueueStatus.SKIP will move directly
# to the dequeue pool
#
# o For Elements which are QueueStatus.READY a Job will be
# created and returned to the caller, given that the scheduler
# allows the Queue enough resources for the given job
# Returns:
# ([Job]): A list of jobs which can be run now
#
def pop_ready_jobs(self):
def harvest_jobs(self):
unready = []
ready = []
while self._wait_queue:
job = self._wait_queue.popleft()
element = job.element
if not self._resources.reserve(self.resources, peek=True):
break
element = self._wait_queue.popleft()
status = self.status(element)
if status == QueueStatus.WAIT:
unready.append(job)
continue
unready.append(element)
elif status == QueueStatus.SKIP:
self._done_queue.append(job)
self._done_queue.append(element)
self.skipped_elements.append(element)
continue
self.prepare(element)
ready.append(job)
else:
reserved = self._resources.reserve(self.resources)
assert reserved
ready.append(element)
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
return ready
def peek_ready_jobs(self):
def ready(job):
return self.status(job.element) == QueueStatus.READY
yield from (job for job in self._wait_queue if ready(job))
return [
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
]
#####################################################
# Private Methods #
......@@ -292,6 +265,10 @@ class Queue():
#
def _job_done(self, job, element, status, result):
# Now release the resources we reserved
#
self._resources.release(self.resources)
# Update values that need to be synchronized in the main task
# before calling any queue implementation
self._update_workspaces(element, job)
......@@ -324,12 +301,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
#
# No exception occured in post processing
#
# All jobs get placed on the done queue for later processing.
self._done_queue.append(job)
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED:
......
......@@ -34,28 +34,25 @@ class Resources():
ResourceType.UPLOAD: set()
}
def clear_job_resources(self, job):
for resource in job.exclusive_resources:
self._exclusive_resources[resource].remove(hash(job))
# reserve()
#
# Reserves a set of resources
#
# Args:
# resources (set): A set of ResourceTypes
# exclusive (set): Another set of ResourceTypes
# peek (bool): Whether to only peek at whether the resource is available
#
# Returns:
# (bool): True if the resources could be reserved
#
def reserve(self, resources, exclusive=None, *, peek=False):
if exclusive is None:
exclusive = set()
for resource in job.resources:
self._used_resources[resource] -= 1
def reserve_exclusive_resources(self, job):
exclusive = job.exclusive_resources
# The very first thing we do is to register any exclusive
# resources this job may want. Even if the job is not yet
# allowed to run (because another job is holding the resource
# it wants), we can still set this - it just means that any
# job *currently* using these resources has to finish first,
# and no new jobs wanting these can be launched (except other
# exclusive-access jobs).
#
for resource in exclusive:
self._exclusive_resources[resource].add(hash(job))
resources = set(resources)
exclusive = set(exclusive)
def reserve_job_resources(self, job):
# First, we check if the job wants to access a resource that
# another job wants exclusive access to. If so, it cannot be
# scheduled.
......@@ -68,7 +65,8 @@ class Resources():
# is currently not possible, but may be worth thinking
# about.
#
for resource in job.resources - job.exclusive_resources:
for resource in resources - exclusive:
# If our job wants this resource exclusively, we never
# check this, so we can get away with not (temporarily)
# removing it from the set.
......@@ -84,14 +82,14 @@ class Resources():
# at a time, despite being allowed to be part of the exclusive
# set.
#
for exclusive in job.exclusive_resources:
if self._used_resources[exclusive] != 0:
for resource in exclusive:
if self._used_resources[resource] != 0:
return False
# Finally, we check if we have enough of each resource
# available. If we don't have enough, the job cannot be
# scheduled.
for resource in job.resources:
for resource in resources:
if (self._max_resources[resource] > 0 and
self._used_resources[resource] >= self._max_resources[resource]):
return False
......@@ -99,7 +97,70 @@ class Resources():
# Now we register the fact that our job is using the resources
# it asked for, and tell the scheduler that it is allowed to
# continue.
for resource in job.resources:
self._used_resources[resource] += 1
if not peek:
for resource in resources:
self._used_resources[resource] += 1
return True
# release()
#
# Release resources previously reserved with Resources.reserve()
#
# Args:
# resources (set): A set of resources to release
#
def release(self, resources):
for resource in resources:
assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
self._used_resources[resource] -= 1
# register_exclusive_interest()
#
# Inform the resources pool that `source` has an interest in
# reserving this resource exclusively.
#
# The source parameter is used to identify the caller, it
# must be ensured to be unique for the time that the
# interest is registered.
#
# This function may be called multiple times, and subsequent
# calls will simply have no effect until clear_exclusive_interest()
# is used to clear the interest.
#
# This must be called in advance of reserve()
#
# Args:
# resources (set): Set of resources to reserve exclusively
# source (any): Source identifier, to be used again when unregistering
# the interest.
#
def register_exclusive_interest(self, resources, source):
# The very first thing we do is to register any exclusive
# resources this job may want. Even if the job is not yet
# allowed to run (because another job is holding the resource
# it wants), we can still set this - it just means that any
# job *currently* using these resources has to finish first,
# and no new jobs wanting these can be launched (except other
# exclusive-access jobs).
#
for resource in resources:
self._exclusive_resources[resource].add(source)
# unregister_exclusive_interest()
#
# Clear the exclusive interest in these resources.
#
# This should be called by the given source which registered
# an exclusive interest.
#
# Args:
# resources (set): Set of resources to reserve exclusively
# source (str): Source identifier, to be used again when unregistering
# the interest.
#
def unregister_exclusive_interest(self, resources, source):
for resource in resources:
self._exclusive_resources[resource].remove(source)
......@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
from .jobs import CacheSizeJob, CleanupJob
from .jobs import JobStatus, CacheSizeJob, CleanupJob
# A decent return code for Scheduler.run()
......@@ -38,14 +38,10 @@ class SchedStatus():
TERMINATED = 1
# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
# which we launch dynamically, they have the property of being
# meaningless to queue if one is already queued, and it also
# doesnt make sense to run them in parallel
# Some action names for the internal jobs we launch
#
_ACTION_NAME_CLEANUP = 'cleanup'
_ACTION_NAME_CACHE_SIZE = 'cache_size'
_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
# Scheduler()
......@@ -81,8 +77,6 @@ class Scheduler():
#
# Public members
#
self.active_jobs = [] # Jobs currently being run in the scheduler
self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
......@@ -95,15 +89,23 @@ class Scheduler():
#
# Private members
#
self._active_jobs = [] # Jobs currently being run in the scheduler
self._starttime = start_time # Initial application start time
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
# State of cache management related jobs
self._cache_size_scheduled = False # Whether we have a cache size job scheduled
self._cache_size_running = None # A running CacheSizeJob, or None
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
# Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
self._job_start_callback = job_start_callback
self._job_complete_callback = job_complete_callback
self._starttime = start_time
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
#
......@@ -113,9 +115,9 @@ class Scheduler():
self._exclusive_waiting = set()
self._exclusive_active = set()
self._resources = Resources(context.sched_builders,
context.sched_fetchers,
context.sched_pushers)
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
context.sched_pushers)
# run()
#
......@@ -150,7 +152,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
self._schedule_queue_jobs()
self._sched()
self.loop.run_forever()
self.loop.close()
......@@ -240,12 +242,14 @@ class Scheduler():
# status (JobStatus): The status of the completed job
#
def job_completed(self, job, status):
self._resources.clear_job_resources(job)
self.active_jobs.remove(job)
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
self._exclusive_active.remove(job.action_name)
# Remove from the active jobs list
self._active_jobs.remove(job)
# Scheduler owner facing callback
self._job_complete_callback(job, status)
self._schedule_queue_jobs()
# Now check for more jobs
self._sched()
# check_cache_size():
......@@ -255,78 +259,104 @@ class Scheduler():
# if needed.
#
def check_cache_size(self):
job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
'cache_size/cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
complete_cb=self._run_cleanup)
self._schedule_jobs([job])
# Here we assume we are called in response to a job
# completion callback, or before entering the scheduler.
#
# As such there is no need to call `_sched()` from here,
# and we prefer to run it once at the last moment.
#
self._cache_size_scheduled = True
#######################################################
# Local Private Methods #
#######################################################
# _sched()
# _spawn_job()
#
# The main driving function of the scheduler, it will be called
# automatically when Scheduler.run() is called initially,
# Spanws a job
#
def _sched(self):
for job in self.waiting_jobs:
self._resources.reserve_exclusive_resources(job)
# Args:
# job (Job): The job to spawn
#
def _spawn_job(self, job):
job.spawn()
self._active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
for job in self.waiting_jobs:
if not self._resources.reserve_job_resources(job):
continue
# Callback for the cache size job
def _cache_size_job_complete(self, status, cache_size):
# Postpone these jobs if one is already running
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
job.action_name in self._exclusive_active:
continue
# Deallocate cache size job resources
self._cache_size_running = None
self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
job.spawn()
self.waiting_jobs.remove(job)
self.active_jobs.append(job)
# Schedule a cleanup job if we've hit the threshold
if status != JobStatus.OK:
return
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
self._exclusive_waiting.remove(job.action_name)
self._exclusive_active.add(job.action_name)
context = self.context
artifacts = context.artifactcache
if self._job_start_callback:
self._job_start_callback(job)
if artifacts.has_quota_exceeded():
self._cleanup_scheduled = True
# If nothings ticking, time to bail out
if not self.active_jobs and not self.waiting_jobs:
self.loop.stop()
# Callback for the cleanup job
def _cleanup_job_complete(self, status, cache_size):
# _schedule_jobs()
#
# The main entry point for jobs to be scheduled.
#
# This is called either as a result of scanning the queues
# in _schedule_queue_jobs(), or directly by the Scheduler
# to insert special jobs like cleanups.
# Deallocate cleanup job resources
self._cleanup_running = None
self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
# Unregister the exclusive interest when we're done with it
if not self._cleanup_scheduled:
self.resources.unregister_exclusive_interest(
[ResourceType.CACHE], 'cache-cleanup'
)
# _sched_cleanup_job()
#
# Args:
# jobs ([Job]): A list of jobs to schedule
# Runs a cleanup job if one is scheduled to run now and
# sufficient recources are available.
#
def _schedule_jobs(self, jobs):
for job in jobs:
def _sched_cleanup_job(self):
# Special treatment of our redundant exclusive jobs
#
if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
if self._cleanup_scheduled and self._cleanup_running is None:
# Ensure we have an exclusive interest in the resources
self.resources.register_exclusive_interest(
[ResourceType.CACHE], 'cache-cleanup'
)
if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
[ResourceType.CACHE]):
# Drop the job if one is already queued
if job.action_name in self._exclusive_waiting:
continue
# Update state and launch
self._cleanup_scheduled = False
self._cleanup_running = \
CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
complete_cb=self._cleanup_job_complete)
self._spawn_job(self._cleanup_running)
# Mark this action type as queued
self._exclusive_waiting.add(job.action_name)
# _sched_cache_size_job()
#
# Runs a cache size job if one is scheduled to run now and
# sufficient recources are available.
#
def _sched_cache_size_job(self):
if self._cache_size_scheduled and not self._cache_size_running:
self.waiting_jobs.append(job)
if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
self._cache_size_scheduled = False
self._cache_size_running = \
CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
'cache_size/cache_size',
complete_cb=self._cache_size_job_complete)
self._spawn_job(self._cache_size_running)
# _schedule_queue_jobs()
# _sched_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
# them. This is done here so we can ask for new jobs when jobs
......@@ -335,7 +365,7 @@ class Scheduler():
# This will process the Queues, pull elements through the Queues
# and process anything that is ready.
#
def _schedule_queue_jobs(self):
def _sched_queue_jobs(self):
ready = []
process_queues = True
......@@ -344,10 +374,7 @@ class Scheduler():
# Pull elements forward through queues
elements = []
for queue in self.queues:
# Enqueue elements complete from the last queue
queue.enqueue(elements)
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
# Kickoff whatever processes can be processed at this time
......@@ -362,41 +389,51 @@ class Scheduler():
# thus need all the pulls to complete before ever starting
# a build
ready.extend(chain.from_iterable(
queue.pop_ready_jobs() for queue in reversed(self.queues)
q.harvest_jobs() for q in reversed(self.queues)
))
# pop_ready_jobs() may have skipped jobs, adding them to
# the done_queue. Pull these skipped elements forward to
# the next queue and process them.
# harvest_jobs() may have decided to skip some jobs, making
# them eligible for promotion to the next queue as a side effect.
#
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
self._schedule_jobs(ready)
self._sched()
# Spawn the jobs
#
for job in ready:
self._spawn_job(job)
# _run_cleanup()
#
# Schedules the cache cleanup job if the passed size
# exceeds the cache quota.
# _sched()
#
# Args:
# cache_size (int): The calculated cache size (ignored)
# Run any jobs which are ready to run, or quit the main loop
# when nothing is running or is ready to run.
#
# NOTE: This runs in response to completion of the cache size
# calculation job lauched by Scheduler.check_cache_size(),
# which will report the calculated cache size.
# This is the main driving function of the scheduler, it is called
# initially when we enter Scheduler.run(), and at the end of whenever
# any job completes, after any bussiness logic has occurred and before
# going back to sleep.
#
def _run_cleanup(self, cache_size):
context = self.context
artifacts = context.artifactcache
def _sched(self):
if not artifacts.has_quota_exceeded():
return
if not self.terminated:
#
# Try the cache management jobs
#
self._sched_cleanup_job()
self._sched_cache_size_job()
#
# Run as many jobs as the queues can handle for the
# available resources
#
self._sched_queue_jobs()
job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE])
self._schedule_jobs([job])
#
# If nothing is ticking then bail out
#
if not self._active_jobs:
self.loop.stop()
# _suspend_jobs()
#
......@@ -406,7 +443,7 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
for job in self.active_jobs:
for job in self._active_jobs:
job.suspend()
# _resume_jobs()
......@@ -415,7 +452,7 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
for job in self.active_jobs:
for job in self._active_jobs:
job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
......@@ -488,19 +525,16 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
for job in self.active_jobs:
for job in self._active_jobs:
job.terminate()
# Now wait for them to really terminate
for job in self.active_jobs:
for job in self._active_jobs:
elapsed = datetime.datetime.now() - wait_start
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
if not job.terminate_wait(timeout):
job.kill()
# Clear out the waiting jobs
self.waiting_jobs = []
# Regular timeout for driving status in the UI
def _tick(self):
elapsed = self.elapsed_time()
......
......@@ -86,10 +86,11 @@ class SandboxCommandError(SandboxError):
Args:
message (str): The error message to report to the user
detail (str): The detailed error string
collect (str): An optional directory containing partial install contents
"""
def __init__(self, message, *, collect=None):
super().__init__(message, reason='command-failed')
def __init__(self, message, *, detail=None, collect=None):
super().__init__(message, detail=detail, reason='command-failed')
self.collect = collect
......@@ -599,8 +600,8 @@ class _SandboxBatch():
if exitcode != 0:
cmdline = ' '.join(shlex.quote(cmd) for cmd in command.command)
label = command.label or cmdline
raise SandboxCommandError("Command '{}' failed with exitcode {}".format(label, exitcode),
collect=self.collect)
raise SandboxCommandError("Command failed with exitcode {}".format(exitcode),
detail=label, collect=self.collect)
def execute_call(self, call):
call.callback()
......
......@@ -12,7 +12,21 @@ DATA_DIR = os.path.join(
)
def create_element(repo, name, path, dependencies, ref=None):
# create_element()
#
# Args:
# project (str): The project directory where testing is happening
# name (str): The element name to create
# dependencies (list): The list of dependencies to dump into YAML format
#
# Returns:
# (Repo): The corresponding git repository created for the element
def create_element(project, name, dependencies):
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
repo = create_repo('git', project, "{}-repo".format(name))
ref = repo.create(dev_files_path)
element = {
'kind': 'import',
'sources': [
......@@ -20,7 +34,9 @@ def create_element(repo, name, path, dependencies, ref=None):
],
'depends': dependencies
}
_yaml.dump(element, os.path.join(path, name))
_yaml.dump(element, os.path.join(element_path, name))
return repo
# This tests a variety of scenarios and checks that the order in
......@@ -59,18 +75,6 @@ def create_element(repo, name, path, dependencies, ref=None):
@pytest.mark.parametrize("operation", [('show'), ('fetch'), ('build')])
def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
# FIXME: Remove this when the test passes reliably.
#
# There is no reason why the order should not
# be preserved when the builders is set to 1,
# the scheduler queue processing still seems to
# be losing the order.
#
if operation == 'build':
pytest.skip("FIXME: This still only sometimes passes")
# Configure to only allow one fetcher at a time, make it easy to
# determine what is being planned in what order.
......@@ -84,11 +88,8 @@ def test_order(cli, datafiles, tmpdir, operation, target, template, expected):
# Build the project from the template, make import elements
# all with the same repo
#
repo = create_repo('git', str(tmpdir))
ref = repo.create(dev_files_path)
for element, dependencies in template.items():
create_element(repo, element, element_path, dependencies, ref=ref)
repo.add_commit()
create_element(project, element, dependencies)
# Run test and collect results
if operation == 'show':
......