From 027040e089fdebe5263eb62503d589569b04caf1 Mon Sep 17 00:00:00 2001 From: Martin Blanchard <martin.blanchard@codethink.co.uk> Date: Tue, 30 Oct 2018 09:47:53 +0000 Subject: [PATCH] job.py: Add an integer priority attribute --- buildgrid/server/job.py | 15 ++++++++++++++- buildgrid/server/scheduler.py | 24 ++++++++++++++++++------ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/buildgrid/server/job.py b/buildgrid/server/job.py index 7905f0163..79426ef99 100644 --- a/buildgrid/server/job.py +++ b/buildgrid/server/job.py @@ -28,10 +28,11 @@ from buildgrid._protos.google.rpc import code_pb2 class Job: - def __init__(self, action, action_digest): + def __init__(self, action, action_digest, priority=0): self.logger = logging.getLogger(__name__) self._name = str(uuid.uuid4()) + self._priority = priority self._action = remote_execution_pb2.Action() self._operation = operations_pb2.Operation() self._lease = None @@ -56,10 +57,22 @@ class Job: self._operation.done = False self._n_tries = 0 + def __eq__(self, other): + if isinstance(other, Job): + return self.name == other.name + return False + + def __ne__(self, other): + return not self.__eq__(other) + @property def name(self): return self._name + @property + def priority(self): + return self._priority + @property def do_not_cache(self): return self._do_not_cache diff --git a/buildgrid/server/scheduler.py b/buildgrid/server/scheduler.py index d479f14af..4f7cced7e 100644 --- a/buildgrid/server/scheduler.py +++ b/buildgrid/server/scheduler.py @@ -33,7 +33,7 @@ class Scheduler: def __init__(self, action_cache=None): self._action_cache = action_cache self.jobs = {} - self.queue = deque() + self.__queue = deque() def register_client(self, job_name, queue): self.jobs[job_name].register_client(queue) @@ -45,6 +45,18 @@ class Scheduler: del self.jobs[job_name] def queue_job(self, job, skip_cache_lookup=False): + def __queue_job(queue, new_job): + index = 0 + for queued_job in reversed(queue): + if new_job.priority < queued_job.priority: + index += 1 + else: + break + + index = len(queue) - index + + queue.insert(index, new_job) + self.jobs[job.name] = job operation_stage = None @@ -53,7 +65,7 @@ class Scheduler: action_result = self._action_cache.get_action_result(job.action_digest) except NotFoundError: operation_stage = OperationStage.QUEUED - self.queue.append(job) + __queue_job(self.__queue, job) else: job.set_cached_result(action_result) @@ -61,7 +73,7 @@ class Scheduler: else: operation_stage = OperationStage.QUEUED - self.queue.append(job) + __queue_job(self.__queue, job) job.update_operation_stage(operation_stage) @@ -74,7 +86,7 @@ class Scheduler: # TODO: Mark these jobs as done else: job.update_operation_stage(OperationStage.QUEUED) - self.queue.appendleft(job) + self.__queue.appendleft(job) def list_jobs(self): return self.jobs.values() @@ -87,10 +99,10 @@ class Scheduler: worker properties, configuration and state at the time of the request. """ - if not self.queue: + if not self.__queue: return [] - job = self.queue.popleft() + job = self.__queue.popleft() # For now, one lease at a time: lease = job.create_lease() -- GitLab