Skip to content
Snippets Groups Projects
Commit a43c5625 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

Expose scheduler from controller's service instances

parent 49782b64
No related branches found
No related tags found
No related merge requests found
...@@ -37,6 +37,10 @@ class BotsInterface: ...@@ -37,6 +37,10 @@ class BotsInterface:
self._bot_sessions = {} self._bot_sessions = {}
self._scheduler = scheduler self._scheduler = scheduler
@property
def scheduler(self):
return self._scheduler
def register_instance_with_server(self, instance_name, server): def register_instance_with_server(self, instance_name, server):
server.add_bots_interface(self, instance_name) server.add_bots_interface(self, instance_name)
......
...@@ -59,11 +59,33 @@ class BotsService(bots_pb2_grpc.BotsServicer): ...@@ -59,11 +59,33 @@ class BotsService(bots_pb2_grpc.BotsServicer):
# --- Public API --- # --- Public API ---
def add_instance(self, instance_name, instance): def add_instance(self, instance_name, instance):
"""Registers a new servicer instance.
Args:
instance_name (str): The new instance's name.
instance (BotsInterface): The new instance itself.
"""
self._instances[instance_name] = instance self._instances[instance_name] = instance
if self._is_instrumented: if self._is_instrumented:
self.__bots_by_instance[instance_name] = 0 self.__bots_by_instance[instance_name] = 0
def get_scheduler(self, instance_name):
"""Retrieves a reference to the scheduler for an instance.
Args:
instance_name (str): The name of the instance to query.
Returns:
Scheduler: A reference to the scheduler for `instance_name`.
Raises:
InvalidArgumentError: If no instance named `instance_name` exists.
"""
instance = self._get_instance(instance_name)
return instance.scheduler
# --- Public API: Servicer --- # --- Public API: Servicer ---
def CreateBotSession(self, request, context): def CreateBotSession(self, request, context):
......
...@@ -35,6 +35,10 @@ class ExecutionInstance: ...@@ -35,6 +35,10 @@ class ExecutionInstance:
self._storage = storage self._storage = storage
self._scheduler = scheduler self._scheduler = scheduler
@property
def scheduler(self):
return self._scheduler
def register_instance_with_server(self, instance_name, server): def register_instance_with_server(self, instance_name, server):
server.add_execution_instance(self, instance_name) server.add_execution_instance(self, instance_name)
......
...@@ -52,11 +52,33 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): ...@@ -52,11 +52,33 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
# --- Public API --- # --- Public API ---
def add_instance(self, instance_name, instance): def add_instance(self, instance_name, instance):
"""Registers a new servicer instance.
Args:
instance_name (str): The new instance's name.
instance (ExecutionInstance): The new instance itself.
"""
self._instances[instance_name] = instance self._instances[instance_name] = instance
if self._is_instrumented: if self._is_instrumented:
self.__peers_by_instance[instance_name] = set() self.__peers_by_instance[instance_name] = set()
def get_scheduler(self, instance_name):
"""Retrieves a reference to the scheduler for an instance.
Args:
instance_name (str): The name of the instance to query.
Returns:
Scheduler: A reference to the scheduler for `instance_name`.
Raises:
InvalidArgumentError: If no instance named `instance_name` exists.
"""
instance = self._get_instance(instance_name)
return instance.scheduler
# --- Public API: Servicer --- # --- Public API: Servicer ---
def Execute(self, request, context): def Execute(self, request, context):
......
...@@ -32,6 +32,10 @@ class OperationsInstance: ...@@ -32,6 +32,10 @@ class OperationsInstance:
self._scheduler = scheduler self._scheduler = scheduler
@property
def scheduler(self):
return self._scheduler
def register_instance_with_server(self, instance_name, server): def register_instance_with_server(self, instance_name, server):
server.add_operations_instance(self, instance_name) server.add_operations_instance(self, instance_name)
......
...@@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): ...@@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
operations_pb2_grpc.add_OperationsServicer_to_server(self, server) operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
def add_instance(self, name, instance): # --- Public API ---
self._instances[name] = instance
def add_instance(self, instance_name, instance):
"""Registers a new servicer instance.
Args:
instance_name (str): The new instance's name.
instance (OperationsInstance): The new instance itself.
"""
self._instances[instance_name] = instance
# --- Public API: Servicer ---
def GetOperation(self, request, context): def GetOperation(self, request, context):
self.__logger.debug("GetOperation request from [%s]", context.peer()) self.__logger.debug("GetOperation request from [%s]", context.peer())
...@@ -132,6 +142,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): ...@@ -132,6 +142,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
return Empty() return Empty()
# --- Private API ---
def _parse_instance_name(self, name): def _parse_instance_name(self, name):
""" If the instance name is not blank, 'name' will have the form """ If the instance name is not blank, 'name' will have the form
{instance_name}/{operation_uuid}. Otherwise, it will just be {instance_name}/{operation_uuid}. Otherwise, it will just be
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment