Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (26)
Showing with 449 additions and 361 deletions
@@ -142,13 +142,67 @@ their containing *package*, as such; modules which are entirely private to
 BuildGrid are named as such, e.g. ``_roy.py``.
 
+.. _codebase-testing:
+
+Testing
+-------
+
+BuildGrid uses `pytest`_ for regression testing and for testing newly added
+code. The test suite contains a series of unit tests and also runs linting
+tools in order to detect coding-style_ breakage. The full test suite is
+automatically executed by the GitLab CI system for every push to the server.
+Passing all the tests is a mandatory requirement for any merge request to the
+trunk.
+
+.. _pytest: https://docs.pytest.org
+
+
+Running tests
+~~~~~~~~~~~~~
+
+In order to run the entire test suite, simply run:
+
+.. code-block:: sh
+
+   python3 setup.py test
+
+You can use the ``--addopts`` option to feed arguments to pytest. For example,
+if you want to see the ``stdout`` and ``stderr`` generated by the tests, run:
+
+.. code-block:: sh
+
+   python3 setup.py test --addopts -s
+
+If you want to run a specific test instead of the entire suite, use:
+
+.. code-block:: sh
+
+   python3 setup.py test --addopts tests/cas/test_client
+
+pytest's `usage documentation section`_ details the different command line
+options that can be used when invoking the test runner.
+
+.. _usage documentation section: https://docs.pytest.org/en/latest/usage.html
+
+
+Test coverage
+~~~~~~~~~~~~~
+
+We do our best to keep BuildGrid's test coverage as high as possible. To that
+end, we ask that any merge request include the test additions and/or
+modifications necessary to maintain that coverage level. A detailed `coverage
+report`_ is produced and published for any change merged to the trunk.
+
+.. _coverage report: https://buildgrid.gitlab.io/buildgrid/coverage/
+
+
 .. _committer-access:
 
 Committer access
 ----------------
 
 We'll hand out commit access to anyone who has successfully landed a single
-patch to the code base. Please request this via irc or the mailing list.
+patch to the code base. Please request this via Slack or the mailing list.
 
 This of course relies on contributors being responsive and showing willingness
 to address problems after landing branches; there should not be any problems here.
...
-.. _about:
-
-About
-=====
-
 .. image:: https://gitlab.com/Buildgrid/buildgrid/badges/master/pipeline.svg
    :target: https://gitlab.com/BuildStream/buildstream/commits/master
 
 .. image:: https://gitlab.com/BuildGrid/buildgrid/badges/master/coverage.svg?job=coverage
    :target: https://buildgrid.gitlab.io/buildgrid/coverage
 
+.. _about:
+
+About BuildGrid
+===============
+
+.. _what-is-it:
+
+What is BuildGrid?
+------------------
+
 BuildGrid is a Python remote execution service which implements Google's
 `Remote Execution API`_ and the `Remote Workers API`_. The project's goal is to
 be able to execute build jobs remotely on a grid of computers in order to
 massively speed up build times. Workers on the grid should be able to run with
-different environments. It is designed to work with but not exclusively
-`BuildStream`_.
+different environments. It is designed to work with clients such as `Bazel`_ and
+`BuildStream`_.
 
 .. _Remote Execution API: https://github.com/bazelbuild/remote-apis
 .. _Remote Workers API: https://docs.google.com/document/d/1s_AzRRD2mdyktKUj2HWBn99rMg_3tcPvdjx3MPbFidU/edit#heading=h.1u2taqr2h940
 .. _BuildStream: https://wiki.gnome.org/Projects/BuildStream
+.. _Bazel: https://bazel.build
 
 .. _getting-started:
 
@@ -40,10 +48,15 @@ instructions.
 Resources
 ---------
 
-- Homepage: https://buildgrid.build
-- GitLab repository: https://gitlab.com/BuildGrid/buildgrid
-- Bug tracking: https://gitlab.com/BuildGrid/buildgrid/issues
-- Mailing list: https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid
-- Slack channel: https://buildteamworld.slack.com/messages/CC9MKC203 [`invite link`_]
+- `Homepage`_
+- `GitLab repository`_
+- `Bug tracking`_
+- `Mailing list`_
+- `Slack channel`_ [`invite link`_]
+
+.. _Homepage: https://buildgrid.build
+.. _GitLab repository: https://gitlab.com/BuildGrid/buildgrid
+.. _Bug tracking: https://gitlab.com/BuildGrid/buildgrid/issues
+.. _Mailing list: https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid
+.. _Slack channel: https://buildteamworld.slack.com/messages/CC9MKC203
 
 .. _invite link: https://join.slack.com/t/buildteamworld/shared_invite/enQtMzkxNzE0MDMyMDY1LTRmZmM1OWE0OTFkMGE1YjU5Njc4ODEzYjc0MGMyOTM5ZTQ5MmE2YTQ1MzQwZDc5MWNhODY1ZmRkZTE4YjFhNjU
@@ -104,7 +104,7 @@ def work_buildbox(context, lease):
 
         output_tree = _cas_tree_maker(stub_bytestream, output_digest)
 
         with upload(context.cas_channel) as cas:
-            output_tree_digest = cas.send_message(output_tree)
+            output_tree_digest = cas.put_message(output_tree)
 
         output_directory = remote_execution_pb2.OutputDirectory()
         output_directory.tree_digest.CopyFrom(output_tree_digest)
...
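The ``put_message()`` call above comes from the ``upload()`` context manager in ``buildgrid.client.cas``. A minimal usage sketch, assuming a locally reachable CAS endpoint (the address is illustrative):

.. code-block:: python

   import grpc

   from buildgrid.client.cas import upload
   from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

   # Illustrative endpoint; any gRPC channel to a CAS server works here.
   channel = grpc.insecure_channel('localhost:50051')

   directory = remote_execution_pb2.Directory()

   # put_message() serializes the protobuf, stores it in CAS and returns its
   # Digest; on context exit, any queued requests are flushed and sent.
   with upload(channel) as cas:
       directory_digest = cas.put_message(directory, queue=True)

   print(directory_digest.hash, directory_digest.size_bytes)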
@@ -27,8 +27,9 @@ from urllib.parse import urlparse
 import click
 import grpc
 
-from buildgrid.utils import merkle_maker, create_digest
+from buildgrid.client.cas import upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid.utils import merkle_maker
 
 from ..cli import pass_context
 
@@ -66,27 +67,31 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
 
 @cli.command('upload-files', short_help="Upload files to the CAS server.")
-@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
+@click.argument('files', nargs=-1, type=click.Path(exists=True, dir_okay=False), required=True)
 @pass_context
 def upload_files(context, files):
-    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-
-    requests = []
-    for file in files:
-        chunk = file.read()
-        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-            digest=create_digest(chunk), data=chunk))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-
-    context.logger.info("Sending: {}".format(request))
-    response = stub.BatchUpdateBlobs(request)
-    context.logger.info("Response: {}".format(response))
+    sent_digests, file_map = [], {}
+    with upload(context.channel, instance=context.instance_name) as cas:
+        for file_path in files:
+            context.logger.info("Queueing {}".format(file_path))
+
+            file_digest = cas.upload_file(file_path, queue=True)
+
+            assert file_digest.hash and file_digest.size_bytes
+
+            file_map[file_digest.hash] = file_path
+            sent_digests.append(file_digest)
+
+    for file_digest in sent_digests:
+        file_path = file_map[file_digest.hash]
+        if file_digest.ByteSize():
+            context.logger.info("{}: {}".format(file_path, file_digest.hash))
+        else:
+            context.logger.info("{}: FAILED".format(file_path))
 
 
 @cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
-@click.argument('directory', nargs=1, type=click.Path(), required=True)
+@click.argument('directory', nargs=1, type=click.Path(exists=True, file_okay=False), required=True)
 @pass_context
 def upload_dir(context, directory):
     context.logger.info("Uploading directory to cas")
...
@@ -30,9 +30,10 @@ from urllib.parse import urlparse
 import click
 import grpc
 
-from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
+from buildgrid.client.cas import upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid.utils import merkle_maker, write_fetch_blob
 
 from ..cli import pass_context
 
@@ -85,7 +86,7 @@ def request_dummy(context, number, wait_for_completion):
                                                   action_digest=action_digest,
                                                   skip_cache_lookup=True)
 
-    responses = list()
+    responses = []
     for _ in range(0, number):
         responses.append(stub.Execute(request))
 
@@ -119,46 +120,37 @@ def wait_execution(context, operation_name):
 @click.argument('input-root', nargs=1, type=click.Path(), required=True)
 @click.argument('commands', nargs=-1, type=click.STRING, required=True)
 @pass_context
-def command(context, input_root, commands, output_file, output_directory):
+def run_command(context, input_root, commands, output_file, output_directory):
     stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
 
-    execute_command = remote_execution_pb2.Command()
+    output_executeables = []
+    with upload(context.channel, instance=context.instance_name) as cas:
+        command = remote_execution_pb2.Command()
 
-    for arg in commands:
-        execute_command.arguments.extend([arg])
+        for arg in commands:
+            command.arguments.extend([arg])
 
-    output_executeables = []
-    for file, is_executeable in output_file:
-        execute_command.output_files.extend([file])
-        if is_executeable:
-            output_executeables.append(file)
+        for file, is_executeable in output_file:
+            command.output_files.extend([file])
+            if is_executeable:
+                output_executeables.append(file)
 
-    command_digest = create_digest(execute_command.SerializeToString())
-    context.logger.info(command_digest)
+        command_digest = cas.put_message(command, queue=True)
 
+        context.logger.info('Sent command: {}'.format(command_digest))
 
-    # TODO: Check for missing blobs
-    digest = None
-    for _, digest in merkle_maker(input_root):
-        pass
+        # TODO: Check for missing blobs
+        input_root_digest = None
+        for _, input_root_digest in merkle_maker(input_root):
+            pass
 
-    action = remote_execution_pb2.Action(command_digest=command_digest,
-                                         input_root_digest=digest,
-                                         do_not_cache=True)
+        action = remote_execution_pb2.Action(command_digest=command_digest,
+                                             input_root_digest=input_root_digest,
+                                             do_not_cache=True)
 
-    action_digest = create_digest(action.SerializeToString())
+        action_digest = cas.put_message(action, queue=True)
 
-    context.logger.info("Sending execution request...")
-
-    requests = []
-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=command_digest, data=execute_command.SerializeToString()))
-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=action_digest, data=action.SerializeToString()))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
+    context.logger.info("Sent action: {}".format(action_digest))
 
     request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                   action_digest=action_digest,
...
@@ -26,14 +26,10 @@ import sys
 
 import click
 
-from buildgrid.server.controller import ExecutionController
-from buildgrid.server.actioncache.storage import ActionCache
-from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
-from buildgrid.server.referencestorage.storage import ReferenceCache
+from buildgrid.server.instance import BuildGridServer
 
 from ..cli import pass_context
 from ..settings import parser
-from ..server import BuildGridServer
 
 
 @click.group(name='server', short_help="Start a local server instance.")
@@ -50,58 +46,12 @@ def start(context, config):
         settings = parser.get_parser().safe_load(f)
 
     try:
-        server_settings = settings['server']
-        insecure_mode = server_settings['insecure-mode']
-
-        credentials = None
-        if not insecure_mode:
-            credential_settings = server_settings['credentials']
-            server_key = credential_settings['tls-server-key']
-            server_cert = credential_settings['tls-server-cert']
-            client_certs = credential_settings['tls-client-certs']
-            credentials = context.load_server_credentials(server_key, server_cert, client_certs)
-
-            if not credentials:
-                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
-                           "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
-                sys.exit(-1)
-
-        port = server_settings['port']
-        instances = settings['instances']
-
-        execution_controllers = _instance_maker(instances, ExecutionController)
-
-        execution_instances = {}
-        bots_interfaces = {}
-        operations_instances = {}
-
-        # TODO: map properly in parser
-        # Issue 82
-        for k, v in execution_controllers.items():
-            execution_instances[k] = v.execution_instance
-            bots_interfaces[k] = v.bots_interface
-            operations_instances[k] = v.operations_instance
-
-        reference_caches = _instance_maker(instances, ReferenceCache)
-        action_caches = _instance_maker(instances, ActionCache)
-        cas = _instance_maker(instances, ContentAddressableStorageInstance)
-        bytestreams = _instance_maker(instances, ByteStreamInstance)
+        server = _create_server_from_config(settings)
 
     except KeyError as e:
         click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
         sys.exit(-1)
 
-    server = BuildGridServer(port=port,
-                             credentials=credentials,
-                             execution_instances=execution_instances,
-                             bots_interfaces=bots_interfaces,
-                             operations_instances=operations_instances,
-                             reference_storage_instances=reference_caches,
-                             action_cache_instances=action_caches,
-                             cas_instances=cas,
-                             bytestream_instances=bytestreams)
-
-    context.logger.info("Starting server on port {}".format(port))
-
     loop = asyncio.get_event_loop()
     try:
         server.start()
@@ -116,15 +66,25 @@ def start(context, config):
         loop.close()
 
 
-# Turn away now if you want to keep your eyes
-def _instance_maker(instances, service_type):
-    # TODO get this mapped in parser
-    made = {}
-
+def _create_server_from_config(config):
+    server_settings = config['server']
+
+    server = BuildGridServer()
+
+    try:
+        for channel in server_settings:
+            server.add_port(channel.address, channel.credentials)
+
+    except (AttributeError, TypeError) as e:
+        click.echo("Error: Use list of `!channel` tags: {}.\n".format(e), err=True)
+        sys.exit(-1)
+
+    instances = config['instances']
+
     for instance in instances:
-        services = instance['services']
         instance_name = instance['name']
+        services = instance['services']
+
         for service in services:
-            if isinstance(service, service_type):
-                made[instance_name] = service
-    return made
+            service.register_instance_with_server(instance_name, server)
+
+    return server
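Taken together, the new startup path assembles the server from configuration: ports come from ``!channel`` entries and every configured service attaches itself through ``register_instance_with_server()``. A minimal wiring sketch using only the calls visible in this diff (the instance name and empty services list are illustrative):

.. code-block:: python

   from buildgrid.server.instance import BuildGridServer

   server = BuildGridServer()

   # From a `!channel` entry: an insecure port is added with credentials=None.
   server.add_port('[::]:50051', None)

   # Would normally come from the parsed YAML `instances` section; each
   # service object picks the right server hook itself, e.g. ActionCache
   # calls server.add_action_cache_instance(self, instance_name).
   services = []

   for service in services:
       service.register_instance_with_server('main', server)

   server.start()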
-# Copyright (C) 2018 Bloomberg LP
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# <http://www.apache.org/licenses/LICENSE-2.0>
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-BuildGridServer
-==============
-
-Creates a BuildGrid server, binding all the requisite service instances together.
-"""
-
-import logging
-from concurrent import futures
-
-import grpc
-
-from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
-from buildgrid.server.actioncache.service import ActionCacheService
-from buildgrid.server.execution.service import ExecutionService
-from buildgrid.server.operations.service import OperationsService
-from buildgrid.server.bots.service import BotsService
-from buildgrid.server.referencestorage.service import ReferenceStorageService
-
-
-class BuildGridServer:
-
-    def __init__(self, port=50051, max_workers=10, credentials=None,
-                 execution_instances=None, bots_interfaces=None, operations_instances=None,
-                 operations_service_instances=None, reference_storage_instances=None,
-                 action_cache_instances=None, cas_instances=None, bytestream_instances=None):
-        self.logger = logging.getLogger(__name__)
-
-        address = '[::]:{0}'.format(port)
-
-        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
-
-        if credentials is not None:
-            self.logger.info("Secure connection")
-            server.add_secure_port(address, credentials)
-
-        else:
-            self.logger.info("Insecure connection")
-            server.add_insecure_port(address)
-
-        if execution_instances:
-            self.logger.debug("Adding execution instances {}".format(
-                execution_instances.keys()))
-            ExecutionService(server, execution_instances)
-
-        if bots_interfaces:
-            self.logger.debug("Adding bots interfaces {}".format(
-                bots_interfaces.keys()))
-            BotsService(server, bots_interfaces)
-
-        if operations_instances:
-            self.logger.debug("Adding operations instances {}".format(
-                operations_instances.keys()))
-            OperationsService(server, operations_instances)
-
-        if reference_storage_instances:
-            self.logger.debug("Adding reference storages {}".format(
-                reference_storage_instances.keys()))
-            ReferenceStorageService(server, reference_storage_instances)
-
-        if action_cache_instances:
-            self.logger.debug("Adding action cache instances {}".format(
-                action_cache_instances.keys()))
-            ActionCacheService(server, action_cache_instances)
-
-        if cas_instances:
-            self.logger.debug("Adding cas instances {}".format(
-                cas_instances.keys()))
-            ContentAddressableStorageService(server, cas_instances)
-
-        if bytestream_instances:
-            self.logger.debug("Adding bytestream instances {}".format(
-                bytestream_instances.keys()))
-            ByteStreamService(server, bytestream_instances)
-
-        self._server = server
-
-    def start(self):
-        self._server.start()
-
-    def stop(self):
-        self._server.stop(grace=0)
 server:
-  port: 50052
-  insecure-mode: true
-  credentials:
-    tls-server-key: null
-    tls-server-cert: null
-    tls-client-certs: null
+  - !channel
+    port: 50051
+    insecure_mode: true
+    # credentials:
+    #   tls-server-key: null
+    #   tls-server-cert: null
+    #   tls-client-certs: null
 
 description: |
   Just a CAS with some reference storage.
...
 server:
+  - !channel
     port: 50051
-  insecure-mode: true
-  credentials:
-    tls-server-key: null
-    tls-server-cert: null
-    tls-client-certs: null
+    insecure_mode: true
+    # credentials:
+    #   tls-server-key: null
+    #   tls-server-cert: null
+    #   tls-client-certs: null
 
 description: |
   A single default instance
...
@@ -37,10 +37,37 @@ from ..cli import Context
 
 class YamlFactory(yaml.YAMLObject):
 
     @classmethod
     def from_yaml(cls, loader, node):
+        if isinstance(node, yaml.ScalarNode):
+            value = loader.construct_scalar(node)
+            return cls(value)
+
+        else:
             values = loader.construct_mapping(node, deep=True)
             return cls(**values)
 
 
+class Channel(YamlFactory):
+
+    yaml_tag = u'!channel'
+
+    def __init__(self, port, insecure_mode, credentials=None):
+        self.address = '[::]:{0}'.format(port)
+        self.credentials = None
+
+        context = Context()
+
+        if not insecure_mode:
+            server_key = credentials['tls-server-key']
+            server_cert = credentials['tls-server-cert']
+            client_certs = credentials['tls-client-certs']
+            self.credentials = context.load_server_credentials(server_key, server_cert, client_certs)
+
+            if not self.credentials:
+                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
+                           "Set `insecure_mode: true` in order to deactivate TLS encryption.\n", err=True)
+                sys.exit(-1)
+
+
 class Disk(YamlFactory):
 
     yaml_tag = u'!disk-storage'
 
@@ -169,6 +196,7 @@ def _parse_size(size):
 
 def get_parser():
 
+    yaml.SafeLoader.add_constructor(Channel.yaml_tag, Channel.from_yaml)
     yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
     yaml.SafeLoader.add_constructor(Action.yaml_tag, Action.from_yaml)
     yaml.SafeLoader.add_constructor(Reference.yaml_tag, Reference.from_yaml)
...
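The scalar branch added to ``YamlFactory.from_yaml()`` lets a tag be written either as a bare scalar or as a mapping. A self-contained sketch of that dispatch (the ``Example`` class is illustrative, not part of BuildGrid):

.. code-block:: python

   import yaml


   class Example(yaml.YAMLObject):

       yaml_tag = u'!example'

       def __init__(self, path, size=None):
           self.path, self.size = path, size

       @classmethod
       def from_yaml(cls, loader, node):
           # A bare scalar (`!example /tmp`) constructs with a single value,
           # while a mapping (`!example {path: /tmp}`) is unpacked into
           # keyword arguments, matching the YamlFactory behaviour above.
           if isinstance(node, yaml.ScalarNode):
               return cls(loader.construct_scalar(node))
           else:
               return cls(**loader.construct_mapping(node, deep=True))


   yaml.SafeLoader.add_constructor(Example.yaml_tag, Example.from_yaml)

   print(yaml.safe_load('!example /tmp').path)          # -> /tmp
   print(yaml.safe_load('!example {path: /tmp}').path)  # -> /tmp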
 server:
+  - !channel
     port: 50051
-  insecure-mode: true
-  credentials:
-    tls-server-key: null
-    tls-server-cert: null
-    tls-client-certs: null
+    insecure_mode: true
+    # credentials:
+    #   tls-server-key: null
+    #   tls-server-cert: null
+    #   tls-client-certs: null
 
 description: |
   A single default instance with remote storage.
...
@@ -99,13 +99,13 @@ class BotSession:
         session = self._interface.create_bot_session(self._parent, self.get_pb2())
         self._name = session.name
 
-        self.logger.info("Created bot session with name: {}".format(self._name))
+        self.logger.info("Created bot session with name: [{}]".format(self._name))
 
         for lease in session.leases:
             self._update_lease_from_server(lease)
 
     def update_bot_session(self):
-        self.logger.debug("Updating bot session: {}".format(self._bot_id))
+        self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
 
         session = self._interface.update_bot_session(self.get_pb2())
         for k, v in list(self._leases.items()):
             if v.state == LeaseState.COMPLETED.value:
@@ -141,12 +141,12 @@ class BotSession:
             asyncio.ensure_future(self.create_work(lease))
 
     async def create_work(self, lease):
-        self.logger.debug("Work created: {}".format(lease.id))
+        self.logger.debug("Work created: [{}]".format(lease.id))
 
         loop = asyncio.get_event_loop()
 
         lease = await loop.run_in_executor(None, self._work, self._context, lease)
 
-        self.logger.debug("Work complete: {}".format(lease.id))
+        self.logger.debug("Work complete: [{}]".format(lease.id))
 
         self.lease_completed(lease)
 
@@ -161,14 +161,14 @@ class Worker:
                 if k == 'pool':
                     self.properties[k] = v
                 else:
-                    raise KeyError('Key not supported: {}'.format(k))
+                    raise KeyError('Key not supported: [{}]'.format(k))
 
         if configs:
             for k, v in configs.items():
                 if k == 'DockerImage':
                     self.configs[k] = v
                 else:
-                    raise KeyError('Key not supported: {}'.format(k))
+                    raise KeyError('Key not supported: [{}]'.format(k))
 
     @property
     def configs(self):
@@ -214,11 +214,11 @@ class Device:
                 elif k == 'docker':
                     if v not in ('True', 'False'):
-                        raise ValueError('Value not supported: {}'.format(v))
+                        raise ValueError('Value not supported: [{}]'.format(v))
 
                     self._properties[k] = v
 
                 else:
-                    raise KeyError('Key not supported: {}'.format(k))
+                    raise KeyError('Key not supported: [{}]'.format(k))
 
     @property
     def name(self):
...
@@ -17,9 +17,29 @@ from contextlib import contextmanager
 import uuid
 import os
 
-from buildgrid.settings import HASH
+import grpc
+
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildgrid._protos.google.rpc import code_pb2
+from buildgrid.settings import HASH
+
+
+class CallCache:
+    """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
+    __calls = {}
+
+    @classmethod
+    def mark_unimplemented(cls, channel, name):
+        if channel not in cls.__calls:
+            cls.__calls[channel] = set()
+        cls.__calls[channel].add(name)
+
+    @classmethod
+    def unimplemented(cls, channel, name):
+        if channel not in cls.__calls:
+            return False
+        return name in cls.__calls[channel]
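``CallCache`` remembers, per channel, which RPCs the remote has rejected with ``UNIMPLEMENTED``, so later uploads can skip them. A sketch of the intended pattern, with a hypothetical ``try_batch_update()`` helper standing in for the real ``_send_blob_batch()`` logic shown further down:

.. code-block:: python

   import grpc

   def try_batch_update(channel, cas_stub, batch_request):
       # Skip the call entirely if this remote already answered UNIMPLEMENTED.
       if CallCache.unimplemented(channel, 'BatchUpdateBlobs'):
           return None  # caller falls back to ByteStream.Write()

       try:
           return cas_stub.BatchUpdateBlobs(batch_request)

       except grpc.RpcError as e:
           if e.code() == grpc.StatusCode.UNIMPLEMENTED:
               # Remember the failure so subsequent uploads take the
               # fallback path without another round-trip.
               CallCache.mark_unimplemented(channel, 'BatchUpdateBlobs')
               return None
           raise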
 @contextmanager
@@ -28,7 +48,7 @@ def upload(channel, instance=None, u_uid=None):
     try:
         yield uploader
     finally:
-        uploader.flush()
+        uploader.close()
 
 
 class Uploader:
@@ -47,6 +67,7 @@ class Uploader:
 
     FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
     MAX_REQUEST_SIZE = 2 * 1024 * 1024
+    MAX_REQUEST_COUNT = 500
 
     def __init__(self, channel, instance=None, u_uid=None):
         """Initializes a new :class:`Uploader` instance.
@@ -67,9 +88,62 @@ class Uploader:
         self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
         self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
 
-        self.__requests = dict()
+        self.__requests = {}
+        self.__request_count = 0
         self.__request_size = 0
 
+    # --- Public API ---
+
+    def put_blob(self, blob, digest=None, queue=False):
+        """Stores a blob into the remote CAS server.
+
+        If queuing is allowed (`queue=True`), the upload request **may** be
+        deferred. An explicit call to :method:`flush` can force the request
+        to be sent immediately (along with the rest of the queued batch).
+
+        Args:
+            blob (bytes): the blob's data.
+            digest (:obj:`Digest`, optional): the blob's digest.
+            queue (bool, optional): whether or not the upload request may be
+                queued and submitted as part of a batch upload request. Defaults
+                to False.
+
+        Returns:
+            :obj:`Digest`: the sent blob's digest.
+        """
+        if not queue or len(blob) > Uploader.FILE_SIZE_THRESHOLD:
+            blob_digest = self._send_blob(blob, digest=digest)
+        else:
+            blob_digest = self._queue_blob(blob, digest=digest)
+
+        return blob_digest
+
+    def put_message(self, message, digest=None, queue=False):
+        """Stores a message into the remote CAS server.
+
+        If queuing is allowed (`queue=True`), the upload request **may** be
+        deferred. An explicit call to :method:`flush` can force the request
+        to be sent immediately (along with the rest of the queued batch).
+
+        Args:
+            message (:obj:`Message`): the message object.
+            digest (:obj:`Digest`, optional): the message's digest.
+            queue (bool, optional): whether or not the upload request may be
+                queued and submitted as part of a batch upload request. Defaults
+                to False.
+
+        Returns:
+            :obj:`Digest`: the sent message's digest.
+        """
+        message_blob = message.SerializeToString()
+
+        if not queue or len(message_blob) > Uploader.FILE_SIZE_THRESHOLD:
+            message_digest = self._send_blob(message_blob, digest=digest)
+        else:
+            message_digest = self._queue_blob(message_blob, digest=digest)
+
+        return message_digest
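A short usage sketch of the queueing behaviour described above (the endpoint is illustrative): small blobs are queued and batched, anything above ``FILE_SIZE_THRESHOLD`` is streamed immediately:

.. code-block:: python

   import grpc

   from buildgrid.client.cas import upload

   channel = grpc.insecure_channel('localhost:50051')  # illustrative endpoint

   with upload(channel, instance='main') as cas:
       # Small blob: queued, sent later as part of a BatchUpdateBlobs() call.
       small_digest = cas.put_blob(b'hello', queue=True)

       # Large blob (> FILE_SIZE_THRESHOLD): bypasses the queue and is
       # streamed right away using ByteStream.Write().
       large_digest = cas.put_blob(b'x' * (2 * 1024 * 1024))

   # close() runs on context exit and flushes anything still queued.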
     def upload_file(self, file_path, queue=True):
         """Stores a local file into the remote CAS storage.
@@ -79,7 +153,7 @@ class Uploader:
 
         Args:
             file_path (str): absolute or relative path to a local file.
-            queue (bool, optional): wheter or not the upload request may be
+            queue (bool, optional): whether or not the upload request may be
                 queued and submitted as part of a batch upload request. Defaults
                 to True.
@@ -96,11 +170,11 @@ class Uploader:
             file_bytes = bytes_steam.read()
 
         if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
-            blob_digest = self._send_blob(file_bytes)
+            file_digest = self._send_blob(file_bytes)
         else:
-            blob_digest = self._queue_blob(file_bytes)
+            file_digest = self._queue_blob(file_bytes)
 
-        return blob_digest
+        return file_digest
 
     def upload_directory(self, directory, queue=True):
         """Stores a :obj:`Directory` into the remote CAS storage.
@@ -126,50 +200,37 @@ class Uploader:
         else:
             return self._queue_blob(directory.SerializeToString())
 
-    def send_message(self, message):
-        """Stores a message into the remote CAS storage.
-
-        Args:
-            message (:obj:`Message`): a protobuf message object.
-
-        Returns:
-            :obj:`Digest`: The digest of the message.
-        """
-        return self._send_blob(message.SerializeToString())
-
     def flush(self):
         """Ensures any queued request gets sent."""
         if self.__requests:
-            self._send_batch()
+            self._send_blob_batch(self.__requests)
+
+            self.__requests.clear()
+            self.__request_count = 0
+            self.__request_size = 0
+
+    def close(self):
+        """Closes the underlying connection stubs.
+
+        Note:
+            This will always send pending requests before closing connections,
+            if any.
+        """
+        self.flush()
+
+        self.__bytestream_stub = None
+        self.__cas_stub = None
+
+    # --- Private API ---
 
-    def _queue_blob(self, blob):
-        """Queues a memory block for later batch upload"""
-        blob_digest = remote_execution_pb2.Digest()
-        blob_digest.hash = HASH(blob).hexdigest()
-        blob_digest.size_bytes = len(blob)
-
-        if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
-            self._send_batch()
-
-        update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
-        update_request.digest.CopyFrom(blob_digest)
-        update_request.data = blob
-
-        update_request_size = update_request.ByteSize()
-        if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
-            self._send_batch()
-
-        self.__requests[update_request.digest.hash] = update_request
-        self.__request_size += update_request_size
-
-        return blob_digest
-
-    def _send_blob(self, blob):
+    def _send_blob(self, blob, digest=None):
         """Sends a memory block using ByteStream.Write()"""
         blob_digest = remote_execution_pb2.Digest()
-        blob_digest.hash = HASH(blob).hexdigest()
-        blob_digest.size_bytes = len(blob)
+        if digest is not None:
+            blob_digest.CopyFrom(digest)
+        else:
+            blob_digest.hash = HASH(blob).hexdigest()
+            blob_digest.size_bytes = len(blob)
+
         if self.instance_name is not None:
             resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
                                       blob_digest.hash, str(blob_digest.size_bytes)])
@@ -204,18 +265,64 @@ class Uploader:
 
         return blob_digest
 
+    def _queue_blob(self, blob, digest=None):
+        """Queues a memory block for later batch upload"""
+        blob_digest = remote_execution_pb2.Digest()
+        if digest is not None:
+            blob_digest.CopyFrom(digest)
+        else:
+            blob_digest.hash = HASH(blob).hexdigest()
+            blob_digest.size_bytes = len(blob)
+
+        if self.__request_size + blob_digest.size_bytes > Uploader.MAX_REQUEST_SIZE:
+            self.flush()
+        elif self.__request_count >= Uploader.MAX_REQUEST_COUNT:
+            self.flush()
+
+        self.__requests[blob_digest.hash] = (blob, blob_digest)
+        self.__request_count += 1
+        self.__request_size += blob_digest.size_bytes
+
+        return blob_digest
+
-    def _send_batch(self):
-        """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
-        batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
-        batch_request.requests.extend(self.__requests.values())
-        if self.instance_name is not None:
-            batch_request.instance_name = self.instance_name
-
-        batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
-
-        for response in batch_response.responses:
-            assert response.digest.hash in self.__requests
-            assert response.status.code is 0
-
-        self.__requests.clear()
-        self.__request_size = 0
+    def _send_blob_batch(self, batch):
+        """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
+        batch_fetched = False
+        written_digests = []
+
+        # First, try BatchUpdateBlobs(), if not already known not to be implemented:
+        if not CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'):
+            batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
+            if self.instance_name is not None:
+                batch_request.instance_name = self.instance_name
+
+            for blob, digest in batch.values():
+                request = batch_request.requests.add()
+                request.digest.CopyFrom(digest)
+                request.data = blob
+
+            try:
+                batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
+                for response in batch_response.responses:
+                    assert response.digest.hash in batch
+
+                    written_digests.append(response.digest)
+                    if response.status.code != code_pb2.OK:
+                        response.digest.Clear()
+
+                batch_fetched = True
+
+            except grpc.RpcError as e:
+                status_code = e.code()
+                if status_code == grpc.StatusCode.UNIMPLEMENTED:
+                    CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
+                else:
+                    assert False
+
+        # Fallback to Write() if no BatchUpdateBlobs():
+        if not batch_fetched:
+            for blob, digest in batch.values():
+                written_digests.append(self._send_blob(blob, digest=digest))
+
+        return written_digests
@@ -32,13 +32,16 @@ from .._exceptions import InvalidArgumentError, NotFoundError
 
 class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
 
-    def __init__(self, server, instances):
-        self._instances = instances
+    def __init__(self, server):
         self.logger = logging.getLogger(__name__)
 
+        self._instances = {}
+
         remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
 
+    def add_instance(self, name, instance):
+        self._instances[name] = instance
+
     def GetActionResult(self, request, context):
         try:
             instance = self._get_instance(request.instance_name)
@@ -77,4 +80,4 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
             return self._instances[instance_name]
         except KeyError:
-            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
+            raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
@@ -26,6 +26,9 @@ from ..referencestorage.storage import ReferenceCache
 
 class ActionCache(ReferenceCache):
 
+    def register_instance_with_server(self, instance_name, server):
+        server.add_action_cache_instance(self, instance_name)
+
     def get_action_result(self, action_digest):
         key = self._get_key(action_digest)
         return self.get_action_reference(key)
...
@@ -36,6 +36,9 @@ class BotsInterface:
         self._bot_sessions = {}
         self._scheduler = scheduler
 
+    def register_instance_with_server(self, instance_name, server):
+        server.add_bots_interface(self, instance_name)
+
     def create_bot_session(self, parent, bot_session):
         """ Creates a new bot session. Server should assign a unique
         name to the session. If a bot with the same bot id tries to
@@ -60,7 +63,7 @@ class BotsInterface:
         self._bot_ids[name] = bot_id
         self._bot_sessions[name] = bot_session
-        self.logger.info("Created bot session name={} with bot_id={}".format(name, bot_id))
+        self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
 
         for lease in self._scheduler.create_leases():
             bot_session.leases.extend([lease])
@@ -92,7 +95,7 @@ class BotsInterface:
         try:
             server_lease = self._scheduler.get_job_lease(client_lease.id)
         except KeyError:
-            raise InvalidArgumentError("Lease not found on server: {}".format(client_lease))
+            raise InvalidArgumentError("Lease not found on server: [{}]".format(client_lease))
 
         server_state = LeaseState(server_lease.state)
         client_state = LeaseState(client_lease.state)
@@ -105,7 +108,7 @@ class BotsInterface:
                 # TODO: Lease was rejected
                 raise NotImplementedError("'Not Accepted' is unsupported")
             else:
-                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
+                raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
 
         elif server_state == LeaseState.ACTIVE:
@@ -118,10 +121,10 @@ class BotsInterface:
                 return None
 
             else:
-                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
+                raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
 
         elif server_state == LeaseState.COMPLETED:
-            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
+            raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
 
         elif server_state == LeaseState.CANCELLED:
             raise NotImplementedError("Cancelled states not supported yet")
@@ -138,19 +141,19 @@ class BotsInterface:
         if name is not None:
             _bot_id = self._bot_ids.get(name)
             if _bot_id is None:
-                raise InvalidArgumentError('Name not registered on server: {}'.format(name))
+                raise InvalidArgumentError('Name not registered on server: [{}]'.format(name))
             elif _bot_id != bot_id:
                 self._close_bot_session(name)
                 raise InvalidArgumentError(
-                    'Bot id invalid. ID sent: {} with name: {}.'
-                    'ID registered: {} for that name'.format(bot_id, name, _bot_id))
+                    'Bot id invalid. ID sent: [{}] with name: [{}].'
+                    'ID registered: [{}] for that name'.format(bot_id, name, _bot_id))
         else:
             for _name, _bot_id in self._bot_ids.items():
                 if bot_id == _bot_id:
                     self._close_bot_session(_name)
                     raise InvalidArgumentError(
-                        'Bot id already registered. ID sent: {}.'
-                        'Id registered: {} with name: {}'.format(bot_id, _bot_id, _name))
+                        'Bot id already registered. ID sent: [{}].'
+                        'Id registered: [{}] with name: [{}]'.format(bot_id, _bot_id, _name))
 
     def _close_bot_session(self, name):
         """ Before removing the session, close any leases and
@@ -159,14 +162,14 @@ class BotsInterface:
         bot_id = self._bot_ids.get(name)
 
         if bot_id is None:
-            raise InvalidArgumentError("Bot id does not exist: {}".format(name))
+            raise InvalidArgumentError("Bot id does not exist: [{}]".format(name))
 
-        self.logger.debug("Attempting to close {} with name: {}".format(bot_id, name))
+        self.logger.debug("Attempting to close [{}] with name: [{}]".format(bot_id, name))
         for lease in self._bot_sessions[name].leases:
             if lease.state != LeaseState.COMPLETED.value:
                 # TODO: Be wary here, may need to handle rejected leases in future
                 self._scheduler.retry_job(lease.id)
 
-        self.logger.debug("Closing bot session: {}".format(name))
+        self.logger.debug("Closing bot session: [{}]".format(name))
         self._bot_ids.pop(name)
-        self.logger.info("Closed bot {} with name: {}".format(bot_id, name))
+        self.logger.info("Closed bot [{}] with name: [{}]".format(bot_id, name))
@@ -33,12 +33,16 @@ from .._exceptions import InvalidArgumentError, OutofSyncError
 
 class BotsService(bots_pb2_grpc.BotsServicer):
 
-    def __init__(self, server, instances):
-        self._instances = instances
+    def __init__(self, server):
         self.logger = logging.getLogger(__name__)
 
+        self._instances = {}
+
         bots_pb2_grpc.add_BotsServicer_to_server(self, server)
 
+    def add_instance(self, name, instance):
+        self._instances[name] = instance
+
     def CreateBotSession(self, request, context):
         try:
             parent = request.parent
@@ -90,4 +94,4 @@ class BotsService(bots_pb2_grpc.BotsServicer):
             return self._instances[name]
         except KeyError:
-            raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
+            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
@@ -31,6 +31,9 @@ class ContentAddressableStorageInstance:
     def __init__(self, storage):
         self._storage = storage
 
+    def register_instance_with_server(self, instance_name, server):
+        server.add_cas_instance(self, instance_name)
+
     def find_missing_blobs(self, blob_digests):
         storage = self._storage
         return re_pb2.FindMissingBlobsResponse(
@@ -60,6 +63,9 @@ class ByteStreamInstance:
     def __init__(self, storage):
         self._storage = storage
 
+    def register_instance_with_server(self, instance_name, server):
+        server.add_bytestream_instance(self, instance_name)
+
     def read(self, path, read_offset, read_limit):
         storage = self._storage
...
@@ -35,12 +35,16 @@ from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 
 class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 
-    def __init__(self, server, instances):
+    def __init__(self, server):
         self.logger = logging.getLogger(__name__)
-        self._instances = instances
+
+        self._instances = {}
 
         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
 
+    def add_instance(self, name, instance):
+        self._instances[name] = instance
+
     def FindMissingBlobs(self, request, context):
         try:
             instance = self._get_instance(request.instance_name)
@@ -70,17 +74,21 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
             return self._instances[instance_name]
         except KeyError:
-            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
+            raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
 
 
 class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
 
-    def __init__(self, server, instances):
+    def __init__(self, server):
         self.logger = logging.getLogger(__name__)
-        self._instances = instances
+
+        self._instances = {}
 
         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)
 
+    def add_instance(self, name, instance):
+        self._instances[name] = instance
+
     def Read(self, request, context):
         try:
             path = request.resource_name.split("/")
@@ -89,15 +97,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             # TODO: Decide on default instance name
             if path[0] == "blobs":
                 if len(path) < 3 or not path[2].isdigit():
-                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
+                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
                 instance_name = ""
 
             elif path[1] == "blobs":
                 if len(path) < 4 or not path[3].isdigit():
-                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
+                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
 
             else:
-                raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
+                raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
 
             instance = self._get_instance(instance_name)
             yield from instance.read(path,
@@ -134,15 +142,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             # TODO: Sort out no instance name
             if path[0] == "uploads":
                 if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
-                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
+                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
                 instance_name = ""
 
             elif path[1] == "uploads":
                 if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
-                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
+                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
 
             else:
-                raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
+                raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
 
             instance = self._get_instance(instance_name)
             return instance.write(requests)
@@ -169,4 +177,4 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             return self._instances[instance_name]
         except KeyError:
-            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
+            raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
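The ``Read()`` and ``Write()`` checks above distinguish two resource-name layouts: ``blobs/{hash}/{size}`` (no instance name) and ``{instance}/blobs/{hash}/{size}``. A standalone sketch mirroring the read-side conditions (not the BuildGrid implementation itself):

.. code-block:: python

   def parse_read_resource_name(resource_name):
       """Returns (instance_name, hash, size) or raises ValueError."""
       path = resource_name.split("/")

       if path[0] == "blobs":
           # Layout: blobs/{hash}/{size}
           if len(path) < 3 or not path[2].isdigit():
               raise ValueError("Invalid resource name: [{}]".format(resource_name))
           return "", path[1], int(path[2])

       elif len(path) > 1 and path[1] == "blobs":
           # Layout: {instance}/blobs/{hash}/{size}
           if len(path) < 4 or not path[3].isdigit():
               raise ValueError("Invalid resource name: [{}]".format(resource_name))
           return path[0], path[2], int(path[3])

       raise ValueError("Invalid resource name: [{}]".format(resource_name))


   assert parse_read_resource_name("blobs/abc123/42") == ("", "abc123", 42)
   assert parse_read_resource_name("main/blobs/abc123/42") == ("main", "abc123", 42)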
@@ -25,9 +25,13 @@ import logging
 
 import grpc
 
-from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
+from buildgrid.client.cas import upload
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid._protos.google.rpc import code_pb2
+from buildgrid._protos.google.rpc import status_pb2
+from buildgrid.utils import gen_fetch_blob
+from buildgrid.settings import HASH
 
 from .storage_abc import StorageABC
 
@@ -36,7 +40,10 @@ class RemoteStorage(StorageABC):
 
     def __init__(self, channel, instance_name):
         self.logger = logging.getLogger(__name__)
-        self._instance_name = instance_name
+
+        self.instance_name = instance_name
+        self.channel = channel
+
         self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
         self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
 
@@ -50,17 +57,13 @@ class RemoteStorage(StorageABC):
             fetched_data = io.BytesIO()
             length = 0
 
-            for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
+            for data in gen_fetch_blob(self._stub_bs, digest, self.instance_name):
                 length += fetched_data.write(data)
 
-            if length:
-                assert digest.size_bytes == length
-                fetched_data.seek(0)
-                return fetched_data
-            else:
-                return None
+            assert digest.size_bytes == length
+            fetched_data.seek(0)
+
+            return fetched_data
 
         except grpc.RpcError as e:
             if e.code() == grpc.StatusCode.NOT_FOUND:
                 pass
@@ -71,16 +74,14 @@ class RemoteStorage(StorageABC):
         return None
 
     def begin_write(self, digest):
-        return io.BytesIO(digest.SerializeToString())
+        return io.BytesIO()
 
     def commit_write(self, digest, write_session):
-        write_session.seek(0)
-
-        for request in gen_write_request_blob(write_session, digest, self._instance_name):
-            self._stub_bs.Write(request)
+        with upload(self.channel, instance=self.instance_name) as cas:
+            cas.put_blob(write_session.getvalue())
 
     def missing_blobs(self, blobs):
-        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)
 
         for blob in blobs:
             request_digest = request.blob_digests.add()
@@ -92,19 +93,15 @@ class RemoteStorage(StorageABC):
         return [x for x in response.missing_blob_digests]
 
     def bulk_update_blobs(self, blobs):
-        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
-
-        for digest, data in blobs:
-            reqs = request.requests.add()
-            reqs.digest.CopyFrom(digest)
-            reqs.data = data
-
-        response = self._stub_cas.BatchUpdateBlobs(request)
-
-        responses = response.responses
-
-        # Check everything was sent back, even if order changed
-        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
-            ([x.digest for x in responses].sort(key=lambda x: x.hash))
-
-        return [x.status for x in responses]
+        sent_digests = []
+        with upload(self.channel, instance=self.instance_name) as cas:
+            for digest, blob in blobs:
+                if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
+                    sent_digests.append(remote_execution_pb2.Digest())
+                else:
+                    sent_digests.append(cas.put_blob(blob, digest=digest, queue=True))
+
+        assert len(sent_digests) == len(blobs)
+
+        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
+                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]
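To illustrate the resulting contract of ``bulk_update_blobs()`` ((digest, blob) pairs in, one ``google.rpc`` ``Status`` per blob out), a hypothetical caller might look like this (the module path for ``RemoteStorage`` is assumed, and the endpoint is illustrative):

.. code-block:: python

   import grpc

   from buildgrid.server.cas.storage.remote import RemoteStorage  # assumed path
   from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
   from buildgrid._protos.google.rpc import code_pb2
   from buildgrid.settings import HASH


   def make_digest(blob):
       # A Digest is the blob's hash plus its size in bytes.
       digest = remote_execution_pb2.Digest()
       digest.hash = HASH(blob).hexdigest()
       digest.size_bytes = len(blob)
       return digest


   channel = grpc.insecure_channel('localhost:50051')  # illustrative endpoint
   storage = RemoteStorage(channel, 'main')

   pairs = [(make_digest(blob), blob) for blob in (b'first', b'second')]
   statuses = storage.bulk_update_blobs(pairs)

   for (digest, _), status in zip(pairs, statuses):
       print(digest.hash, 'OK' if status.code == code_pb2.OK else 'FAILED')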