Skip to content
Snippets Groups Projects
Commit d98f53e9 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

scheduler.py: Respect priority when scheduling jobs

#75
parent c674da79
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@ Scheduler
Schedules jobs.
"""
from collections import deque
import bisect
from datetime import timedelta
import logging
......@@ -43,7 +43,7 @@ class Scheduler:
self._action_cache = action_cache
self.jobs = {}
self.queue = deque()
self.__queue = []
self._is_instrumented = monitor
......@@ -66,26 +66,25 @@ class Scheduler:
self._delete_job(job.name)
def queue_job(self, job, skip_cache_lookup=False):
self.jobs[job.name] = job
operation_stage = None
if self._action_cache is not None and not skip_cache_lookup:
try:
action_result = self._action_cache.get_action_result(job.action_digest)
except NotFoundError:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
self._queue_job(job.name)
else:
job.set_cached_result(action_result)
operation_stage = OperationStage.COMPLETED
job.set_cached_result(action_result)
if self._is_instrumented:
self.__retries_count += 1
else:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
self._queue_job(job.name)
self._update_job_operation_stage(job.name, operation_stage)
......@@ -100,8 +99,9 @@ class Scheduler:
else:
operation_stage = OperationStage.QUEUED
self._queue_job(job.name)
job.update_lease_state(LeaseState.PENDING)
self.queue.append(job)
self._update_job_operation_stage(job_name, operation_stage)
......@@ -116,10 +116,10 @@ class Scheduler:
worker properties, configuration and state at the time of the
request.
"""
if not self.queue:
if not self.__queue:
return []
job = self.queue.popleft()
job = self.__queue.pop()
lease = job.lease
......@@ -295,8 +295,23 @@ class Scheduler:
# --- Private API ---
def _queue_job(self, job_name):
"""Schedules or reschedules a job."""
job = self.jobs[job_name]
if job.operation_stage == OperationStage.QUEUED:
self.__queue.sort()
else:
bisect.insort(self.__queue, job)
def _delete_job(self, job_name):
"""Drops an entry from the internal list of jobs."""
job = self.jobs[job_name]
if job.operation_stage == OperationStage.QUEUED:
self.__queue.remove(job)
del self.jobs[job_name]
if self._is_instrumented:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment