Skip to content
Commits on Source (14)
......@@ -30,7 +30,7 @@ before_script:
.run-dummy-job-template: &dummy-job
stage: test
script:
- ${BGD} server start --allow-insecure &
- ${BGD} server start buildgrid/_app/settings/default.yml &
- sleep 1 # Allow server to boot
- ${BGD} bot dummy &
- ${BGD} execute request-dummy --wait-for-completion
......
......@@ -19,83 +19,126 @@ import tempfile
from google.protobuf import any_pb2
from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid.utils import parse_to_pb2_from_fetch
from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
def work_buildbox(context, lease):
"""Executes a lease for a build action, using buildbox.
"""
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
local_cas_directory = context.local_cas
logger = context.logger
action_digest_any = lease.payload
action_digest = remote_execution_pb2.Digest()
action_digest_any.Unpack(action_digest)
lease.payload.Unpack(action_digest)
action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
stub_bytestream, action_digest)
stub = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
stub_bytestream, action.command_digest)
action = remote_execution_pb2.Action()
parse_to_pb2_from_fetch(action, stub, action_digest)
environment = dict()
for variable in command.environment_variables:
if variable.name not in ['PWD']:
environment[variable.name] = variable.value
casdir = context.local_cas
remote_command = remote_execution_pb2.Command()
parse_to_pb2_from_fetch(remote_command, stub, action.command_digest)
if command.working_directory:
working_directory = command.working_directory
else:
working_directory = '/'
environment = dict((x.name, x.value) for x in remote_command.environment_variables)
logger.debug("command hash: {}".format(action.command_digest.hash))
logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
logger.debug("\n{}".format(' '.join(remote_command.arguments)))
# Input hash must be written to disk for buildbox.
os.makedirs(os.path.join(casdir, 'tmp'), exist_ok=True)
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as input_digest_file:
with open(input_digest_file.name, 'wb') as f:
f.write(action.input_root_digest.SerializeToString())
f.flush()
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
command = ['buildbox',
'--remote={}'.format(context.remote_cas_url),
'--input-digest={}'.format(input_digest_file.name),
'--output-digest={}'.format(output_digest_file.name),
'--local={}'.format(casdir)]
logger.debug("\n{}".format(' '.join(command.arguments)))
os.makedirs(os.path.join(local_cas_directory, 'tmp'), exist_ok=True)
os.makedirs(context.fuse_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(dir=os.path.join(local_cas_directory, 'tmp')) as input_digest_file:
# Input hash must be written to disk for BuildBox
write_file(input_digest_file.name, action.input_root_digest.SerializeToString())
with tempfile.NamedTemporaryFile(dir=os.path.join(local_cas_directory, 'tmp')) as output_digest_file:
command_line = ['buildbox',
'--remote={}'.format(context.remote_cas_url),
'--input-digest={}'.format(input_digest_file.name),
'--output-digest={}'.format(output_digest_file.name),
'--chdir={}'.format(working_directory),
'--local={}'.format(local_cas_directory)]
if context.cas_client_key:
command.append('--client-key={}'.format(context.cas_client_key))
command_line.append('--client-key={}'.format(context.cas_client_key))
if context.cas_client_cert:
command.append('--client-cert={}'.format(context.cas_client_cert))
command_line.append('--client-cert={}'.format(context.cas_client_cert))
if context.cas_server_cert:
command.append('--server-cert={}'.format(context.cas_server_cert))
if 'PWD' in environment and environment['PWD']:
command.append('--chdir={}'.format(environment['PWD']))
command_line.append('--server-cert={}'.format(context.cas_server_cert))
command.append(context.fuse_dir)
command.extend(remote_command.arguments)
command_line.append(context.fuse_dir)
command_line.extend(command.arguments)
logger.debug(' '.join(command))
logger.debug(' '.join(command_line))
logger.debug("Input root digest:\n{}".format(action.input_root_digest))
logger.info("Launching process")
proc = subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
proc.communicate()
command_line = subprocess.Popen(command_line,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
# TODO: Should return the stdout and stderr to the user.
command_line.communicate()
output_root_digest = remote_execution_pb2.Digest()
with open(output_digest_file.name, 'rb') as f:
output_root_digest.ParseFromString(f.read())
logger.debug("Output root digest: {}".format(output_root_digest))
output_digest = remote_execution_pb2.Digest()
output_digest.ParseFromString(read_file(output_digest_file.name))
if len(output_root_digest.hash) < 64:
logger.debug("Output root digest: {}".format(output_digest))
if len(output_digest.hash) < 64:
logger.warning("Buildbox command failed - no output root digest present.")
output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
action_result = remote_execution_pb2.ActionResult()
action_result.output_directories.extend([output_file])
# TODO: Have BuildBox helping us creating the Tree instance here
# See https://gitlab.com/BuildStream/buildbox/issues/7 for details
output_tree = _cas_tree_maker(stub_bytestream, output_digest)
with upload(context.cas_channel) as cas:
output_tree_digest = cas.send_message(output_tree)
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
output_directory = remote_execution_pb2.OutputDirectory()
output_directory.tree_digest.CopyFrom(output_tree_digest)
output_directory.path = os.path.relpath(working_directory, start='/')
lease.result.CopyFrom(action_result_any)
action_result = remote_execution_pb2.ActionResult()
action_result.output_directories.extend([output_directory])
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
lease.result.CopyFrom(action_result_any)
return lease
def _cas_tree_maker(stub_bytestream, directory_digest):
    """Build a Tree message for the Directory behind `directory_digest`.

    Every sub-directory is fetched individually over ByteStream, which is
    very inefficient and only temporary.
    See https://gitlab.com/BuildStream/buildbox/issues/7.
    """
    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
                                             stub_bytestream, directory_digest)

    def _collect_children(directory):
        # Depth-first: a node's own subtree is listed before the node itself,
        # preserving the original traversal order.
        children = []
        for directory_node in directory.directories:
            child = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
                                            stub_bytestream, directory_node.digest)
            children += _collect_children(child)
            children.append(child)
        return children

    output_tree = remote_execution_pb2.Tree()
    output_tree.children.extend(_collect_children(root_directory))
    output_tree.root.CopyFrom(root_directory)

    return output_tree
......@@ -170,11 +170,8 @@ class BuildGridCLI(click.MultiCommand):
return commands
def get_command(self, context, name):
try:
mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
fromlist=['cli'])
except ImportError:
return None
mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
fromlist=['cli'])
return mod.cli
......
......@@ -56,8 +56,7 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
else:
credentials = context.load_client_credentials(client_key, client_cert, server_cert)
if not credentials:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
sys.exit(-1)
context.channel = grpc.secure_channel(context.remote, credentials)
......
......@@ -33,7 +33,6 @@ import grpc
from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from ..cli import pass_context
......@@ -61,8 +60,7 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
else:
credentials = context.load_client_credentials(client_key, client_cert, server_cert)
if not credentials:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
sys.exit(-1)
context.channel = grpc.secure_channel(context.remote, credentials)
......@@ -99,37 +97,6 @@ def request_dummy(context, number, wait_for_completion):
context.logger.info(next(response))
@cli.command('status', short_help="Get the status of an operation.")
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
@pass_context
def operation_status(context, operation_name):
    """Fetch and log the state of a single long-running operation."""
    context.logger.info("Getting operation status...")
    stub = operations_pb2_grpc.OperationsStub(context.channel)

    request = operations_pb2.GetOperationRequest(name=operation_name)

    # Logs the whole Operation message returned by the server.
    response = stub.GetOperation(request)
    context.logger.info(response)
@cli.command('list', short_help="List operations.")
@pass_context
def list_operations(context):
    """Log every long-running operation known for the configured instance."""
    context.logger.info("Getting list of operations")
    stub = operations_pb2_grpc.OperationsStub(context.channel)

    # The ListOperations `name` field carries the targeted instance name.
    request = operations_pb2.ListOperationsRequest(name=context.instance_name)

    response = stub.ListOperations(request)

    if not response.operations:
        context.logger.warning("No operations to list")
        return

    for op in response.operations:
        context.logger.info(op)
@cli.command('wait', short_help="Streams an operation until it is complete.")
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
@pass_context
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Operations command
==================
Check the status of operations
"""
import logging
from urllib.parse import urlparse
import sys
import click
import grpc
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from ..cli import pass_context
@click.group(name='operation', short_help="Long running operations commands")
@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
              help="Remote execution server's URL (port defaults to 50051 if no specified).")
@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Private client key for TLS (PEM-encoded)")
@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Public client certificate for TLS (PEM-encoded)")
@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Public server certificate for TLS (PEM-encoded)")
@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
              help="Targeted farm instance name.")
@pass_context
def cli(context, remote, instance_name, client_key, client_cert, server_cert):
    """Set up the gRPC channel shared by all `operation` subcommands.

    An `http://` scheme selects an insecure channel; any other scheme
    requires TLS credentials (explicit options or discoverable defaults)
    and exits with -1 when none can be loaded.
    """
    url = urlparse(remote)

    context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
    context.instance_name = instance_name

    if url.scheme == 'http':
        context.channel = grpc.insecure_channel(context.remote)
    else:
        credentials = context.load_client_credentials(client_key, client_cert, server_cert)
        if not credentials:
            click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
            sys.exit(-1)

        context.channel = grpc.secure_channel(context.remote, credentials)

    context.logger = logging.getLogger(__name__)
    context.logger.debug("Starting for remote {}".format(context.remote))
@cli.command('status', short_help="Get the status of an operation.")
@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
@pass_context
def status(context, operation_name):
    """Fetch and log the state of one long-running operation."""
    context.logger.info("Getting operation status...")

    operations_stub = operations_pb2_grpc.OperationsStub(context.channel)
    result = operations_stub.GetOperation(
        operations_pb2.GetOperationRequest(name=operation_name))

    context.logger.info(result)
@cli.command('list', short_help="List operations.")
@pass_context
def lists(context):
    """Log every operation registered for the configured instance."""
    context.logger.info("Getting list of operations")

    operations_stub = operations_pb2_grpc.OperationsStub(context.channel)
    response = operations_stub.ListOperations(
        operations_pb2.ListOperationsRequest(name=context.instance_name))

    if not response.operations:
        context.logger.warning("No operations to list")
        return

    for operation in response.operations:
        context.logger.info(operation)
......@@ -26,16 +26,14 @@ import sys
import click
from buildgrid.server import buildgrid_server
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.execution.action_cache import ActionCache
from buildgrid.server.controller import ExecutionController
from buildgrid.server.actioncache.storage import ActionCache
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.referencestorage.storage import ReferenceCache
from ..cli import pass_context
_SIZE_PREFIXES = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
from ..settings import parser
from ..server import BuildGridServer
@click.group(name='server', short_help="Start a local server instance.")
......@@ -45,71 +43,58 @@ def cli(context):
@cli.command('start', short_help="Setup a new server instance.")
@click.argument('instances', nargs=-1, type=click.STRING)
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="The port number to be listened.")
@click.option('--server-key', type=click.Path(exists=True, dir_okay=False), default=None,
help="Private server key for TLS (PEM-encoded)")
@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public server certificate for TLS (PEM-encoded)")
@click.option('--client-certs', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public client certificates for TLS (PEM-encoded, one single file)")
@click.option('--allow-insecure', type=click.BOOL, is_flag=True,
help="Whether or not to allow unencrypted connections.")
@click.option('--allow-update-action-result/--forbid-update-action-result',
'allow_uar', default=True, show_default=True,
help="Whether or not to allow clients to manually edit the action cache.")
@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
help="Maximum number of actions to keep in the ActionCache.")
@click.option('--cas', type=click.Choice(('lru', 's3', 'disk', 'with-cache')),
help="The CAS storage type to use.")
@click.option('--cas-cache', type=click.Choice(('lru', 's3', 'disk')),
help="For --cas=with-cache, the CAS storage to use as the cache.")
@click.option('--cas-fallback', type=click.Choice(('lru', 's3', 'disk')),
help="For --cas=with-cache, the CAS storage to use as the fallback.")
@click.option('--cas-lru-size', type=click.STRING,
help="For --cas=lru, the LRU cache's memory limit.")
@click.option('--cas-s3-bucket', type=click.STRING,
help="For --cas=s3, the bucket name.")
@click.option('--cas-s3-endpoint', type=click.STRING,
help="For --cas=s3, the endpoint URI.")
@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
help="For --cas=disk, the folder to store CAS blobs in.")
@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
@pass_context
def start(context, port, allow_insecure, server_key, server_cert, client_certs,
instances, max_cached_actions, allow_uar, cas, **cas_args):
"""Setups a new server instance."""
credentials = None
if not allow_insecure:
credentials = context.load_server_credentials(server_key, server_cert, client_certs)
if not credentials and not allow_insecure:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
sys.exit(-1)
context.credentials = credentials
context.port = port
context.logger.info("BuildGrid server booting up")
context.logger.info("Starting on port {}".format(port))
def start(context, config):
with open(config) as f:
settings = parser.get_parser().safe_load(f)
cas_storage = _make_cas_storage(context, cas, cas_args)
server_settings = settings['server']
insecure_mode = server_settings['insecure-mode']
if cas_storage is None:
context.logger.info("Running without CAS - action cache will be unavailable")
action_cache = None
else:
action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
if instances is None:
instances = ['main']
credentials = None
if not insecure_mode:
server_key = server_settings['tls-server-key']
server_cert = server_settings['tls-server-cert']
client_certs = server_settings['tls-client-certs']
credentials = context.load_server_credentials(server_key, server_cert, client_certs)
server = buildgrid_server.BuildGridServer(port=context.port,
credentials=context.credentials,
instances=instances,
cas_storage=cas_storage,
action_cache=action_cache)
if not credentials:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
sys.exit(-1)
instances = settings['instances']
execution_controllers = _instance_maker(instances, ExecutionController)
execution_instances = {}
bots_interfaces = {}
operations_instances = {}
# TODO: map properly in parser
for k, v in execution_controllers.items():
execution_instances[k] = v.execution_instance
bots_interfaces[k] = v.bots_interface
operations_instances[k] = v.operations_instance
reference_caches = _instance_maker(instances, ReferenceCache)
action_caches = _instance_maker(instances, ActionCache)
cas = _instance_maker(instances, ContentAddressableStorageInstance)
bytestreams = _instance_maker(instances, ByteStreamInstance)
port = server_settings['port']
server = BuildGridServer(port=port,
credentials=credentials,
execution_instances=execution_instances,
bots_interfaces=bots_interfaces,
operations_instances=operations_instances,
reference_storage_instances=reference_caches,
action_cache_instances=action_caches,
cas_instances=cas,
bytestream_instances=bytestreams)
context.logger.info("Starting server on port {}".format(port))
loop = asyncio.get_event_loop()
try:
server.start()
......@@ -119,57 +104,20 @@ def start(context, port, allow_insecure, server_key, server_cert, client_certs,
pass
finally:
context.logger.info("Stopping server")
server.stop()
loop.close()
def _make_cas_storage(context, cas_type, cas_args):
    """Returns the storage provider corresponding to the given `cas_type`,
    or None if the provider cannot be created.

    `cas_args` is the dict of `--cas-*` CLI option values; each branch
    validates the options it needs and logs an error instead of raising.
    """
    if cas_type == "lru":
        if cas_args["cas_lru_size"] is None:
            context.logger.error("--cas-lru-size is required for LRU CAS")
            return None
        try:
            size = _parse_size(cas_args["cas_lru_size"])
        except ValueError:
            context.logger.error('Invalid LRU size "{0}"'.format(cas_args["cas_lru_size"]))
            return None
        return LRUMemoryCache(size)
    elif cas_type == "s3":
        if cas_args["cas_s3_bucket"] is None:
            context.logger.error("--cas-s3-bucket is required for S3 CAS")
            return None
        # The endpoint override is optional; omit it to use the default.
        if cas_args["cas_s3_endpoint"] is not None:
            return S3Storage(cas_args["cas_s3_bucket"],
                             endpoint_url=cas_args["cas_s3_endpoint"])
        return S3Storage(cas_args["cas_s3_bucket"])
    elif cas_type == "disk":
        if cas_args["cas_disk_directory"] is None:
            context.logger.error("--cas-disk-directory is required for disk CAS")
            return None
        return DiskStorage(cas_args["cas_disk_directory"])
    elif cas_type == "with-cache":
        # Recursively build the two underlying providers from the same args.
        cache = _make_cas_storage(context, cas_args["cas_cache"], cas_args)
        fallback = _make_cas_storage(context, cas_args["cas_fallback"], cas_args)
        if cache is None:
            context.logger.error("Missing cache provider for --cas=with-cache")
            return None
        elif fallback is None:
            context.logger.error("Missing fallback provider for --cas=with-cache")
            return None
        return WithCacheStorage(cache, fallback)
    elif cas_type is None:
        # No CAS configured at all: not an error, just no storage.
        return None
    # Unknown type string is treated the same as "no storage".
    return None
def _parse_size(size):
    """Convert a string containing a size in bytes (e.g. '2GB') to a number."""
    size = size.lower()
    if size[-1] == 'b':
        # Strip a trailing 'b'/'B' so both '2g' and '2gb' are accepted.
        size = size[:-1]
    if size[-1] in _SIZE_PREFIXES:
        # Binary prefixes (k/m/g/t) come from the module-level table.
        return int(size[:-1]) * _SIZE_PREFIXES[size[-1]]
    return int(size)
# Turn away now if you want to keep your eyes
def _instance_maker(instances, service_type):
# TODO get this mapped in parser
made = {}
for instance in instances:
services = instance['services']
instance_name = instance['name']
for service in services:
if isinstance(service, service_type):
made[instance_name] = service
return made
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BuildGridServer
===============
Creates a BuildGrid server, binding all the requisite service instances together.
"""
import logging
from concurrent import futures
import grpc
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
class BuildGridServer:
    """Binds all the requisite BuildGrid service instances to one gRPC server.

    Each ``*_instances``/``*_interfaces`` argument maps an instance name to
    the object implementing that service for the instance. A gRPC service
    is registered only when at least one instance of its kind is supplied.
    """

    def __init__(self, port=50051, max_workers=10, credentials=None,
                 execution_instances=None, bots_interfaces=None, operations_instances=None,
                 operations_service_instances=None, reference_storage_instances=None,
                 action_cache_instances=None, cas_instances=None, bytestream_instances=None):
        """Create the gRPC server and attach the requested services.

        Args:
            port (int): TCP port to listen on.
            max_workers (int): thread-pool size for request handling.
            credentials: gRPC server credentials; ``None`` means plain text.

        NOTE(review): ``operations_service_instances`` is accepted but never
        used in this class -- kept so existing callers passing it do not
        break; candidate for removal.
        """
        self.logger = logging.getLogger(__name__)

        address = '[::]:{0}'.format(port)

        server = grpc.server(futures.ThreadPoolExecutor(max_workers))

        if credentials is not None:
            self.logger.info("Secure connection")
            server.add_secure_port(address, credentials)
        else:
            self.logger.info("Insecure connection")
            server.add_insecure_port(address)

        # (instances, log label, service wrapper) for every supported service.
        # Each wrapper registers itself on `server` at construction time.
        registrations = (
            (execution_instances, "execution instances", ExecutionService),
            (bots_interfaces, "bots interfaces", BotsService),
            (operations_instances, "operations instances", OperationsService),
            (reference_storage_instances, "reference storages", ReferenceStorageService),
            (action_cache_instances, "action cache instances", ActionCacheService),
            (cas_instances, "cas instances", ContentAddressableStorageService),
            (bytestream_instances, "bytestream instances", ByteStreamService),
        )
        for instance_map, label, service_class in registrations:
            if instance_map:
                self.logger.debug("Adding {} {}".format(label, instance_map.keys()))
                service_class(server, instance_map)

        self._server = server

    def start(self):
        """Start serving; gRPC handles requests on its own worker threads."""
        self._server.start()

    def stop(self):
        """Stop the server immediately, without a grace period."""
        self._server.stop(grace=0)
server:
port: 50051
tls-server-key: null
tls-server-cert: null
tls-client-certs: null
insecure-mode: true
description: |
A single default instance
instances:
- name: main
description: |
The main server
storages:
- !disk-storage &main-storage
path: ~/cas/
services:
- !action-cache &main-action
storage: *main-storage
max_cached_refs: 256
allow_updates: true
- !execution
storage: *main-storage
action_cache: *main-action
- !cas
storage: *main-storage
- !bytestream
storage: *main-storage
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import yaml
from buildgrid.server.controller import ExecutionController
from buildgrid.server.actioncache.storage import ActionCache
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
class YamlFactory(yaml.YAMLObject):
    """Base class for objects constructable from YAML tags.

    Subclasses define ``yaml_tag``; `from_yaml` builds the object by
    passing the tagged node's mapping entries as keyword arguments to
    the subclass constructor.
    """

    @classmethod
    def from_yaml(cls, loader, node):
        # deep=True resolves nested/aliased nodes (e.g. `storage: *anchor`)
        # before they are handed to the constructor.
        values = loader.construct_mapping(node, deep=True)
        return cls(**values)
class Disk(YamlFactory):
    """YAML tag ``!disk-storage``: builds a DiskStorage.

    ``__new__`` returns the DiskStorage directly (not a Disk instance),
    so this class exists only as a YAML construction hook.
    """

    yaml_tag = u'!disk-storage'

    def __new__(cls, path):
        # Allow '~' in configured paths.
        path = os.path.expanduser(path)
        return DiskStorage(path)
class LRU(YamlFactory):
    """YAML tag ``!lru-storage``: builds an in-memory LRUMemoryCache.

    ``__new__`` returns the cache directly; `size` is a human-readable
    string such as '512MB' (see `_parse_size` below).
    """

    yaml_tag = u'!lru-storage'

    def __new__(cls, size):
        return LRUMemoryCache(_parse_size(size))
class S3(YamlFactory):
    """YAML tag ``!s3-storage``: builds an S3Storage for the given bucket.

    ``__new__`` returns the storage directly; `endpoint` is passed through
    as the S3 endpoint URL.
    """

    yaml_tag = u'!s3-storage'

    def __new__(cls, bucket, endpoint):
        return S3Storage(bucket, endpoint_url=endpoint)
class WithCache(YamlFactory):
    """YAML tag ``!with-cache-storage``: layers a cache storage over a
    fallback storage. ``__new__`` returns the combined storage directly.
    """

    yaml_tag = u'!with-cache-storage'

    def __new__(cls, cache, fallback):
        return WithCacheStorage(cache, fallback)
class Execution(YamlFactory):
    """YAML tag ``!execution``: builds an ExecutionController.

    ``__new__`` returns the controller directly; `action_cache` is optional.
    """

    yaml_tag = u'!execution'

    def __new__(cls, storage, action_cache=None):
        return ExecutionController(action_cache, storage)
class Action(YamlFactory):
    """YAML tag ``!action-cache``: builds an ActionCache over a storage.

    ``__new__`` returns the cache directly.
    """

    yaml_tag = u'!action-cache'

    def __new__(cls, storage, max_cached_refs=0, allow_updates=True):
        return ActionCache(storage, max_cached_refs, allow_updates)
class CAS(YamlFactory):
    """YAML tag ``!cas``: builds a ContentAddressableStorageInstance.

    ``__new__`` returns the instance directly.
    """

    yaml_tag = u'!cas'

    def __new__(cls, storage):
        return ContentAddressableStorageInstance(storage)
class ByteStream(YamlFactory):
    """YAML tag ``!bytestream``: builds a ByteStreamInstance.

    ``__new__`` returns the instance directly.
    """

    yaml_tag = u'!bytestream'

    def __new__(cls, storage):
        return ByteStreamInstance(storage)
def _parse_size(size):
"""Convert a string containing a size in bytes (e.g. '2GB') to a number."""
_size_prefixes = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
size = size.lower()
if size[-1] == 'b':
size = size[:-1]
if size[-1] in _size_prefixes:
return int(size[:-1]) * _size_prefixes[size[-1]]
return int(size)
def get_parser():
    """Register every BuildGrid YAML tag on the SafeLoader and return the
    ``yaml`` module, ready for ``safe_load`` on a server configuration file.
    """
    # The original code registered `Execution` twice; each factory is now
    # registered exactly once.
    factories = (Execution, Action, Disk, LRU, S3, WithCache, CAS, ByteStream)
    for factory in factories:
        yaml.SafeLoader.add_constructor(factory.yaml_tag, factory.from_yaml)

    return yaml
......@@ -39,9 +39,7 @@ class BgdError(Exception):
class ErrorDomain(Enum):
SERVER = 1
EXECUTION = 2
WORKER = 3
BOT = 4
BOT = 2
class ServerError(BgdError):
......@@ -49,16 +47,6 @@ class ServerError(BgdError):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
class ExecutionError(BgdError):
    """Error raised from the execution services (domain: EXECUTION)."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.EXECUTION, reason=reason)
class WorkerError(BgdError):
    """Error raised from a worker (domain: WORKER)."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.WORKER, reason=reason)
class BotError(BgdError):
    """Error raised from a bot (domain: BOT)."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
......@@ -38,3 +38,11 @@ class OutofSyncError(BgdError):
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
class OutOfRangeError(BgdError):
    """The ByteStream service was asked to read data out of range
    (domain: SERVER).
    """

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
......@@ -27,18 +27,27 @@ import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from .._exceptions import NotFoundError
from .._exceptions import InvalidArgumentError, NotFoundError
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
def __init__(self, action_cache):
self._action_cache = action_cache
def __init__(self, server, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
def GetActionResult(self, request, context):
try:
return self._action_cache.get_action_result(request.action_digest)
instance = self._get_instance(request.instance_name)
return instance.get_action_result(request.action_digest)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotFoundError as e:
self.logger.error(e)
......@@ -48,11 +57,24 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
def UpdateActionResult(self, request, context):
try:
self._action_cache.update_action_result(request.action_digest, request.action_result)
instance = self._get_instance(request.instance_name)
instance.update_action_result(request.action_digest, request.action_result)
return request.action_result
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotImplementedError as e:
self.logger.error(e)
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return remote_execution_pb2.ActionResult()
def _get_instance(self, instance_name):
try:
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
......@@ -21,7 +21,7 @@ Implements an in-memory action Cache
"""
from ..cas.reference_cache import ReferenceCache
from ..referencestorage.storage import ReferenceCache
class ActionCache(ReferenceCache):
......