Skip to content
Snippets Groups Projects
Commit 2809507f authored by finnball's avatar finnball
Browse files

Server class now allows instances to add themselves to services.

If no service exists, creates the object.
parent 8a0bae22
No related branches found
No related tags found
Loading
......@@ -13,85 +13,174 @@
# limitations under the License.
"""
BuildGridServer
==============
Creates a BuildGrid server, binding all the requisite service instances together.
"""
import logging
import os
from concurrent import futures
import grpc
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
from .referencestorage.service import ReferenceStorageService
class BuildGridServer:
"""Creates a BuildGrid server.
The :class:`BuildGridServer` class binds together all the
requisite services.
"""
def __init__(self, max_workers=None):
"""Initializes a new :class:`BuildGridServer` instance.
def __init__(self, port=50051, max_workers=10, credentials=None,
execution_instances=None, bots_interfaces=None, operations_instances=None,
operations_service_instances=None, reference_storage_instances=None,
action_cache_instances=None, cas_instances=None, bytestream_instances=None):
Args:
max_workers (int, optional): A pool of max worker threads.
"""
self.logger = logging.getLogger(__name__)
address = '[::]:{0}'.format(port)
if max_workers is None:
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
self._server = server
self._execution_service = None
self._bots_service = None
self._operations_service = None
self._reference_storage_service = None
self._action_cache_service = None
self._cas_service = None
self._bytestream_service = None
def start(self):
"""Starts the server.
"""
self._server.start()
def stop(self, grace=0):
"""Stops the server.
"""
self._server.stop(grace)
def add_port(self, address, credentials):
"""Adds a port to the server.
Must be called before the server starts. If a credentials object exists,
it will make a secure port.
Args:
address (str): The address with port number.
credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
"""
if credentials is not None:
self.logger.info("Secure connection")
server.add_secure_port(address, credentials)
self.logger.info("Adding secure connection on: [{}]".format(address))
self._server.add_secure_port(address, credentials)
else:
self.logger.info("Insecure connection")
server.add_insecure_port(address)
if execution_instances:
self.logger.debug("Adding execution instances {}".format(
execution_instances.keys()))
ExecutionService(server, execution_instances)
if bots_interfaces:
self.logger.debug("Adding bots interfaces {}".format(
bots_interfaces.keys()))
BotsService(server, bots_interfaces)
if operations_instances:
self.logger.debug("Adding operations instances {}".format(
operations_instances.keys()))
OperationsService(server, operations_instances)
if reference_storage_instances:
self.logger.debug("Adding reference storages {}".format(
reference_storage_instances.keys()))
ReferenceStorageService(server, reference_storage_instances)
if action_cache_instances:
self.logger.debug("Adding action cache instances {}".format(
action_cache_instances.keys()))
ActionCacheService(server, action_cache_instances)
if cas_instances:
self.logger.debug("Adding cas instances {}".format(
cas_instances.keys()))
ContentAddressableStorageService(server, cas_instances)
if bytestream_instances:
self.logger.debug("Adding bytestream instances {}".format(
bytestream_instances.keys()))
ByteStreamService(server, bytestream_instances)
self.logger.info("Adding insecure connection on [{}]".format(address))
self._server.add_insecure_port(address)
self._server = server
def add_execution_instance(self, instance, instance_name):
"""Adds an :obj:`ExecutionInstance` to the service.
def start(self):
self._server.start()
If no service exists, it creates one.
Args:
instance (:obj:`ExecutionInstance`): Instance to add.
instance_name (str): Instance name.
"""
if self._execution_service is None:
self._execution_service = ExecutionService(self._server)
self._execution_service.add_instance(instance_name, instance)
def add_bots_interface(self, instance, instance_name):
"""Adds a :obj:`BotsInterface` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`BotsInterface`): Instance to add.
instance_name (str): Instance name.
"""
if self._bots_service is None:
self._bots_service = BotsService(self._server)
self._bots_service.add_instance(instance_name, instance)
def add_operations_instance(self, instance, instance_name):
"""Adds an :obj:`OperationsInstance` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`OperationsInstance`): Instance to add.
instance_name (str): Instance name.
"""
if self._operations_service is None:
self._operations_service = OperationsService(self._server)
self._operations_service.add_instance(instance_name, instance)
def add_reference_storage_instance(self, instance, instance_name):
"""Adds a :obj:`ReferenceCache` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`ReferenceCache`): Instance to add.
instance_name (str): Instance name.
"""
if self._reference_storage_service is None:
self._reference_storage_service = ReferenceStorageService(self._server)
self._reference_storage_service.add_instance(instance_name, instance)
def add_action_cache_instance(self, instance, instance_name):
"""Adds a :obj:`ReferenceCache` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`ReferenceCache`): Instance to add.
instance_name (str): Instance name.
"""
if self._action_cache_service is None:
self._action_cache_service = ActionCacheService(self._server)
self._action_cache_service.add_instance(instance_name, instance)
def add_cas_instance(self, instance, instance_name):
"""Stores a :obj:`ContentAddressableStorageInstance` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`ReferenceCache`): Instance to add.
instance_name (str): Instance name.
"""
if self._cas_service is None:
self._cas_service = ContentAddressableStorageService(self._server)
self._cas_service.add_instance(instance_name, instance)
def add_bytestream_instance(self, instance, instance_name):
"""Stores a :obj:`ByteStreamInstance` to the service.
If no service exists, it creates one.
Args:
instance (:obj:`ByteStreamInstance`): Instance to add.
instance_name (str): Instance name.
"""
if self._bytestream_service is None:
self._bytestream_service = ByteStreamService(self._server)
def stop(self):
self._server.stop(grace=0)
self._bytestream_service.add_instance(instance_name, instance)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment