diff --git a/buildgrid/server/instance.py b/buildgrid/server/instance.py index f7dfcbf191840c72edbb1571782ff4c4b4040a52..dbaa01b4cca8dd5b361e9f2509c0ab21e3e074f6 100644 --- a/buildgrid/server/instance.py +++ b/buildgrid/server/instance.py @@ -27,6 +27,7 @@ from buildgrid.server.execution.service import ExecutionService from buildgrid.server.operations.service import OperationsService from buildgrid.server.bots.service import BotsService from buildgrid.server.referencestorage.service import ReferenceStorageService +from buildgrid.settings import MONITORING_PERIOD class BuildGridServer: @@ -50,7 +51,9 @@ class BuildGridServer: self.__grpc_executor = futures.ThreadPoolExecutor(max_workers) self.__grpc_server = grpc.server(self.__grpc_executor) + self.__main_loop = asyncio.get_event_loop() + self.__monitoring_task = None self._execution_service = None self._bots_service = None @@ -60,10 +63,18 @@ class BuildGridServer: self._cas_service = None self._bytestream_service = None + self.__execution_instances = [] + self.__bots_instances = [] + + # --- Public API --- + def start(self): """Starts the BuildGrid server. """ self.__grpc_server.start() + + self.__monitoring_task = asyncio.ensure_future( + self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop) self.__main_loop.run_forever() def stop(self, grace=0): @@ -72,8 +83,9 @@ class BuildGridServer: Args: grace (int, optional): A duration of time in seconds. Defaults to 0. """ - self.__main_loop.stop() - self.__main_loop.close() + if self.__monitoring_task is not None: + self.__monitoring_task.cancel() + self.__grpc_server.stop(grace) if grace > 0: @@ -108,9 +120,10 @@ class BuildGridServer: """ if self._execution_service is None: self._execution_service = ExecutionService(self.__grpc_server) - self._execution_service.add_instance(instance_name, instance) + self.__execution_instances.append(instance_name) + def add_bots_interface(self, instance, instance_name): """Adds a :obj:`BotsInterface` to the service. @@ -122,9 +135,10 @@ class BuildGridServer: """ if self._bots_service is None: self._bots_service = BotsService(self.__grpc_server) - self._bots_service.add_instance(instance_name, instance) + self.__bots_instances.append(instance_name) + def add_operations_instance(self, instance, instance_name): """Adds an :obj:`OperationsInstance` to the service. @@ -136,7 +150,6 @@ class BuildGridServer: """ if self._operations_service is None: self._operations_service = OperationsService(self.__grpc_server) - self._operations_service.add_instance(instance_name, instance) def add_reference_storage_instance(self, instance, instance_name): @@ -150,7 +163,6 @@ class BuildGridServer: """ if self._reference_storage_service is None: self._reference_storage_service = ReferenceStorageService(self.__grpc_server) - self._reference_storage_service.add_instance(instance_name, instance) def add_action_cache_instance(self, instance, instance_name): @@ -164,7 +176,6 @@ class BuildGridServer: """ if self._action_cache_service is None: self._action_cache_service = ActionCacheService(self.__grpc_server) - self._action_cache_service.add_instance(instance_name, instance) def add_cas_instance(self, instance, instance_name): @@ -178,7 +189,6 @@ class BuildGridServer: """ if self._cas_service is None: self._cas_service = ContentAddressableStorageService(self.__grpc_server) - self._cas_service.add_instance(instance_name, instance) def add_bytestream_instance(self, instance, instance_name): @@ -192,5 +202,22 @@ class BuildGridServer: """ if self._bytestream_service is None: self._bytestream_service = ByteStreamService(self.__grpc_server) - self._bytestream_service.add_instance(instance_name, instance) + + # --- Private API --- + + async def _monitoring_worker(self, period=1): + while True: + try: + n_clients = self._execution_service.query_n_clients() + n_bots = self._bots_service.query_n_bots() + + print('---') + print('n_clients={}'.format(n_clients)) + print('n_bots={}'.format(n_bots)) + + await asyncio.sleep(period) + except asyncio.CancelledError: + break + + self.__main_loop.stop()