Skip to content
Snippets Groups Projects
Commit 80cf5248 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

scheduler.py: Allow registering for build metadata updates

parent 15ae801e
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,8 @@ class Scheduler: ...@@ -34,6 +34,8 @@ class Scheduler:
def __init__(self, action_cache=None, monitor=False): def __init__(self, action_cache=None, monitor=False):
self.__logger = logging.getLogger(__name__) self.__logger = logging.getLogger(__name__)
self.__build_metadata_queues = None
self.__operations_by_stage = None self.__operations_by_stage = None
self.__leases_by_state = None self.__leases_by_state = None
self.__queue_time_average = None self.__queue_time_average = None
...@@ -46,6 +48,8 @@ class Scheduler: ...@@ -46,6 +48,8 @@ class Scheduler:
self._is_instrumented = monitor self._is_instrumented = monitor
if self._is_instrumented: if self._is_instrumented:
self.__build_metadata_queues = []
self.__operations_by_stage = {} self.__operations_by_stage = {}
self.__leases_by_state = {} self.__leases_by_state = {}
self.__queue_time_average = 0, timedelta() self.__queue_time_average = 0, timedelta()
...@@ -53,7 +57,7 @@ class Scheduler: ...@@ -53,7 +57,7 @@ class Scheduler:
self.__operations_by_stage[OperationStage.CACHE_CHECK] = set() self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
self.__operations_by_stage[OperationStage.QUEUED] = set() self.__operations_by_stage[OperationStage.QUEUED] = set()
self.__operations_by_stage[OperationStage.EXECUTING] = set() self.__operations_by_stage[OperationStage.EXECUTING] = set()
self.__operations_by_stage[OperationStage.COPLETED] = set() self.__operations_by_stage[OperationStage.COMPLETED] = set()
self.__leases_by_state[LeaseState.PENDING] = set() self.__leases_by_state[LeaseState.PENDING] = set()
self.__leases_by_state[LeaseState.ACTIVE] = set() self.__leases_by_state[LeaseState.ACTIVE] = set()
...@@ -200,6 +204,10 @@ class Scheduler: ...@@ -200,6 +204,10 @@ class Scheduler:
def is_instrumented(self): def is_instrumented(self):
return self._is_instrumented return self._is_instrumented
def register_build_metadata_watcher(self, message_queue):
if self.__build_metadata_queues is not None:
self.__build_metadata_queues.append(message_queue)
def query_n_jobs(self): def query_n_jobs(self):
return len(self.jobs) return len(self.jobs)
...@@ -268,3 +276,7 @@ class Scheduler: ...@@ -268,3 +276,7 @@ class Scheduler:
average_time = average_time + ((queue_time - average_time) / average_order) average_time = average_time + ((queue_time - average_time) / average_order)
self.__queue_time_average = average_order, average_time self.__queue_time_average = average_order, average_time
if not job.holds_cached_action_result:
for message_queue in self.__build_metadata_queues:
message_queue.put(job.action_result.execution_metadata)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment