Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Show changes
Commits on Source (8)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
class CapabilitiesInterface:
"""Interface for calls the the Capabilities Service."""
def __init__(self, channel):
"""Initialises an instance of the capabilities service.
Args:
channel (grpc.Channel): A gRPC channel to the CAS endpoint.
"""
self.__logger = logging.getLogger(__name__)
self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
def get_capabilities(self, instance_name):
"""Returns the capabilities or the server to the user.
Args:
instance_name (str): The name of the instance."""
request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
try:
return self.__stub.GetCapabilities(request)
except grpc.RpcError as e:
self.__logger.error(e)
raise
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
class CapabilitiesInstance:
def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
self.__logger = logging.getLogger(__name__)
self.__cas_instance = cas_instance
self.__action_cache_instance = action_cache_instance
self.__execution_instance = execution_instance
def register_instance_with_server(self, instance_name, server):
server.add_capabilities_instance(self, instance_name)
def add_cas_instance(self, cas_instance):
self.__cas_instance = cas_instance
def add_action_cache_instance(self, action_cache_instance):
self.__action_cache_instance = action_cache_instance
def add_execution_instance(self, execution_instance):
self.__execution_instance = execution_instance
def get_capabilities(self):
server_capabilities = remote_execution_pb2.ServerCapabilities()
server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
return server_capabilities
def _get_cache_capabilities(self):
capabilities = remote_execution_pb2.CacheCapabilities()
action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
if self.__cas_instance:
capabilities.digest_function.extend([self.__cas_instance.hash_type()])
capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
# TODO: execution priority #102
# capabilities.cache_priority_capabilities =
if self.__action_cache_instance:
action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
return capabilities
def _get_capabilities_execution(self):
capabilities = remote_execution_pb2.ExecutionCapabilities()
if self.__execution_instance:
capabilities.exec_enabled = True
capabilities.digest_function = self.__execution_instance.hash_type()
# TODO: execution priority #102
# capabilities.execution_priority =
else:
capabilities.exec_enabled = False
return capabilities
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import grpc
from buildgrid._exceptions import InvalidArgumentError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
def __init__(self, server):
self.__logger = logging.getLogger(__name__)
self.__instances = {}
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
def add_instance(self, name, instance):
self.__instances[name] = instance
def add_cas_instance(self, name, instance):
self.__instances[name].add_cas_instance(instance)
def add_action_cache_instance(self, name, instance):
self.__instances[name].add_action_cache_instance(instance)
def add_execution_instance(self, name, instance):
self.__instances[name].add_execution_instance(instance)
def GetCapabilities(self, request, context):
try:
instance = self._get_instance(request.instance_name)
return instance.get_capabilities()
except InvalidArgumentError as e:
self.__logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return remote_execution_pb2_grpc.ServerCapabilities()
def _get_instance(self, name):
try:
return self.__instances[name]
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
......@@ -37,6 +37,22 @@ class ContentAddressableStorageInstance:
def register_instance_with_server(self, instance_name, server):
server.add_cas_instance(self, instance_name)
def hash_type(self):
hash_name = HASH().name
if hash_name == "sha256":
return re_pb2.SHA256
return re_pb2.UNKNOWN
def max_batch_total_size_bytes(self):
# TODO: link with max size
# Should be added from settings in MR !119
return 2000000
def symlink_absolute_path_strategy(self):
# Currently this strategy is hardcoded into BuildGrid
# With no setting to reference
return re_pb2.CacheCapabilities().DISALLOWED
def find_missing_blobs(self, blob_digests):
storage = self._storage
return re_pb2.FindMissingBlobsResponse(
......
......@@ -22,9 +22,11 @@ An instance of the Remote Execution Service.
import logging
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
from ..job import Job
from ...settings import HASH
class ExecutionInstance:
......@@ -38,6 +40,12 @@ class ExecutionInstance:
def register_instance_with_server(self, instance_name, server):
server.add_execution_instance(self, instance_name)
def hash_type(self):
hash_name = HASH().name
if hash_name == "sha256":
return remote_execution_pb2.SHA256
return remote_execution_pb2.UNKNOWN
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
......
......@@ -21,11 +21,14 @@ import grpc
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .capabilities.service import CapabilitiesService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
from .referencestorage.service import ReferenceStorageService
from .capabilities.instance import CapabilitiesInstance
class BuildGridServer:
"""Creates a BuildGrid server.
......@@ -50,6 +53,9 @@ class BuildGridServer:
self._server = server
# We always want a capabilities service
self._capabilities_service = CapabilitiesService(self._server)
self._execution_service = None
self._bots_service = None
self._operations_service = None
......@@ -99,6 +105,7 @@ class BuildGridServer:
self._execution_service = ExecutionService(self._server)
self._execution_service.add_instance(instance_name, instance)
self._add_capabilities_instance(instance_name, execution_instance=instance)
def add_bots_interface(self, instance, instance_name):
"""Adds a :obj:`BotsInterface` to the service.
......@@ -155,9 +162,10 @@ class BuildGridServer:
self._action_cache_service = ActionCacheService(self._server)
self._action_cache_service.add_instance(instance_name, instance)
self._add_capabilities_instance(instance_name, action_cache_instance=instance)
def add_cas_instance(self, instance, instance_name):
"""Stores a :obj:`ContentAddressableStorageInstance` to the service.
"""Adds a :obj:`ContentAddressableStorageInstance` to the service.
If no service exists, it creates one.
......@@ -168,10 +176,10 @@ class BuildGridServer:
if self._cas_service is None:
self._cas_service = ContentAddressableStorageService(self._server)
self._cas_service.add_instance(instance_name, instance)
self._add_capabilities_instance(instance_name, cas_instance=instance)
def add_bytestream_instance(self, instance, instance_name):
"""Stores a :obj:`ByteStreamInstance` to the service.
"""Adds a :obj:`ByteStreamInstance` to the service.
If no service exists, it creates one.
......@@ -183,3 +191,28 @@ class BuildGridServer:
self._bytestream_service = ByteStreamService(self._server)
self._bytestream_service.add_instance(instance_name, instance)
def _add_capabilities_instance(self, instance_name,
cas_instance=None,
action_cache_instance=None,
execution_instance=None):
"""Adds a :obj:`CapabilitiesInstance` to the service.
Args:
instance (:obj:`CapabilitiesInstance`): Instance to add.
instance_name (str): Instance name.
"""
try:
if cas_instance:
self._capabilities_service.add_cas_instance(instance_name, cas_instance)
if action_cache_instance:
self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
if execution_instance:
self._capabilities_service.add_execution_instance(instance_name, execution_instance)
except KeyError:
capabilities_instance = CapabilitiesInstance(cas_instance,
action_cache_instance,
execution_instance)
self._capabilities_service.add_instance(instance_name, capabilities_instance)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=redefined-outer-name
import grpc
import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.client.capabilities import CapabilitiesInterface
from buildgrid.server.controller import ExecutionController
from buildgrid.server.actioncache.storage import ActionCache
from buildgrid.server.cas.instance import ContentAddressableStorageInstance
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from ..utils.utils import run_in_subprocess
from ..utils.capabilities import serve_capabilities_service
INSTANCES = ['', 'instance']
# Use subprocess to avoid creation of gRPC threads in main process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
# Multiprocessing uses pickle which protobufs don't work with
# Workaround wrapper to send messages as strings
class ServerInterface:
def __init__(self, remote):
self.__remote = remote
def get_capabilities(self, instance_name):
def __get_capabilities(queue, remote, instance_name):
interface = CapabilitiesInterface(grpc.insecure_channel(remote))
result = interface.get_capabilities(instance_name)
queue.put(result.SerializeToString())
result = run_in_subprocess(__get_capabilities,
self.__remote, instance_name)
capabilities = remote_execution_pb2.ServerCapabilities()
capabilities.ParseFromString(result)
return capabilities
@pytest.mark.parametrize('instance', INSTANCES)
def test_execution_not_available_capabilities(instance):
with serve_capabilities_service([instance]) as server:
server_interface = ServerInterface(server.remote)
response = server_interface.get_capabilities(instance)
assert not response.execution_capabilities.exec_enabled
@pytest.mark.parametrize('instance', INSTANCES)
def test_execution_available_capabilities(instance):
controller = ExecutionController()
with serve_capabilities_service([instance],
execution_instance=controller.execution_instance) as server:
server_interface = ServerInterface(server.remote)
response = server_interface.get_capabilities(instance)
assert response.execution_capabilities.exec_enabled
assert response.execution_capabilities.digest_function
@pytest.mark.parametrize('instance', INSTANCES)
def test_action_cache_allow_updates_capabilities(instance):
storage = LRUMemoryCache(limit=256)
action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
with serve_capabilities_service([instance],
action_cache_instance=action_cache) as server:
server_interface = ServerInterface(server.remote)
response = server_interface.get_capabilities(instance)
assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
@pytest.mark.parametrize('instance', INSTANCES)
def test_action_cache_not_allow_updates_capabilities(instance):
storage = LRUMemoryCache(limit=256)
action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
with serve_capabilities_service([instance],
action_cache_instance=action_cache) as server:
server_interface = ServerInterface(server.remote)
response = server_interface.get_capabilities(instance)
assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
@pytest.mark.parametrize('instance', INSTANCES)
def test_cas_capabilities(instance):
cas = ContentAddressableStorageInstance(None)
with serve_capabilities_service([instance],
cas_instance=cas) as server:
server_interface = ServerInterface(server.remote)
response = server_interface.get_capabilities(instance)
assert len(response.cache_capabilities.digest_function) == 1
assert response.cache_capabilities.digest_function[0]
assert response.cache_capabilities.symlink_absolute_path_strategy
assert response.cache_capabilities.max_batch_total_size_bytes
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from contextlib import contextmanager
import multiprocessing
import os
import signal
import grpc
import pytest_cov
from buildgrid.server.capabilities.service import CapabilitiesService
from buildgrid.server.capabilities.instance import CapabilitiesInstance
@contextmanager
def serve_capabilities_service(instances,
cas_instance=None,
action_cache_instance=None,
execution_instance=None):
server = Server(instances,
cas_instance,
action_cache_instance,
execution_instance)
try:
yield server
finally:
server.quit()
class Server:
def __init__(self, instances,
cas_instance=None,
action_cache_instance=None,
execution_instance=None):
self.instances = instances
self.__queue = multiprocessing.Queue()
self.__process = multiprocessing.Process(
target=Server.serve,
args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
self.__process.start()
self.port = self.__queue.get(timeout=1)
self.remote = 'localhost:{}'.format(self.port)
@staticmethod
def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
pytest_cov.embed.cleanup_on_sigterm()
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
port = server.add_insecure_port('localhost:0')
capabilities_service = CapabilitiesService(server)
for name in instances:
capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
capabilities_service.add_instance(name, capabilities_instance)
server.start()
queue.put(port)
signal.pause()
def quit(self):
if self.__process:
self.__process.terminate()
self.__process.join()