Commit 6991d6d6 authored by Tristan Van Berkom

_scheduler: Refactor of queues and resources.

This branch makes the following changes:

  * jobs/job.py: No longer stores the resources a job is interested in

    Jobs are ephemeral again; they only exist while they
    are running.

  * queues/queue.py: Revert to only handling lists of elements

    Elements pass through the queues; Queue.harvest_jobs()
    replaces Queue.pop_ready_jobs(), and the Queue now stops
    creating jobs as soon as there are not enough resources
    for the next job.

    Also removed the unused `prepare()` abstract method.

  * queues/buildqueue.py: Adapt the part where we launch a job

    This part needs to be reworked anyway; for now, just touch
    it up so that it doesn't break with the surrounding changes.

  * jobs/{cachesize,cleanup}job.py: Expose a uniform complete callback

    This allows the scheduler to handle resource deallocation for
    these two job completions itself, at the same phase where the
    Queues take care of their own resource deallocation.

  * resources.py: No longer has knowledge of the job

    Since jobs are ephemeral, they are not a suitable place to
    store the resource identifiers; these must now be provided
    by the callers wherever needed.

    The main Resources object is now owned by the Scheduler but
    shared with the Queues; each takes care of managing the
    resources of the jobs it creates through the same resource
    API.

  * scheduler.py: Reverted to only creating jobs on demand

    This changes the flow of the scheduler such that whenever
    jobs complete, the queues are interrogated for only as many
    jobs as can run at the moment and no more; this completely
    removes the waiting list.

    For the internal cache management jobs, we handle this with
    a little state instead of a waiting list, and only launch
    them when the resources permit it.

By abolishing the scheduler waiting list and creating jobs
on demand, we fix the order of element processing and consequently
fix issue #712.
parent e385660c
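
The on-demand flow described in the message above can be pictured with a short
sketch. This is not the collapsed scheduler.py diff: the method names
(_sched_queue_jobs(), _spawn_job()) and the queues attribute are assumptions,
used only to illustrate how the queues are interrogated for exactly as many
jobs as current resources allow.

    # Illustrative only: whenever a job completes, ask the queues for as
    # many new jobs as can run right now; no waiting list is kept.
    def _sched_queue_jobs(self):
        ready = []
        for queue in reversed(self.queues):
            # harvest_jobs() stops creating jobs as soon as the shared
            # Resources object can no longer reserve resources for the
            # next job in that queue.
            ready.extend(queue.harvest_jobs())

        # Jobs are created on demand and spawned immediately
        for job in ready:
            self._spawn_job(job)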
@@ -34,8 +34,8 @@ class CacheSizeJob(Job):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb(result)
if self._complete_cb:
self._complete_cb(status, result)
def child_process_data(self):
return {}
@@ -20,8 +20,9 @@ from .job import Job, JobStatus
class CleanupJob(Job):
def __init__(self, *args, **kwargs):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
context = self._scheduler.context
self._artifacts = context.artifactcache
@@ -32,3 +33,6 @@ class CleanupJob(Job):
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
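
For context, the uniform (status, result) callback could be consumed on the
scheduler side roughly as below. Only the complete_cb keyword and the callback
signature come from the two hunks above; the job construction, the
_CACHE_RESOURCES constant and the helper names are assumptions, since the
scheduler.py diff is collapsed on this page.

    # Sketch: the scheduler launches the internal cache size job itself and
    # releases its resources in the complete callback, at the same phase
    # where Queues release theirs in Queue._job_done().
    _CACHE_RESOURCES = {ResourceType.CACHE, ResourceType.PROCESS}  # assumed set

    def _check_cache_size(self):
        job = CacheSizeJob(self, 'cache_size', self._cache_size_logfile,
                           complete_cb=self._cache_size_job_complete)
        self._spawn_job(job)

    def _cache_size_job_complete(self, status, result):
        # Same deallocation phase as the Queues use for their own jobs
        self.resources.release(_CACHE_RESOURCES)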
@@ -85,28 +85,11 @@ class Process(multiprocessing.Process):
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
# resources (iter(ResourceType)) - A set of resources this job
# wants to use.
# exclusive_resources (iter(ResourceType)) - A set of resources
# this job wants to use
# exclusively.
# max_retries (int): The maximum number of retries
#
class Job():
def __init__(self, scheduler, action_name, logfile, *,
resources=None, exclusive_resources=None, max_retries=0):
if resources is None:
resources = set()
else:
resources = set(resources)
if exclusive_resources is None:
exclusive_resources = set()
else:
exclusive_resources = set(resources)
assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
@@ -114,12 +97,6 @@ class Job():
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
# The resources this job wants to access
self.resources = resources
# Resources this job needs to access exclusively, i.e., no
# other job should be allowed to access them
self.exclusive_resources = exclusive_resources
#
# Private members
#
@@ -57,11 +57,10 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
resources=self.resources,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(job)
self._done_queue.append(element)
self.failed_elements.append(element)
self._scheduler._job_complete_callback(job, False)
@@ -72,8 +72,9 @@ class Queue():
# Private members
#
self._scheduler = scheduler
self._wait_queue = deque()
self._done_queue = deque()
self._resources = scheduler.resources # Shared resource pool
self._wait_queue = deque() # Ready / Waiting elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
# Assert the subclass has setup class data
@@ -115,16 +116,6 @@ class Queue():
def status(self, element):
return QueueStatus.READY
# prepare()
#
# Abstract method for handling job preparation in the main process.
#
# Args:
# element (Element): The element which is scheduled
#
def prepare(self, element):
pass
# done()
#
# Abstract method for handling a successful job completion.
@@ -153,26 +144,18 @@ class Queue():
if not elts:
return
# Note: The internal lists work with jobs. This is not
# reflected in any external methods (except
# pop/peek_ready_jobs).
def create_job(element):
logfile = self._element_log_path(element)
return ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
resources=self.resources,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
# Place skipped elements directly on the done queue
jobs = [create_job(elt) for elt in elts]
skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
wait = [job for job in jobs if job not in skip]
self.skipped_elements.extend([job.element for job in skip])
self._wait_queue.extend(wait)
self._done_queue.extend(skip)
# Place skipped elements on the done queue right away.
#
# The remaining ready and waiting elements must remain in the
# same queue, and ready status must be determined at the moment
# which the scheduler is asking for the next job.
#
skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
wait = [elt for elt in elts if elt not in skip]
self.skipped_elements.extend(skip) # Public record of skipped elements
self._done_queue.extend(skip) # Elements to be processed
self._wait_queue.extend(wait) # Elements eligible to be dequeued
# dequeue()
#
@@ -184,69 +167,59 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
yield self._done_queue.popleft().element
yield self._done_queue.popleft()
# dequeue_ready()
#
# Reports whether there are any elements to dequeue
# Reports whether any elements can be promoted to other queues
#
# Returns:
# (bool): Whether there are elements to dequeue
# (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
# pop_ready_jobs()
#
# Returns:
# ([Job]): A list of jobs to run
# harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
# into the dequeue pool, and processing them if necessary.
#
# This will have different results for elements depending
# on the Queue.status() implementation.
#
# o Elements which are QueueStatus.WAIT will not be affected
# into the dequeue pool, and creating as many jobs for which resources
# can be reserved.
#
# o Elements which are QueueStatus.SKIP will move directly
# to the dequeue pool
#
# o For Elements which are QueueStatus.READY a Job will be
# created and returned to the caller, given that the scheduler
# allows the Queue enough resources for the given job
# Returns:
# ([Job]): A list of jobs which can be run now
#
def pop_ready_jobs(self):
def harvest_jobs(self):
unready = []
ready = []
while self._wait_queue:
job = self._wait_queue.popleft()
element = job.element
if not self._resources.reserve(self.resources, peek=True):
break
element = self._wait_queue.popleft()
status = self.status(element)
if status == QueueStatus.WAIT:
unready.append(job)
continue
unready.append(element)
elif status == QueueStatus.SKIP:
self._done_queue.append(job)
self._done_queue.append(element)
self.skipped_elements.append(element)
continue
self.prepare(element)
ready.append(job)
else:
reserved = self._resources.reserve(self.resources)
assert reserved
ready.append(element)
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
return ready
def peek_ready_jobs(self):
def ready(job):
return self.status(job.element) == QueueStatus.READY
yield from (job for job in self._wait_queue if ready(job))
return [
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
]
#####################################################
# Private Methods #
@@ -292,6 +265,10 @@ class Queue():
#
def _job_done(self, job, element, status, result):
# Now release the resources we reserved
#
self._resources.release(self.resources)
# Update values that need to be synchronized in the main task
# before calling any queue implementation
self._update_workspaces(element, job)
@@ -324,12 +301,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
#
# No exception occured in post processing
#
# All jobs get placed on the done queue for later processing.
self._done_queue.append(job)
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED:
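
Putting the queue.py changes together, each queue now pairs exactly one
resource reservation per harvested job with one release per completed job.
A condensed, non-authoritative walkthrough of the lifecycle (the driver loop
and Job.spawn() are assumed here for illustration):

    # Elements are enqueued; skipped ones land directly on the done queue
    queue.enqueue(elements)

    # harvest_jobs() peeks at the shared resource pool, checks status(),
    # and reserves queue.resources once for every job it returns
    for job in queue.harvest_jobs():
        job.spawn()                      # jobs exist only while running

    # On completion, Queue._job_done() releases the same resources, and
    # the processed element becomes available to later queues via dequeue()
    if queue.dequeue_ready():
        processed = list(queue.dequeue())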
@@ -34,28 +34,25 @@ class Resources():
ResourceType.UPLOAD: set()
}
def clear_job_resources(self, job):
for resource in job.exclusive_resources:
self._exclusive_resources[resource].remove(hash(job))
# reserve()
#
# Reserves a set of resources
#
# Args:
# resources (set): A set of ResourceTypes
# exclusive (set): Another set of ResourceTypes
# peek (bool): Whether to only peek at whether the resource is available
#
# Returns:
# (bool): True if the resources could be reserved
#
def reserve(self, resources, exclusive=None, *, peek=False):
if exclusive is None:
exclusive = set()
for resource in job.resources:
self._used_resources[resource] -= 1
def reserve_exclusive_resources(self, job):
exclusive = job.exclusive_resources
# The very first thing we do is to register any exclusive
# resources this job may want. Even if the job is not yet
# allowed to run (because another job is holding the resource
# it wants), we can still set this - it just means that any
# job *currently* using these resources has to finish first,
# and no new jobs wanting these can be launched (except other
# exclusive-access jobs).
#
for resource in exclusive:
self._exclusive_resources[resource].add(hash(job))
resources = set(resources)
exclusive = set(exclusive)
def reserve_job_resources(self, job):
# First, we check if the job wants to access a resource that
# another job wants exclusive access to. If so, it cannot be
# scheduled.
@@ -68,7 +65,8 @@ class Resources():
# is currently not possible, but may be worth thinking
# about.
#
for resource in job.resources - job.exclusive_resources:
for resource in resources - exclusive:
# If our job wants this resource exclusively, we never
# check this, so we can get away with not (temporarily)
# removing it from the set.
@@ -84,14 +82,14 @@ class Resources():
# at a time, despite being allowed to be part of the exclusive
# set.
#
for exclusive in job.exclusive_resources:
if self._used_resources[exclusive] != 0:
for resource in exclusive:
if self._used_resources[resource] != 0:
return False
# Finally, we check if we have enough of each resource
# available. If we don't have enough, the job cannot be
# scheduled.
for resource in job.resources:
for resource in resources:
if (self._max_resources[resource] > 0 and
self._used_resources[resource] >= self._max_resources[resource]):
return False
@@ -99,7 +97,70 @@ class Resources():
# Now we register the fact that our job is using the resources
# it asked for, and tell the scheduler that it is allowed to
# continue.
for resource in job.resources:
self._used_resources[resource] += 1
if not peek:
for resource in resources:
self._used_resources[resource] += 1
return True
# release()
#
# Release resources previously reserved with Resources.reserve()
#
# Args:
# resources (set): A set of resources to release
#
def release(self, resources):
for resource in resources:
assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
self._used_resources[resource] -= 1
# register_exclusive_interest()
#
# Inform the resources pool that `source` has an interest in
# reserving this resource exclusively.
#
# The source parameter is used to identify the caller, it
# must be ensured to be unique for the time that the
# interest is registered.
#
# This function may be called multiple times, and subsequent
# calls will simply have no effect until clear_exclusive_interest()
# is used to clear the interest.
#
# This must be called in advance of reserve()
#
# Args:
# resources (set): Set of resources to reserve exclusively
# source (any): Source identifier, to be used again when unregistering
# the interest.
#
def register_exclusive_interest(self, resources, source):
# The very first thing we do is to register any exclusive
# resources this job may want. Even if the job is not yet
# allowed to run (because another job is holding the resource
# it wants), we can still set this - it just means that any
# job *currently* using these resources has to finish first,
# and no new jobs wanting these can be launched (except other
# exclusive-access jobs).
#
for resource in resources:
self._exclusive_resources[resource].add(source)
# unregister_exclusive_interest()
#
# Clear the exclusive interest in these resources.
#
# This should be called by the given source which registered
# an exclusive interest.
#
# Args:
# resources (set): Set of resources to reserve exclusively
# source (str): Source identifier, to be used again when unregistering
# the interest.
#
def unregister_exclusive_interest(self, resources, source):
for resource in resources:
self._exclusive_resources[resource].remove(source)
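
Finally, a minimal sketch of how a caller might drive the new Resources API
for an exclusive job such as cache cleanup. Only register_exclusive_interest(),
reserve(), release() and unregister_exclusive_interest() come from the diff
above; the resource constants, the source token and the surrounding functions
are assumptions.

    # ResourceType is defined in the resources module shown above; the
    # specific members used here are assumed for the sake of the example.
    _CLEANUP_RESOURCES = {ResourceType.CACHE, ResourceType.PROCESS}

    def try_launch_cleanup(resources, launch_cb):
        # Register exclusive interest first: current users of these
        # resources drain out, and no new non-exclusive users are admitted
        resources.register_exclusive_interest(_CLEANUP_RESOURCES, 'cache-cleanup')

        # Launch only if the reservation actually succeeds right now
        if resources.reserve(_CLEANUP_RESOURCES, _CLEANUP_RESOURCES):
            launch_cb()
            return True
        return False

    def cleanup_complete(resources):
        # Mirror both the reservation and the registered interest
        resources.release(_CLEANUP_RESOURCES)
        resources.unregister_exclusive_interest(_CLEANUP_RESOURCES, 'cache-cleanup')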