Skip to content
Snippets Groups Projects
Commit 4c9fdcd5 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

scheduler.py: Clean-up API and improve docstrings

parent 6964da31
No related branches found
No related tags found
No related merge requests found
......@@ -123,7 +123,7 @@ class BotsInterface:
# Job does not exist, remove from bot.
return None
self._scheduler.update_job_lease(lease.id, lease)
self._scheduler.update_job_lease_state(lease.id, lease)
if lease_state == LeaseState.COMPLETED:
return None
......@@ -161,7 +161,7 @@ class BotsInterface:
self.__logger.error("Assigned lease id=[%s],"
" not found on bot with name=[%s] and id=[%s]."
" Retrying job", lease_id, bot_session.name, bot_session.bot_id)
self._scheduler.retry_job(lease_id)
self._scheduler.retry_job_lease(lease_id)
def _close_bot_session(self, name):
""" Before removing the session, close any leases and
......@@ -174,7 +174,7 @@ class BotsInterface:
self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
for lease_id in self._assigned_leases[name]:
self._scheduler.retry_job(lease_id)
self._scheduler.retry_job_lease(lease_id)
self._assigned_leases.pop(name)
self.__logger.debug("Closing bot session: [%s]", name)
......
......@@ -54,12 +54,12 @@ class ExecutionInstance:
if not action:
raise FailedPreconditionError("Could not get action from storage.")
return self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
return self._scheduler.queue_job_operation(action, action_digest, skip_cache_lookup)
def register_operation_client(self, operation_name, peer, message_queue):
try:
return self._scheduler.register_operation_client(operation_name,
peer, message_queue)
return self._scheduler.register_job_operation_client(operation_name,
peer, message_queue)
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]"
......@@ -67,7 +67,7 @@ class ExecutionInstance:
def unregister_operation_client(self, operation_name, peer):
try:
self._scheduler.unregister_operation_client(operation_name, peer)
self._scheduler.unregister_job_operation_client(operation_name, peer)
except NotFoundError:
raise InvalidArgumentError("Operation name does not exist: [{}]"
......
......@@ -54,7 +54,7 @@ class OperationsInstance:
response = operations_pb2.ListOperationsResponse()
operations = []
for job_name in self._scheduler.list_jobs():
for job_name in self._scheduler.list_current_jobs():
operation = self._scheduler.get_job_operation(job_name)
operations.append(operation)
......
......@@ -56,7 +56,13 @@ class Scheduler:
# --- Public API ---
def register_operation_client(self, operation_name, peer, message_queue):
def list_current_jobs(self):
"""Returns the names of the :class:`Job` objects currently managed."""
return self.__jobs_by_name.keys()
# --- Public API: REAPI ---
def register_job_operation_client(self, operation_name, peer, message_queue):
"""Subscribes to one of the job's :class:`Operation` stage changes.
Args:
......@@ -79,7 +85,7 @@ class Scheduler:
return job.register_operation_client(peer, message_queue)
def unregister_operation_client(self, operation_name, peer):
def unregister_job_operation_client(self, operation_name, peer):
"""Unsubscribes from one of the job's :class:`Operation` stage changes.
Args:
......@@ -101,7 +107,7 @@ class Scheduler:
if job.n_clients == 0 and job.done:
self._delete_job(job.name)
def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
"""Inserts a newly created job into the execution queue.
Args:
......@@ -168,28 +174,62 @@ class Scheduler:
return job.name
def retry_job(self, job_name):
def get_job_operation(self, operation_name):
"""Retrieves a job's :class:`Operation` by name.
Args:
operation_name (str): name of the operation to query.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_name[job_name]
job = self.__jobs_by_name[operation_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
operation_stage = None
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
operation_stage = OperationStage.COMPLETED
# TODO: Mark these jobs as done
return job.get_operation()
else:
operation_stage = OperationStage.QUEUED
job.update_lease_state(LeaseState.PENDING)
self.__queue.append(job)
def cancel_job_operation(self, operation_name):
"""Cancels a job's :class:`Operation` by name.
self._update_job_operation_stage(job.name, operation_stage)
Args:
operation_name (str): name of the operation to cancel.
def list_jobs(self):
return self.__jobs_by_name.keys()
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_name[operation_name]
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
job.cancel_operation()
def delete_job_operation(self, operation_name):
"""Removes a job.
Args:
operation_name (str): name of the operation to delete.
Raises:
NotFoundError: If no operation with `operation_name` exists.
"""
try:
job = self.__jobs_by_name[operation_name]
except KeyError:
raise NotFoundError("Operation name does not exist: [{}]"
.format(operation_name))
if job.n_clients == 0 and (job.done or job.lease is None):
self._delete_job(job.name)
# --- Public API: RWAPI ---
def request_job_leases(self, worker_capabilities):
"""Generates a list of the highest priority leases to be run.
......@@ -215,9 +255,12 @@ class Scheduler:
return None
def update_job_lease(self, job_name, lease):
def update_job_lease_state(self, job_name, lease):
"""Requests a state transition for a job's current :class:`Lease`.
Note:
This may trigger a job's :class:`Operation` stage transition.
Args:
job_name (str): name of the job to query.
lease (Lease): the lease holding the new state.
......@@ -268,8 +311,11 @@ class Scheduler:
self._update_job_operation_stage(job_name, operation_stage)
def get_job_lease(self, job_name):
"""Returns the lease associated to job, if any have been emitted yet.
def retry_job_lease(self, job_name):
"""Re-queues a job on lease execution failure.
Note:
This may trigger a job's :class:`Operation` stage transition.
Args:
job_name (str): name of the job to query.
......@@ -283,10 +329,21 @@ class Scheduler:
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
return job.lease
operation_stage = None
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
operation_stage = OperationStage.COMPLETED
# TODO: Mark these jobs as done
def get_job_lease_cancelled(self, job_name):
"""Returns true if the lease is cancelled.
else:
operation_stage = OperationStage.QUEUED
job.update_lease_state(LeaseState.PENDING)
self.__queue.append(job)
self._update_job_operation_stage(job_name, operation_stage)
def get_job_lease(self, job_name):
"""Returns the lease associated to job, if any have been emitted yet.
Args:
job_name (str): name of the job to query.
......@@ -300,10 +357,10 @@ class Scheduler:
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
return job.lease_cancelled
return job.lease
def get_job_operation(self, job_name):
"""Returns the operation associated to job.
def get_job_lease_cancelled(self, job_name):
"""Returns true if the lease is cancelled.
Args:
job_name (str): name of the job to query.
......@@ -317,38 +374,7 @@ class Scheduler:
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
return job.get_operation()
def cancel_job_operation(self, job_name):
""""Cancels the underlying operation of a given job.
This will also cancel any job's lease that may have been issued.
Args:
job_name (str): name of the job holding the operation to cancel.
"""
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
job.cancel_operation()
def delete_job_operation(self, job_name):
""""Removes a job.
Args:
job_name (str): name of the job to delete.
"""
try:
job = self.__jobs_by_name[job_name]
except KeyError:
raise NotFoundError("Job name does not exist: [{}]".format(job_name))
if job.n_clients == 0 and (job.done or job.lease is None):
self._delete_job(job.name)
return job.lease_cancelled
# --- Public API: Monitoring ---
......
......@@ -162,4 +162,4 @@ def _inject_work(scheduler, action=None, action_digest=None):
if not action_digest:
action_digest = remote_execution_pb2.Digest()
scheduler.queue_job(action, action_digest, skip_cache_lookup=True)
scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)
......@@ -106,9 +106,9 @@ def test_no_action_digest_in_storage(instance, context):
def test_wait_execution(instance, controller, context):
job_name = controller.execution_instance._scheduler.queue_job(action,
action_digest,
skip_cache_lookup=True)
job_name = controller.execution_instance._scheduler.queue_job_operation(action,
action_digest,
skip_cache_lookup=True)
controller.execution_instance._scheduler._update_job_operation_stage(job_name,
OperationStage.COMPLETED)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment