
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (5)
Showing 758 additions and 380 deletions

This comparison replaces BuildGrid's flag-driven server setup with a YAML settings file: the CLI now takes a configuration path, services are grouped into named instances, and each gRPC servicer registers itself with the server and dispatches requests by instance name.
@@ -30,7 +30,7 @@ before_script:
.run-dummy-job-template: &dummy-job
stage: test
script:
- ${BGD} server start --allow-insecure &
- ${BGD} server start buildgrid/_app/settings/default.yml &
- sleep 1 # Allow server to boot
- ${BGD} bot dummy &
- ${BGD} execute request-dummy --wait-for-completion
......
@@ -170,11 +170,8 @@ class BuildGridCLI(click.MultiCommand):
return commands
def get_command(self, context, name):
try:
mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
fromlist=['cli'])
except ImportError:
return None
mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
fromlist=['cli'])
return mod.cli
......
@@ -22,20 +22,17 @@ Create a BuildGrid server.
import asyncio
import logging
import sys
import click
from buildgrid.server import buildgrid_server
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.controller import ExecutionController
from buildgrid.server.actioncache.storage import ActionCache
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.referencestorage.storage import ReferenceCache
from ..cli import pass_context
_SIZE_PREFIXES = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
from ..settings import parser
from ..server import BuildGridServer
@click.group(name='server', short_help="Start a local server instance.")
@@ -45,71 +42,31 @@ def cli(context):
@cli.command('start', short_help="Set up a new server instance.")
@click.argument('instances', nargs=-1, type=click.STRING)
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="The port number to be listened.")
@click.option('--server-key', type=click.Path(exists=True, dir_okay=False), default=None,
help="Private server key for TLS (PEM-encoded)")
@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public server certificate for TLS (PEM-encoded)")
@click.option('--client-certs', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public client certificates for TLS (PEM-encoded, one single file)")
@click.option('--allow-insecure', type=click.BOOL, is_flag=True,
help="Whether or not to allow unencrypted connections.")
@click.option('--allow-update-action-result/--forbid-update-action-result',
'allow_uar', default=True, show_default=True,
help="Whether or not to allow clients to manually edit the action cache.")
@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
help="Maximum number of actions to keep in the ActionCache.")
@click.option('--cas', type=click.Choice(('lru', 's3', 'disk', 'with-cache')),
help="The CAS storage type to use.")
@click.option('--cas-cache', type=click.Choice(('lru', 's3', 'disk')),
help="For --cas=with-cache, the CAS storage to use as the cache.")
@click.option('--cas-fallback', type=click.Choice(('lru', 's3', 'disk')),
help="For --cas=with-cache, the CAS storage to use as the fallback.")
@click.option('--cas-lru-size', type=click.STRING,
help="For --cas=lru, the LRU cache's memory limit.")
@click.option('--cas-s3-bucket', type=click.STRING,
help="For --cas=s3, the bucket name.")
@click.option('--cas-s3-endpoint', type=click.STRING,
help="For --cas=s3, the endpoint URI.")
@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
help="For --cas=disk, the folder to store CAS blobs in.")
@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
@pass_context
def start(context, port, allow_insecure, server_key, server_cert, client_certs,
instances, max_cached_actions, allow_uar, cas, **cas_args):
"""Setups a new server instance."""
credentials = None
if not allow_insecure:
credentials = context.load_server_credentials(server_key, server_cert, client_certs)
if not credentials and not allow_insecure:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
sys.exit(-1)
context.credentials = credentials
context.port = port
context.logger.info("BuildGrid server booting up")
context.logger.info("Starting on port {}".format(port))
cas_storage = _make_cas_storage(context, cas, cas_args)
if cas_storage is None:
context.logger.info("Running without CAS - action cache will be unavailable")
action_cache = None
else:
action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
if instances is None:
instances = ['main']
server = buildgrid_server.BuildGridServer(port=context.port,
credentials=context.credentials,
instances=instances,
cas_storage=cas_storage,
action_cache=action_cache)
def start(context, config):
with open(config) as f:
settings = parser.get_parser().safe_load(f)
server_settings = settings['server']
instances = settings['instances']
execution_controllers = _instance_maker(instances, ExecutionController)
reference_caches = _instance_maker(instances, ReferenceCache)
action_caches = _instance_maker(instances, ActionCache)
cas = _instance_maker(instances, ContentAddressableStorageInstance)
bytestreams = _instance_maker(instances, ByteStreamInstance)
port = server_settings['port']
server = BuildGridServer(port=port,
execution_controller=execution_controllers,
reference_storage_instances=reference_caches,
action_cache_instances=action_caches,
cas_instances=cas,
bytestream_instances=bytestreams)
context.logger.info("Starting server on port {}".format(port))
loop = asyncio.get_event_loop()
try:
server.start()
@@ -119,57 +76,20 @@ def start(context, port, allow_insecure, server_key, server_cert, client_certs,
pass
finally:
context.logger.info("Stopping server")
server.stop()
loop.close()
def _make_cas_storage(context, cas_type, cas_args):
"""Returns the storage provider corresponding to the given `cas_type`,
or None if the provider cannot be created.
"""
if cas_type == "lru":
if cas_args["cas_lru_size"] is None:
context.logger.error("--cas-lru-size is required for LRU CAS")
return None
try:
size = _parse_size(cas_args["cas_lru_size"])
except ValueError:
context.logger.error('Invalid LRU size "{0}"'.format(cas_args["cas_lru_size"]))
return None
return LRUMemoryCache(size)
elif cas_type == "s3":
if cas_args["cas_s3_bucket"] is None:
context.logger.error("--cas-s3-bucket is required for S3 CAS")
return None
if cas_args["cas_s3_endpoint"] is not None:
return S3Storage(cas_args["cas_s3_bucket"],
endpoint_url=cas_args["cas_s3_endpoint"])
return S3Storage(cas_args["cas_s3_bucket"])
elif cas_type == "disk":
if cas_args["cas_disk_directory"] is None:
context.logger.error("--cas-disk-directory is required for disk CAS")
return None
return DiskStorage(cas_args["cas_disk_directory"])
elif cas_type == "with-cache":
cache = _make_cas_storage(context, cas_args["cas_cache"], cas_args)
fallback = _make_cas_storage(context, cas_args["cas_fallback"], cas_args)
if cache is None:
context.logger.error("Missing cache provider for --cas=with-cache")
return None
elif fallback is None:
context.logger.error("Missing fallback provider for --cas=with-cache")
return None
return WithCacheStorage(cache, fallback)
elif cas_type is None:
return None
return None
def _parse_size(size):
"""Convert a string containing a size in bytes (e.g. '2GB') to a number."""
size = size.lower()
if size[-1] == 'b':
size = size[:-1]
if size[-1] in _SIZE_PREFIXES:
return int(size[:-1]) * _SIZE_PREFIXES[size[-1]]
return int(size)
# Turn away now if you want to keep your eyes
def _instance_maker(instances, service_type):
# TODO get this mapped in parser
made = {}
for instance in instances:
services = instance['services']
instance_name = instance['name']
for service in services:
if isinstance(service, service_type):
made[instance_name] = service
return made
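For orientation, a toy sketch of the mapping this helper produces, using the `_instance_maker` defined above; the `Fake*` classes and instance list are illustrative stand-ins, not BuildGrid types:

class FakeActionCache:
    pass

class FakeExecutionController:
    pass

fake_instances = [
    {'name': 'main', 'services': [FakeActionCache(), FakeExecutionController()]},
]

# One matching service is kept per instance, keyed by instance name.
caches = _instance_maker(fake_instances, FakeActionCache)
assert list(caches) == ['main'] and isinstance(caches['main'], FakeActionCache)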
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BuildGridServer
===============
Creates a BuildGrid server, binding all the requisite service instances together.
"""
import logging
from concurrent import futures
import grpc
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
class BuildGridServer:
def __init__(self, port=50051, max_workers=10, credentials=None,
execution_controller=None, reference_storage_instances=None,
action_cache_instances=None, cas_instances=None, bytestream_instances=None):
self.logger = logging.getLogger(__name__)
address = '[::]:{0}'.format(port)
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
if credentials is not None:
self.logger.info("Secure connection")
server.add_secure_port(address, credentials)
else:
self.logger.info("Insecure connection")
server.add_insecure_port(address)
if execution_controller:
self.logger.debug("Adding execution controllers {}".format(
execution_controller.keys()))
ExecutionService(server, execution_controller)
BotsService(server, execution_controller)
OperationsService(server, execution_controller)
if reference_storage_instances:
self.logger.debug("Adding reference storages {}".format(
reference_storage_instances.keys()))
ReferenceStorageService(server, reference_storage_instances)
if action_cache_instances:
self.logger.debug("Adding action cache instances {}".format(
action_cache_instances.keys()))
ActionCacheService(server, action_cache_instances)
if cas_instances:
self.logger.debug("Adding cas instances {}".format(
cas_instances.keys()))
ContentAddressableStorageService(server, cas_instances)
if bytestream_instances:
self.logger.debug("Adding bytestream instances {}".format(
bytestream_instances.keys()))
ByteStreamService(server, bytestream_instances)
self._server = server
def start(self):
self._server.start()
def stop(self):
self._server.stop(grace=0)
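For a sense of the new API, a minimal sketch that wires a server by hand instead of through the YAML parser; the module path buildgrid._app.server, the 256 MB cache size and the empty instance name are assumptions for illustration:

from buildgrid._app.server import BuildGridServer
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.controller import ExecutionController

storage = LRUMemoryCache(256 * 1024 * 1024)        # 256 MB in-memory CAS
controller = ExecutionController(storage=storage)  # no action cache wired in

server = BuildGridServer(
    port=50051,
    execution_controller={'': controller},
    cas_instances={'': ContentAddressableStorageInstance(storage)},
    bytestream_instances={'': ByteStreamInstance(storage)})
server.start()  # grpc's start() returns immediately; cmd_server keeps the process alive with an asyncio loop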
server:
port: 50051
tls-server-key: null
tls-server-cert: null
tls-client-certs: null
insecure-mode: true
description: |
A single default instance
instances:
- name: main
description: |
The main server
storages:
- !disk-storage &main-storage
path: ~/cas/
services:
- !action-cache &main-action
storage: *main-storage
max_cached_refs: 256
allow_updates: true
- !execution
storage: *main-storage
action_cache: *main-action
- !cas
storage: *main-storage
- !bytestream
storage: *main-storage
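The &main-storage anchor means every service above shares one DiskStorage object. A short sketch, assuming this default.yml path, of how cmd_server.py loads the file via the parser introduced below:

from buildgrid._app.settings import parser

yaml_module = parser.get_parser()  # registers the custom !-tags on yaml.SafeLoader
with open('buildgrid/_app/settings/default.yml') as config_file:
    settings = yaml_module.safe_load(config_file)

port = settings['server']['port']                # 50051 in this file
services = settings['instances'][0]['services']  # live service objects, not dicts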
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from buildgrid.server.controller import ExecutionController
from buildgrid.server.actioncache.storage import ActionCache
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
class YamlFactory(yaml.YAMLObject):
@classmethod
def from_yaml(cls, loader, node):
values = loader.construct_mapping(node, deep=True)
return cls(**values)
class Disk(YamlFactory):
yaml_tag = u'!disk-storage'
def __new__(cls, path):
return DiskStorage(path)
class LRU(YamlFactory):
yaml_tag = u'!lru-storage'
def __new__(cls, size):
return LRUMemoryCache(_parse_size(size))
class Execution(YamlFactory):
yaml_tag = u'!execution'
def __new__(cls, storage, action_cache=None):
return ExecutionController(action_cache, storage)
class Action(YamlFactory):
yaml_tag = u'!action-cache'
def __new__(cls, storage, max_cached_refs=0, allow_updates=True):
return ActionCache(storage, max_cached_refs, allow_updates)
class CAS(YamlFactory):
yaml_tag = u'!cas'
def __new__(cls, storage):
return ContentAddressableStorageInstance(storage)
class ByteStream(YamlFactory):
yaml_tag = u'!bytestream'
def __new__(cls, storage):
return ByteStreamInstance(storage)
def _parse_size(size):
"""Convert a string containing a size in bytes (e.g. '2GB') to a number."""
_size_prefixes = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
size = size.lower()
if size[-1] == 'b':
size = size[:-1]
if size[-1] in _size_prefixes:
return int(size[:-1]) * _size_prefixes[size[-1]]
return int(size)
def get_parser():
yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
yaml.SafeLoader.add_constructor(Action.yaml_tag, Action.from_yaml)
yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
yaml.SafeLoader.add_constructor(CAS.yaml_tag, CAS.from_yaml)
yaml.SafeLoader.add_constructor(ByteStream.yaml_tag, ByteStream.from_yaml)
return yaml
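For reference, the size grammar _parse_size accepts: an integer, an optional binary prefix, an optional trailing 'b', all case-insensitive. A few illustrative values:

assert _parse_size('512') == 512          # bare byte count
assert _parse_size('2kb') == 2 * 2 ** 10  # prefix plus trailing 'b'
assert _parse_size('1G') == 2 ** 30       # case-insensitive
# Anything else falls through to int() and raises ValueError, e.g. '1.5G'.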
@@ -27,18 +27,27 @@ import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from .._exceptions import NotFoundError
from .._exceptions import InvalidArgumentError, NotFoundError
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
def __init__(self, action_cache):
self._action_cache = action_cache
def __init__(self, server, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
def GetActionResult(self, request, context):
try:
return self._action_cache.get_action_result(request.action_digest)
instance = self._get_instance(request.instance_name)
return instance.get_action_result(request.action_digest)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotFoundError as e:
self.logger.error(e)
@@ -48,11 +57,24 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
def UpdateActionResult(self, request, context):
try:
self._action_cache.update_action_result(request.action_digest, request.action_result)
instance = self._get_instance(request.instance_name)
instance.update_action_result(request.action_digest, request.action_result)
return request.action_result
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotImplementedError as e:
self.logger.error(e)
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return remote_execution_pb2.ActionResult()
def _get_instance(self, instance_name):
try:
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
@@ -33,10 +33,12 @@ from .._exceptions import InvalidArgumentError, OutofSyncError
class BotsService(bots_pb2_grpc.BotsServicer):
def __init__(self, instances):
def __init__(self, server, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
bots_pb2_grpc.add_BotsServicer_to_server(self, server)
def CreateBotSession(self, request, context):
try:
parent = request.parent
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BuildGridServer
===============
Creates a local BuildGrid server.
"""
from concurrent import futures
import grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc
from .instance import BuildGridInstance
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
class BuildGridServer:
def __init__(self, port=50051, credentials=None, instances=None,
max_workers=10, action_cache=None, cas_storage=None):
address = '[::]:{0}'.format(port)
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
if credentials is not None:
self._server.add_secure_port(address, credentials)
else:
self._server.add_insecure_port(address)
if cas_storage is not None:
cas_service = ContentAddressableStorageService(cas_storage)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
self._server)
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
self._server)
if action_cache is not None:
action_cache_service = ActionCacheService(action_cache)
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
self._server)
buildgrid_instances = {}
if not instances:
buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
else:
for name in instances:
buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
self._server)
remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
self._server)
operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
self._server)
def start(self):
self._server.start()
def stop(self):
self._server.stop(0)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Storage Instances
=================
Instances of CAS and ByteStream
"""
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
from ...settings import HASH
class ContentAddressableStorageInstance:
def __init__(self, storage):
self._storage = storage
def find_missing_blobs(self, blob_digests):
storage = self._storage
return re_pb2.FindMissingBlobsResponse(
missing_blob_digests=storage.missing_blobs(blob_digests))
def batch_update_blobs(self, requests):
storage = self._storage
store = []
for request_proto in requests:
store.append((request_proto.digest, request_proto.data))
response = re_pb2.BatchUpdateBlobsResponse()
statuses = storage.bulk_update_blobs(store)
for (digest, _), status in zip(store, statuses):
response_proto = response.responses.add()
response_proto.digest.CopyFrom(digest)
response_proto.status.CopyFrom(status)
return response
class ByteStreamInstance:
BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
def __init__(self, storage):
self._storage = storage
def read(self, path, read_offset, read_limit):
storage = self._storage
if path[0] == "blobs":
path = [""] + path
# Parse/verify resource name.
# Read resource names look like "[instance/]blobs/abc123hash/99".
digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
# Check the given read offset and limit.
if read_offset < 0 or read_offset > digest.size_bytes:
raise OutOfRangeError("Read offset out of range")
elif read_limit == 0:
bytes_remaining = digest.size_bytes - read_offset
elif read_limit > 0:
bytes_remaining = read_limit
else:
raise InvalidArgumentError("Negative read_limit is invalid")
# Read the blob from storage and send its contents to the client.
result = storage.get_blob(digest)
if result is None:
raise NotFoundError("Blob not found")
elif result.seekable():
result.seek(read_offset)
else:
result.read(read_offset)
while bytes_remaining > 0:
yield bytestream_pb2.ReadResponse(
data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
bytes_remaining -= self.BLOCK_SIZE
def write(self, requests):
storage = self._storage
first_request = next(requests)
path = first_request.resource_name.split("/")
if path[0] == "uploads":
path = [""] + path
if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
raise InvalidArgumentError("Invalid resource name")
digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
write_session = storage.begin_write(digest)
# Start the write session and write the first request's data.
write_session.write(first_request.data)
hash_ = HASH(first_request.data)
bytes_written = len(first_request.data)
finished = first_request.finish_write
# Handle subsequent write requests.
while not finished:
for request in requests:
if finished:
raise InvalidArgumentError("Write request sent after write finished")
elif request.write_offset != bytes_written:
raise InvalidArgumentError("Invalid write offset")
elif request.resource_name and request.resource_name != first_request.resource_name:
raise InvalidArgumentError("Resource name changed mid-write")
finished = request.finish_write
bytes_written += len(request.data)
if bytes_written > digest.size_bytes:
raise InvalidArgumentError("Wrote too much data to blob")
write_session.write(request.data)
hash_.update(request.data)
# Check that the data matches the provided digest.
if bytes_written != digest.size_bytes or not finished:
raise NotImplementedError("Cannot close stream before finishing write")
elif hash_.hexdigest() != digest.hash:
raise InvalidArgumentError("Data does not match hash")
storage.commit_write(digest, write_session)
return bytestream_pb2.WriteResponse(committed_size=bytes_written)
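Both methods rely on the ByteStream resource-name grammar from the remote execution API; a compact reference with purely illustrative values:

# Read:  "[instance/]blobs/<hash>/<size>"
# Write: "[instance/]uploads/<uuid>/blobs/<hash>/<size>"
read_path = "main/blobs/abc123hash/99".split("/")
assert read_path[1] == "blobs" and read_path[3].isdigit()

write_path = "main/uploads/some-guid/blobs/abc123hash/99".split("/")
assert write_path[1] == "uploads" and write_path[3] == "blobs" and write_path[5].isdigit()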
@@ -21,131 +21,146 @@ Implements the Content Addressable Storage API and ByteStream API.
"""
from itertools import tee
import logging
import grpc
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
from ...settings import HASH
class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
def __init__(self, server, instances):
self.logger = logging.getLogger(__name__)
self._instances = instances
def __init__(self, storage):
self._storage = storage
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
def FindMissingBlobs(self, request, context):
# Only one instance for now.
storage = self._storage
return re_pb2.FindMissingBlobsResponse(
missing_blob_digests=storage.missing_blobs(request.blob_digests))
try:
instance = self._get_instance(request.instance_name)
return instance.find_missing_blobs(request.blob_digests)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return remote_execution_pb2.FindMissingBlobsResponse()
def BatchUpdateBlobs(self, request, context):
# Only one instance for now.
storage = self._storage
requests = []
for request_proto in request.requests:
requests.append((request_proto.digest, request_proto.data))
response = re_pb2.BatchUpdateBlobsResponse()
for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
response_proto = response.responses.add()
response_proto.digest.CopyFrom(digest)
response_proto.status.CopyFrom(status)
return response
try:
instance = self._get_instance(request.instance_name)
return instance.batch_update_blobs(request.requests)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return remote_execution_pb2.BatchReadBlobsResponse()
def _get_instance(self, instance_name):
try:
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
def __init__(self, server, instances):
self.logger = logging.getLogger(__name__)
self._instances = instances
def __init__(self, storage):
self._storage = storage
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)
def Read(self, request, context):
# Only one instance for now.
storage = self._storage
# Parse/verify resource name.
# Read resource names look like "[instance/]blobs/abc123hash/99".
path = request.resource_name.split("/")
if len(path) == 3:
path = [""] + path
if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
# instance_name = path[0]
digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
# Check the given read offset and limit.
if request.read_offset < 0 or request.read_offset > digest.size_bytes:
context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
elif request.read_limit == 0:
bytes_remaining = digest.size_bytes - request.read_offset
elif request.read_limit > 0:
bytes_remaining = request.read_limit
else:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
# Read the blob from storage and send its contents to the client.
result = storage.get_blob(digest)
if result is None:
context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
elif result.seekable():
result.seek(request.read_offset)
else:
result.read(request.read_offset)
while bytes_remaining > 0:
yield bytestream_pb2.ReadResponse(
data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
bytes_remaining -= self.BLOCK_SIZE
def Write(self, request_iterator, context):
# Only one instance for now.
storage = self._storage
requests = iter(request_iterator)
first_request = next(requests)
if first_request.write_offset != 0:
context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
# Parse/verify resource name.
# Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
path = first_request.resource_name.split("/")
if path[0] == "uploads":
path = [""] + path
if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
# instance_name = path[0]
digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
# Start the write session and write the first request's data.
write_session = storage.begin_write(digest)
write_session.write(first_request.data)
hash_ = HASH(first_request.data)
bytes_written = len(first_request.data)
done = first_request.finish_write
# Handle subsequent write requests.
for request in requests:
if done:
context.abort(grpc.StatusCode.INVALID_ARGUMENT,
"Write request sent after write finished")
elif request.write_offset != bytes_written:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
elif request.resource_name and request.resource_name != first_request.resource_name:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
done = request.finish_write
bytes_written += len(request.data)
if bytes_written > digest.size_bytes:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
write_session.write(request.data)
hash_.update(request.data)
# Check that the data matches the provided digest.
if bytes_written != digest.size_bytes or not done:
context.abort(grpc.StatusCode.UNIMPLEMENTED,
"Cannot close stream before finishing write")
elif hash_.hexdigest() != digest.hash:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
storage.commit_write(digest, write_session)
return bytestream_pb2.WriteResponse(committed_size=bytes_written)
try:
path = request.resource_name.split("/")
instance_name = path[0]
# TODO: Decide on default instance name
if path[0] == "blobs":
if len(path) < 3 or not path[2].isdigit():
raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
instance_name = ""
elif path[1] == "blobs":
if len(path) < 4 or not path[3].isdigit():
raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
instance = self._get_instance(instance_name)
yield from instance.read(path,
request.read_offset,
request.read_limit)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
yield bytestream_pb2.ReadResponse()
except NotFoundError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.NOT_FOUND)
yield bytestream_pb2.ReadResponse()
except OutOfRangeError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.OUT_OF_RANGE)
yield bytestream_pb2.ReadResponse()
def Write(self, requests, context):
try:
requests, request_probe = tee(requests, 2)
first_request = next(request_probe)
path = first_request.resource_name.split("/")
instance_name = path[0]
# TODO: Sort out no instance name
if path[0] == "uploads":
if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
instance_name = ""
elif path[1] == "uploads":
if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
instance = self._get_instance(instance_name)
return instance.write(requests)
except NotImplementedError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotFoundError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.NOT_FOUND)
return bytestream_pb2.WriteResponse()
def _get_instance(self, instance_name):
try:
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
@@ -14,31 +14,36 @@
"""
BuildGrid Instance
Execution Controller
====================
An instance of the BuildGrid server.
An instance of the Execution controller.
Contains scheduler, execution instance and an interface to the bots.
All this stuff you need to make the execution service work.
Contains scheduler, execution instance, an interface to the bots
and an operations instance.
"""
import logging
from .execution.instance import ExecutionInstance
from .scheduler import Scheduler
from .bots.instance import BotsInterface
from .execution.instance import ExecutionInstance
from .operations.instance import OperationsInstance
class BuildGridInstance(ExecutionInstance, BotsInterface):
class ExecutionController(ExecutionInstance, BotsInterface, OperationsInstance):
def __init__(self, action_cache=None, cas_storage=None):
def __init__(self, action_cache=None, storage=None):
scheduler = Scheduler(action_cache)
self.logger = logging.getLogger(__name__)
ExecutionInstance.__init__(self, scheduler, cas_storage)
ExecutionInstance.__init__(self, scheduler, storage)
BotsInterface.__init__(self, scheduler)
OperationsInstance.__init__(self, scheduler)
def stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
......
@@ -35,10 +35,12 @@ from .._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def __init__(self, instances):
def __init__(self, server, instances):
self.logger = logging.getLogger(__name__)
self._instances = instances
remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
def Execute(self, request, context):
try:
message_queue = queue.Queue()
......
@@ -32,10 +32,12 @@ from .._exceptions import InvalidArgumentError
class OperationsService(operations_pb2_grpc.OperationsServicer):
def __init__(self, instances):
def __init__(self, server, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
def GetOperation(self, request, context):
try:
name = request.name
......
@@ -20,34 +20,70 @@ import grpc
from buildgrid._protos.buildstream.v2 import buildstream_pb2
from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
from .._exceptions import NotFoundError
from .._exceptions import InvalidArgumentError, NotFoundError
class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
def __init__(self, reference_cache):
self._reference_cache = reference_cache
def __init__(self, server, instances):
self.logger = logging.getLogger(__name__)
self._instances = instances
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
def GetReference(self, request, context):
try:
instance = self._get_instance(request.instance_name)
digest = instance.get_digest_reference(request.key)
response = buildstream_pb2.GetReferenceResponse()
response.digest.CopyFrom(self._reference_cache.get_digest_reference(request.key))
response.digest.CopyFrom(digest)
return response
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotFoundError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return buildstream_pb2.GetReferenceResponse()
def UpdateReference(self, request, context):
try:
instance = self._get_instance(request.instance_name)
digest = request.digest
for key in request.keys:
self._reference_cache.update_reference(key, request.digest)
instance.update_reference(key, digest)
return buildstream_pb2.UpdateReferenceResponse()
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotImplementedError:
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return buildstream_pb2.UpdateReferenceResponse()
def Status(self, request, context):
allow_updates = self._reference_cache.allow_updates
return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
try:
instance = self._get_instance(request.instance_name)
allow_updates = instance.allow_updates
return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return buildstream_pb2.StatusResponse()
def _get_instance(self, instance_name):
try:
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
.. _dummy-build:
Dummy build
@@ -8,7 +7,7 @@ In one terminal, start a server:
.. code-block:: sh
bgd server start --allow-insecure
bgd server start buildgrid/_app/settings/default.yml
In another terminal, send a request for work:
......
.. _simple-build:
Simple build
@@ -27,7 +26,7 @@ Now start a BuildGrid server, passing it a directory it can write a CAS to:
.. code-block:: sh
bgd server start --allow-insecure --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
bgd server start buildgrid/_app/settings/default.yml
Start the following bot session:
......
@@ -114,6 +114,7 @@ setup(
'protobuf',
'grpcio',
'Click',
'pyaml',
'boto3 < 1.8.0',
'botocore < 1.11.0',
'xdg',
......
@@ -18,17 +18,25 @@
# pylint: disable=redefined-outer-name
import io
from unittest import mock
import grpc
from grpc._server import _Context
import pytest
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid.server.cas.storage.storage_abc import StorageABC
from buildgrid.server.cas.service import ByteStreamService
from buildgrid.server.cas.service import ContentAddressableStorageService
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas import service
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.settings import HASH
context = mock.create_autospec(_Context)
server = mock.create_autospec(grpc.server)
class SimpleStorage(StorageABC):
"""Storage provider wrapper around a dictionary.
@@ -61,28 +69,18 @@ class SimpleStorage(StorageABC):
self.data[(digest.hash, digest.size_bytes)] = data
class MockObject:
def __init__(self):
self.abort = None
class MockException(Exception):
pass
def raise_mock_exception(*args, **kwargs):
raise MockException()
test_strings = [b"", b"hij"]
instances = ["", "test_inst"]
@pytest.mark.parametrize("data_to_read", test_strings)
@pytest.mark.parametrize("instance", instances)
def test_bytestream_read(data_to_read, instance):
@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
def test_bytestream_read(mocked, data_to_read, instance):
storage = SimpleStorage([b"abc", b"defg", data_to_read])
servicer = ByteStreamService(storage)
bs_instance = ByteStreamInstance(storage)
servicer = ByteStreamService(server, {instance: bs_instance})
request = bytestream_pb2.ReadRequest()
if instance != "":
@@ -96,11 +94,13 @@ def test_bytestream_read(data_to_read, instance):
@pytest.mark.parametrize("instance", instances)
def test_bytestream_read_many(instance):
@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
def test_bytestream_read_many(mocked, instance):
data_to_read = b"testing" * 10000
storage = SimpleStorage([b"abc", b"defg", data_to_read])
servicer = ByteStreamService(storage)
bs_instance = ByteStreamInstance(storage)
servicer = ByteStreamService(server, {instance: bs_instance})
request = bytestream_pb2.ReadRequest()
if instance != "":
@@ -115,9 +115,11 @@ def test_bytestream_read_many(instance):
@pytest.mark.parametrize("instance", instances)
@pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
def test_bytestream_write(instance, extra_data):
@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
def test_bytestream_write(mocked, instance, extra_data):
storage = SimpleStorage()
servicer = ByteStreamService(storage)
bs_instance = ByteStreamInstance(storage)
servicer = ByteStreamService(server, {instance: bs_instance})
resource_name = ""
if instance != "":
@@ -137,9 +139,11 @@ def test_bytestream_write(instance, extra_data):
assert storage.data[(hash_, 6)] == b'abcdef'
def test_bytestream_write_rejects_wrong_hash():
@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
def test_bytestream_write_rejects_wrong_hash(mocked):
storage = SimpleStorage()
servicer = ByteStreamService(storage)
bs_instance = ByteStreamInstance(storage)
servicer = ByteStreamService(server, {"": bs_instance})
data = b'some data'
wrong_hash = HASH(b'incorrect').hexdigest()
@@ -148,18 +152,18 @@ def test_bytestream_write_rejects_wrong_hash():
bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
]
context = MockObject()
context.abort = raise_mock_exception
with pytest.raises(MockException):
servicer.Write(requests, context)
servicer.Write(requests, context)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
assert len(storage.data) == 0
@pytest.mark.parametrize("instance", instances)
def test_cas_find_missing_blobs(instance):
@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
def test_cas_find_missing_blobs(mocked, instance):
storage = SimpleStorage([b'abc', b'def'])
servicer = ContentAddressableStorageService(storage)
cas_instance = ContentAddressableStorageInstance(storage)
servicer = ContentAddressableStorageService(server, {instance: cas_instance})
digests = [
re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
@@ -171,9 +175,12 @@ def test_cas_find_missing_blobs(instance):
@pytest.mark.parametrize("instance", instances)
def test_cas_batch_update_blobs(instance):
@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
def test_cas_batch_update_blobs(mocked, instance):
storage = SimpleStorage()
servicer = ContentAddressableStorageService(storage)
cas_instance = ContentAddressableStorageInstance(storage)
servicer = ContentAddressableStorageService(server, {instance: cas_instance})
update_requests = [
re_pb2.BatchUpdateBlobsRequest.Request(
digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
@@ -181,16 +188,21 @@ def test_cas_batch_update_blobs(instance):
digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
data=b'wrong data')
]
request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
response = servicer.BatchUpdateBlobs(request, None)
assert len(response.responses) == 2
for blob_response in response.responses:
if blob_response.digest == update_requests[0].digest:
assert blob_response.status.code == 0
elif blob_response.digest == update_requests[1].digest:
assert blob_response.status.code != 0
else:
raise Exception("Unexpected blob response")
assert len(storage.data) == 1
assert (update_requests[0].digest.hash, 3) in storage.data
assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'
@@ -19,18 +19,28 @@
import tempfile
from unittest import mock
import boto3
import grpc
from grpc._server import _Context
import pytest
from moto import mock_s3
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas import service
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.storage import remote
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.settings import HASH
context = mock.create_autospec(_Context)
server = mock.create_autospec(grpc.server)
abc = b"abc"
abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
defg = b"defg"
@@ -45,10 +55,64 @@ def write(storage, digest, blob):
storage.commit_write(digest, session)
class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
def __init__(self):
storage = LRUMemoryCache(256)
super().__init__(storage)
# Mock a CAS server with LRUStorage to return "calls" made to it
class MockStubServer:
def __init__(self):
instances = {"": MockCASStorage(), "dna": MockCASStorage()}
self._requests = []
with mock.patch.object(service, 'bytestream_pb2_grpc'):
self._bs_service = service.ByteStreamService(server, instances)
with mock.patch.object(service, 'remote_execution_pb2_grpc'):
self._cas_service = service.ContentAddressableStorageService(server, instances)
def Read(self, request):
yield from self._bs_service.Read(request, context)
def Write(self, request):
self._requests.append(request)
if request.finish_write:
response = self._bs_service.Write(self._requests, context)
self._requests = []
return response
return None
def FindMissingBlobs(self, request):
return self._cas_service.FindMissingBlobs(request, context)
def BatchUpdateBlobs(self, request):
return self._cas_service.BatchUpdateBlobs(request, context)
# Instances of MockCASStorage
@pytest.fixture(params=["", "dna"])
def instance(request):
return {request.param: MockCASStorage()}
@pytest.fixture()
@mock.patch.object(remote, 'bytestream_pb2_grpc')
@mock.patch.object(remote, 'remote_execution_pb2_grpc')
def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
mock_server = MockStubServer()
storage = remote.RemoteStorage(instance)
storage._stub_bs = mock_server
storage._stub_cas = mock_server
yield storage
# General tests for all storage providers
@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3"])
@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
def any_storage(request):
if request.param == "lru":
yield LRUMemoryCache(256)
@@ -70,6 +134,14 @@ def any_storage(request):
with mock_s3():
boto3.resource('s3').create_bucket(Bucket="testing")
yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
elif request.param == "remote":
with mock.patch.object(remote, 'bytestream_pb2_grpc'):
with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
mock_server = MockStubServer()
storage = remote.RemoteStorage(instance)
storage._stub_bs = mock_server
storage._stub_cas = mock_server
yield storage
def test_initially_empty(any_storage):
......