Skip to content
Snippets Groups Projects
Commit 027040e0 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

job.py: Add an integer priority attribute

parent d4735838
No related branches found
No related tags found
No related merge requests found
...@@ -28,10 +28,11 @@ from buildgrid._protos.google.rpc import code_pb2 ...@@ -28,10 +28,11 @@ from buildgrid._protos.google.rpc import code_pb2
class Job: class Job:
def __init__(self, action, action_digest): def __init__(self, action, action_digest, priority=0):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self._name = str(uuid.uuid4()) self._name = str(uuid.uuid4())
self._priority = priority
self._action = remote_execution_pb2.Action() self._action = remote_execution_pb2.Action()
self._operation = operations_pb2.Operation() self._operation = operations_pb2.Operation()
self._lease = None self._lease = None
...@@ -56,10 +57,22 @@ class Job: ...@@ -56,10 +57,22 @@ class Job:
self._operation.done = False self._operation.done = False
self._n_tries = 0 self._n_tries = 0
def __eq__(self, other):
if isinstance(other, Job):
return self.name == other.name
return False
def __ne__(self, other):
return not self.__eq__(other)
@property @property
def name(self): def name(self):
return self._name return self._name
@property
def priority(self):
return self._priority
@property @property
def do_not_cache(self): def do_not_cache(self):
return self._do_not_cache return self._do_not_cache
......
...@@ -33,7 +33,7 @@ class Scheduler: ...@@ -33,7 +33,7 @@ class Scheduler:
def __init__(self, action_cache=None): def __init__(self, action_cache=None):
self._action_cache = action_cache self._action_cache = action_cache
self.jobs = {} self.jobs = {}
self.queue = deque() self.__queue = deque()
def register_client(self, job_name, queue): def register_client(self, job_name, queue):
self.jobs[job_name].register_client(queue) self.jobs[job_name].register_client(queue)
...@@ -45,6 +45,18 @@ class Scheduler: ...@@ -45,6 +45,18 @@ class Scheduler:
del self.jobs[job_name] del self.jobs[job_name]
def queue_job(self, job, skip_cache_lookup=False): def queue_job(self, job, skip_cache_lookup=False):
def __queue_job(queue, new_job):
index = 0
for queued_job in reversed(queue):
if new_job.priority < queued_job.priority:
index += 1
else:
break
index = len(queue) - index
queue.insert(index, new_job)
self.jobs[job.name] = job self.jobs[job.name] = job
operation_stage = None operation_stage = None
...@@ -53,7 +65,7 @@ class Scheduler: ...@@ -53,7 +65,7 @@ class Scheduler:
action_result = self._action_cache.get_action_result(job.action_digest) action_result = self._action_cache.get_action_result(job.action_digest)
except NotFoundError: except NotFoundError:
operation_stage = OperationStage.QUEUED operation_stage = OperationStage.QUEUED
self.queue.append(job) __queue_job(self.__queue, job)
else: else:
job.set_cached_result(action_result) job.set_cached_result(action_result)
...@@ -61,7 +73,7 @@ class Scheduler: ...@@ -61,7 +73,7 @@ class Scheduler:
else: else:
operation_stage = OperationStage.QUEUED operation_stage = OperationStage.QUEUED
self.queue.append(job) __queue_job(self.__queue, job)
job.update_operation_stage(operation_stage) job.update_operation_stage(operation_stage)
...@@ -74,7 +86,7 @@ class Scheduler: ...@@ -74,7 +86,7 @@ class Scheduler:
# TODO: Mark these jobs as done # TODO: Mark these jobs as done
else: else:
job.update_operation_stage(OperationStage.QUEUED) job.update_operation_stage(OperationStage.QUEUED)
self.queue.appendleft(job) self.__queue.appendleft(job)
def list_jobs(self): def list_jobs(self):
return self.jobs.values() return self.jobs.values()
...@@ -87,10 +99,10 @@ class Scheduler: ...@@ -87,10 +99,10 @@ class Scheduler:
worker properties, configuration and state at the time of the worker properties, configuration and state at the time of the
request. request.
""" """
if not self.queue: if not self.__queue:
return [] return []
job = self.queue.popleft() job = self.__queue.popleft()
# For now, one lease at a time: # For now, one lease at a time:
lease = job.create_lease() lease = job.create_lease()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment