Skip to content
Snippets Groups Projects
Commit 22acffd7 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

server/scheduler.py: Calculate job counts

parent 62c87bc5
No related branches found
No related tags found
Loading
......@@ -20,24 +20,37 @@ Schedules jobs.
"""
from collections import deque
from datetime import timedelta
import logging
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError
from .job import OperationStage, LeaseState
class Scheduler:
MAX_N_TRIES = 5
def __init__(self, action_cache=None, monitor=True):
    """Set up an empty scheduler.

    Args:
        action_cache: Optional cache object; stored as-is and consulted
            elsewhere in the class only when it is not ``None``.
        monitor (bool): When true, the per-priority queue-time table and
            the per-error retry table are created as empty dicts; when
            false they stay ``None``.
    """
    self.__logger = logging.getLogger(__name__)

    # Monitoring state. The two tables only exist when monitoring is on.
    self.__queue_times_by_priority = {} if monitor else None
    # (number of samples, running mean) of the job queue time.
    self.__queue_time_average = 0, timedelta()
    self.__retries_by_error = {} if monitor else None
    self.__retries_count = 0

    self._action_cache = action_cache
    self.jobs = {}
    self.queue = deque()

    self._is_monitored = monitor
# --- Public API ---
def register_client(self, job_name, queue):
    """Attach ``queue`` as a client of the job stored under ``job_name``.

    Raises:
        KeyError: If no job is stored under ``job_name``.
    """
    job = self.jobs[job_name]
    job.register_client(queue)
......@@ -66,19 +79,23 @@ class Scheduler:
operation_stage = OperationStage.QUEUED
self.queue.append(job)
job.update_operation_stage(operation_stage)
self._update_job_operation_stage(job, operation_stage)
def retry_job(self, job_name):
    """Re-queue a job, or mark it completed once it ran out of attempts.

    Unknown job names are ignored. A job whose ``n_tries`` has reached
    ``MAX_N_TRIES`` is moved to the COMPLETED stage; any other job is put
    back at the front of the queue in the QUEUED stage.

    The stage change goes through ``_update_job_operation_stage`` exactly
    once, so the job is not updated twice.
    """
    if job_name not in self.jobs:
        return

    job = self.jobs[job_name]

    if job.n_tries >= self.MAX_N_TRIES:
        # TODO: Decide what to do with these jobs
        operation_stage = OperationStage.COMPLETED
        # TODO: Mark these jobs as done
    else:
        operation_stage = OperationStage.QUEUED
        # Retried jobs go to the head of the queue.
        self.queue.appendleft(job)

    self._update_job_operation_stage(job, operation_stage)
def list_jobs(self):
    """Return the values view of the ``jobs`` mapping."""
    tracked = self.jobs
    return tracked.values()
......@@ -112,13 +129,14 @@ class Scheduler:
"""
job = self.jobs[job_name]
operation_stage = None
if lease_state == LeaseState.PENDING:
job.update_lease_state(LeaseState.PENDING)
job.update_operation_stage(OperationStage.QUEUED)
operation_stage = OperationStage.QUEUED
elif lease_state == LeaseState.ACTIVE:
job.update_lease_state(LeaseState.ACTIVE)
job.update_operation_stage(OperationStage.EXECUTING)
operation_stage = OperationStage.EXECUTING
elif lease_state == LeaseState.COMPLETED:
job.update_lease_state(LeaseState.COMPLETED,
......@@ -127,7 +145,9 @@ class Scheduler:
if self._action_cache is not None and not job.do_not_cache:
self._action_cache.update_action_result(job.action_digest, job.action_result)
job.update_operation_stage(OperationStage.COMPLETED)
operation_stage = OperationStage.COMPLETED
self._update_job_operation_stage(job, operation_stage)
def get_job_lease(self, job_name):
"""Returns the lease associated to job, if any have been emitted yet."""
......@@ -136,3 +156,60 @@ class Scheduler:
def get_job_operation(self, job_name):
    """Return the ``operation`` attribute of the job named ``job_name``.

    Raises:
        KeyError: If no job is stored under ``job_name``.
    """
    job = self.jobs[job_name]
    return job.operation
# --- Public API: Monitoring ---
@property
def is_monitored(self):
    """Whether monitoring was enabled when the scheduler was created."""
    return self._is_monitored
def query_n_jobs(self):
    """Return how many entries the ``jobs`` mapping currently holds."""
    job_count = len(self.jobs)
    return job_count
def query_n_operations(self):
    """Return the number of operations, reported as the number of jobs.

    NOTE(review): this is simply ``len(self.jobs)``; presumably one
    operation per job is assumed -- confirm.
    """
    job_count = len(self.jobs)
    return job_count
def query_n_operations_by_stage(self):
    """Return the total number of jobs.

    NOTE(review): despite the name, no per-stage breakdown is computed and
    no stage is accepted; the value is ``len(self.jobs)``. Looks like a
    placeholder -- confirm the intended contract.
    """
    job_count = len(self.jobs)
    return job_count
def query_n_leases(self):
    """Return the number of leases, reported as the number of jobs.

    NOTE(review): this is simply ``len(self.jobs)``; jobs without an
    emitted lease would be counted too -- confirm this is intended.
    """
    job_count = len(self.jobs)
    return job_count
def query_n_leases_by_state(self):
    """Return the total number of jobs.

    NOTE(review): despite the name, no per-state breakdown is computed and
    no state is accepted; the value is ``len(self.jobs)``. Looks like a
    placeholder -- confirm the intended contract.
    """
    job_count = len(self.jobs)
    return job_count
def query_n_retries(self):
    """Return the scheduler's retry counter."""
    retries = self.__retries_count
    return retries
def query_n_retries_for_error(self, error_type):
    """Return the retry count recorded for ``error_type``.

    Args:
        error_type: Key into the per-error retry table.

    Returns:
        The stored count, or ``0`` when nothing is recorded for
        ``error_type`` or when the table does not exist.
    """
    retries_by_error = self.__retries_by_error
    if retries_by_error is None:
        # The table is left as None when monitoring is disabled;
        # subscripting it would raise TypeError, not KeyError.
        return 0
    return retries_by_error.get(error_type, 0)
def query_am_queue_time(self):
    """Return the running mean of the job queue time.

    The state is kept as a ``(sample count, mean)`` pair; only the mean is
    returned.
    """
    average = self.__queue_time_average
    return average[1]
def query_am_queue_time_for_priority(self, priority_level):
    """Return the queue time recorded for ``priority_level``.

    Args:
        priority_level: Key into the per-priority queue-time table.

    Returns:
        The stored value (presumably a ``timedelta`` average -- the code
        filling the table is not visible here), or an empty ``timedelta``
        when nothing is recorded for ``priority_level`` or when the table
        does not exist. The fallback is a ``timedelta`` rather than ``0``
        so the return type matches ``query_am_queue_time``.
    """
    queue_times = self.__queue_times_by_priority
    if queue_times is None:
        # The table is left as None when monitoring is disabled;
        # subscripting it would raise TypeError, not KeyError.
        return timedelta()
    try:
        return queue_times[priority_level]
    except KeyError:
        return timedelta()
# --- Private API ---
def _update_job_operation_stage(self, job, stage):
    """Apply ``stage`` to ``job`` and keep the queue-time average current.

    The job is always updated. When monitoring is on and the new stage is
    COMPLETED, the job's queue time is folded into the running mean, which
    is stored as a ``(sample count, mean)`` pair.
    """
    job.update_operation_stage(stage)

    if not self._is_monitored or stage != OperationStage.COMPLETED:
        return

    sample_count, mean_time = self.__queue_time_average
    sample_count += 1

    sample = job.query_queue_time()
    if sample_count > 1:
        # Incremental mean: shift the old mean by the new sample's share.
        mean_time = mean_time + ((sample - mean_time) / sample_count)
    else:
        # The first sample is the mean.
        mean_time = sample

    self.__queue_time_average = sample_count, mean_time
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment