Skip to content
Snippets Groups Projects
Commit ae0a9e47 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

Allow discarding job's leases

parent c0795f6d
No related branches found
No related tags found
No related merge requests found
......@@ -222,6 +222,13 @@ class Job:
if self._lease is not None:
self.update_lease_state(LeaseState.CANCELLED)
def delete_lease(self):
"""Discard the job's :class:Lease."""
self.__worker_start_timestamp.Clear()
self.__worker_completed_timestamp.Clear()
self._lease = None
def update_operation_stage(self, stage):
"""Operates a stage transition for the job's :class:Operation.
......
......@@ -63,17 +63,7 @@ class Scheduler:
job.unregister_client(queue)
if not job.n_clients and job.operation.done and not job.lease:
del self.jobs[job_name]
if self._is_instrumented:
self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
self.__leases_by_state[LeaseState.PENDING].discard(job_name)
self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
self._delete_job(job.name)
def queue_job(self, job, skip_cache_lookup=False):
self.jobs[job.name] = job
......@@ -199,6 +189,15 @@ class Scheduler:
"""Returns true if the lease is cancelled"""
return self.jobs[job_name].lease_cancelled
def delete_job_lease(self, job_name):
"""Discards the lease associated to a job."""
job = self.jobs[job_name]
self.jobs[job.name].delete_lease()
if not job.n_clients and job.operation.done:
self._delete_job(job.name)
def get_job_operation(self, job_name):
"""Returns the operation associated to job."""
return self.jobs[job_name].operation
......@@ -296,6 +295,20 @@ class Scheduler:
# --- Private API ---
def _delete_job(self, job_name):
"""Drops an entry from the internal list of jobs."""
del self.jobs[job_name]
if self._is_instrumented:
self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
self.__leases_by_state[LeaseState.PENDING].discard(job_name)
self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
def _update_job_operation_stage(self, job_name, operation_stage):
"""Requests a stage transition for the job's :class:Operations.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment