Skip to content
Snippets Groups Projects
Commit 939d3763 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

scheduler.py: Respect priority when scheduling jobs

#75
parent a05de331
No related branches found
No related tags found
No related merge requests found
......@@ -43,7 +43,7 @@ class Scheduler:
self._action_cache = action_cache
self.jobs = {}
self.queue = deque()
self.__queue = deque()
self._is_instrumented = monitor
......@@ -76,6 +76,18 @@ class Scheduler:
self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
def queue_job(self, job, skip_cache_lookup=False):
def __queue_job(jobs_queue, new_job):
index = 0
for queued_job in reversed(jobs_queue):
if new_job.priority < queued_job.priority:
index += 1
else:
break
index = len(jobs_queue) - index
jobs_queue.insert(index, new_job)
self.jobs[job.name] = job
operation_stage = None
......@@ -84,7 +96,7 @@ class Scheduler:
action_result = self._action_cache.get_action_result(job.action_digest)
except NotFoundError:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
__queue_job(self.__queue, job)
else:
job.set_cached_result(action_result)
......@@ -95,7 +107,7 @@ class Scheduler:
else:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
__queue_job(self.__queue, job)
self._update_job_operation_stage(job.name, operation_stage)
......@@ -111,7 +123,7 @@ class Scheduler:
else:
operation_stage = OperationStage.QUEUED
job.update_lease_state(LeaseState.PENDING)
self.queue.append(job)
self.__queue.append(job)
self._update_job_operation_stage(job_name, operation_stage)
......@@ -126,10 +138,10 @@ class Scheduler:
worker properties, configuration and state at the time of the
request.
"""
if not self.queue:
if not self.__queue:
return []
job = self.queue.popleft()
job = self.__queue.popleft()
lease = job.lease
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment