Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Select Git revision
Show changes
Commits on Source (4)
Showing
with 224 additions and 127 deletions
......@@ -16,13 +16,12 @@
import os
import subprocess
import tempfile
import grpc
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid.utils import read_file, parse_to_pb2_from_fetch
from buildgrid.utils import parse_to_pb2_from_fetch
def work_buildbox(context, lease):
......@@ -32,18 +31,7 @@ def work_buildbox(context, lease):
action_digest = remote_execution_pb2.Digest()
action_digest_any.Unpack(action_digest)
cert_server = read_file(context.server_cert)
cert_client = read_file(context.client_cert)
key_client = read_file(context.client_key)
# create server credentials
credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
private_key=key_client,
certificate_chain=cert_client)
channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
stub = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
action = remote_execution_pb2.Action()
parse_to_pb2_from_fetch(action, stub, action_digest)
......@@ -66,13 +54,18 @@ def work_buildbox(context, lease):
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as output_digest_file:
command = ['buildbox',
'--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
'--server-cert={}'.format(context.server_cert),
'--client-key={}'.format(context.client_key),
'--client-cert={}'.format(context.client_cert),
'--remote={}'.format(context.remote_cas_url),
'--input-digest={}'.format(input_digest_file.name),
'--output-digest={}'.format(output_digest_file.name),
'--local={}'.format(casdir)]
if context.cas_client_key:
command.append('--client-key={}'.format(context.cas_client_key))
if context.cas_client_cert:
command.append('--client-cert={}'.format(context.cas_client_cert))
if context.cas_server_cert:
command.append('--server-cert={}'.format(context.cas_server_cert))
if 'PWD' in environment and environment['PWD']:
command.append('--chdir={}'.format(environment['PWD']))
......
......@@ -30,7 +30,7 @@ def work_temp_directory(context, lease):
"""
parent = context.parent
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
action_digest = remote_execution_pb2.Digest()
lease.payload.Unpack(action_digest)
......@@ -78,7 +78,7 @@ def work_temp_directory(context, lease):
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=parent,
requests=requests)
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.cas_channel)
stub_cas.BatchUpdateBlobs(request)
result_any = any_pb2.Any()
......
......@@ -83,15 +83,23 @@ class Context:
client_key_pem = read_file(client_key)
else:
client_key_pem = None
client_key = None
if client_key_pem and client_cert and os.path.exists(client_cert):
client_cert_pem = read_file(client_cert)
else:
client_cert_pem = None
client_cert = None
return grpc.ssl_channel_credentials(root_certificates=server_cert_pem,
credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_pem,
private_key=client_key_pem,
certificate_chain=client_cert_pem)
credentials.client_key = client_key
credentials.client_cert = client_cert
credentials.server_cert = server_cert
return credentials
def load_server_credentials(self, server_key=None, server_cert=None,
client_certs=None, use_default_client_certs=False):
"""Looks-up and loads TLS server gRPC credentials.
......@@ -132,11 +140,18 @@ class Context:
client_certs_pem = read_file(client_certs)
else:
client_certs_pem = None
client_certs = None
return grpc.ssl_server_credentials([(server_key_pem, server_cert_pem)],
credentials = grpc.ssl_server_credentials([(server_key_pem, server_cert_pem)],
root_certificates=client_certs_pem,
require_client_auth=bool(client_certs))
credentials.server_key = server_key
credentials.server_cert = server_cert
credentials.client_certs = client_certs
return credentials
pass_context = click.make_pass_decorator(Context, ensure=True)
cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__),
......
......@@ -44,26 +44,79 @@ from ..cli import pass_context
help="Public client certificate for TLS (PEM-encoded)")
@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public server certificate for TLS (PEM-encoded)")
@click.option('--remote-cas', type=click.STRING, default=None, show_default=True,
help="Remote CAS server's URL (port defaults to 11001 if not specified).")
@click.option('--cas-client-key', type=click.Path(exists=True, dir_okay=False), default=None,
help="Private CAS client key for TLS (PEM-encoded)")
@click.option('--cas-client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public CAS client certificate for TLS (PEM-encoded)")
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public CAS server certificate for TLS (PEM-encoded)")
@click.option('--parent', type=click.STRING, default='main', show_default=True,
help="Targeted farm resource.")
@pass_context
def cli(context, remote, parent, client_key, client_cert, server_cert):
def cli(context, parent, remote, client_key, client_cert, server_cert,
remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
# Setup the remote execution server channel:
url = urlparse(remote)
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
context.remote_url = remote
context.parent = parent
if url.scheme == 'http':
context.channel = grpc.insecure_channel(context.remote)
context.client_key = None
context.client_cert = None
context.server_cert = None
else:
credentials = context.load_client_credentials(client_key, client_cert, server_cert)
if not credentials:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
"Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
sys.exit(-1)
context.channel = grpc.secure_channel(context.remote, credentials)
context.client_key = credentials.client_key
context.client_cert = credentials.client_cert
context.server_cert = credentials.server_cert
# Setup the remote CAS server channel, if separated:
if remote_cas is not None and remote_cas != remote:
cas_url = urlparse(remote_cas)
context.remote_cas = '{}:{}'.format(cas_url.hostname, cas_url.port or 11001)
context.remote_cas_url = remote_cas
if cas_url.scheme == 'http':
context.cas_channel = grpc.insecure_channel(context.remote_cas)
context.cas_client_key = None
context.cas_client_cert = None
context.cas_server_cert = None
else:
cas_credentials = context.load_client_credentials(cas_client_key, cas_client_cert, cas_server_cert)
if not cas_credentials:
click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
sys.exit(-1)
context.cas_channel = grpc.secure_channel(context.remote_cas, cas_credentials)
context.cas_client_key = cas_credentials.client_key
context.cas_client_cert = cas_credentials.client_cert
context.cas_server_cert = cas_credentials.server_cert
else:
context.remote_cas = context.remote
context.remote_cas_url = remote
context.cas_channel = context.channel
context.cas_client_key = context.client_key
context.cas_client_cert = context.client_cert
context.cas_server_cert = context.server_cert
context.logger = logging.getLogger(__name__)
context.logger.debug("Starting for remote {}".format(context.remote))
......@@ -112,35 +165,17 @@ def run_temp_directory(context):
help="Main mount-point location.")
@click.option('--local-cas', type=click.Path(readable=False), default=str(PurePath(Path.home(), 'cas')),
help="Local CAS cache directory.")
@click.option('--client-cert', type=click.Path(readable=False), default=str(PurePath(Path.home(), 'client.crt')),
help="Public client certificate for TLS (PEM-encoded).")
@click.option('--client-key', type=click.Path(readable=False), default=str(PurePath(Path.home(), 'client.key')),
help="Private client key for TLS (PEM-encoded).")
@click.option('--server-cert', type=click.Path(readable=False), default=str(PurePath(Path.home(), 'server.crt')),
help="Public server certificate for TLS (PEM-encoded).")
@click.option('--port', type=click.INT, default=11001, show_default=True,
help="Remote CAS server port.")
@click.option('--remote', type=click.STRING, default='localhost', show_default=True,
help="Remote CAS server hostname.")
@pass_context
def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
def run_buildbox(context, local_cas, fuse_dir):
"""
Uses BuildBox to run commands.
"""
context.logger.info("Creating a bot session")
context.remote = remote
context.port = port
context.server_cert = server_cert
context.client_key = client_key
context.client_cert = client_cert
context.local_cas = local_cas
context.fuse_dir = fuse_dir
try:
b = bot.Bot(context.bot_session)
b.session(work=buildbox.work_buildbox,
context=context)
b.session(buildbox.work_buildbox,
context)
except KeyboardInterrupt:
pass
......@@ -31,7 +31,7 @@ from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.execution.action_cache import ActionCache
from buildgrid.server.actioncache.storage import ActionCache
from ..cli import pass_context
......
......@@ -39,9 +39,7 @@ class BgdError(Exception):
class ErrorDomain(Enum):
SERVER = 1
EXECUTION = 2
WORKER = 3
BOT = 4
BOT = 2
class ServerError(BgdError):
......@@ -49,16 +47,6 @@ class ServerError(BgdError):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
class ExecutionError(BgdError):
    """Error raised from the execution domain."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail,
                         domain=ErrorDomain.EXECUTION, reason=reason)
class WorkerError(BgdError):
    """Error raised from the worker domain."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail,
                         domain=ErrorDomain.WORKER, reason=reason)
class BotError(BgdError):
    """Error raised from the bot domain."""

    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail,
                         domain=ErrorDomain.BOT, reason=reason)
......@@ -21,7 +21,7 @@ Implements an in-memory action Cache
"""
from ..cas.reference_cache import ReferenceCache
from ..referencestorage.storage import ReferenceCache
class ActionCache(ReferenceCache):
......
......@@ -29,13 +29,12 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc
from .buildgrid_instance import BuildGridInstance
from .cas.bytestream_service import ByteStreamService
from .cas.content_addressable_storage_service import ContentAddressableStorageService
from .execution.action_cache_service import ActionCacheService
from .execution.execution_service import ExecutionService
from .execution.operations_service import OperationsService
from .worker.bots_service import BotsService
from .instance import BuildGridInstance
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
class BuildGridServer:
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ContentAddressableStorageService
================================
Implements the Content Addressable Storage API, which provides methods
to check for missing CAS blobs and update them in bulk.
"""
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
    """Serves the Content Addressable Storage API over one storage backend.

    Provides missing-blob queries and bulk blob uploads.
    """

    def __init__(self, storage):
        # Backend exposing missing_blobs() and bulk_update_blobs().
        self._storage = storage

    def FindMissingBlobs(self, request, context):
        """Return the requested digests that are absent from storage."""
        # Only one instance for now.
        missing = self._storage.missing_blobs(request.blob_digests)
        return re_pb2.FindMissingBlobsResponse(missing_blob_digests=missing)

    def BatchUpdateBlobs(self, request, context):
        """Store every blob in the request and report a per-blob status."""
        # Only one instance for now.
        blobs = [(proto.digest, proto.data) for proto in request.requests]
        statuses = self._storage.bulk_update_blobs(blobs)
        response = re_pb2.BatchUpdateBlobsResponse()
        for (digest, _), status in zip(blobs, statuses):
            entry = response.responses.add()
            entry.digest.CopyFrom(digest)
            entry.status.CopyFrom(status)
        return response
......@@ -14,21 +14,47 @@
"""
ByteStreamService
CAS services
==================
Implements the ByteStream API, which clients can use to read and write
CAS blobs.
Implements the Content Addressable Storage API and ByteStream API.
"""
import grpc
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
from ...settings import HASH
class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
    """Implements the Content Addressable Storage servicer.

    Answers missing-blob queries and applies batched blob uploads against
    a single storage backend.
    """

    def __init__(self, storage):
        # Backend exposing missing_blobs() and bulk_update_blobs().
        self._storage = storage

    def FindMissingBlobs(self, request, context):
        """Return the requested digests not present in storage."""
        # Only one instance for now.
        missing = self._storage.missing_blobs(request.blob_digests)
        return re_pb2.FindMissingBlobsResponse(missing_blob_digests=missing)

    def BatchUpdateBlobs(self, request, context):
        """Store each uploaded blob, returning one status per blob."""
        # Only one instance for now.
        blobs = [(proto.digest, proto.data) for proto in request.requests]
        statuses = self._storage.bulk_update_blobs(blobs)
        response = re_pb2.BatchUpdateBlobsResponse()
        for (digest, _), status in zip(blobs, statuses):
            entry = response.responses.add()
            entry.digest.CopyFrom(digest)
            entry.status.CopyFrom(status)
        return response
class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
BLOCK_SIZE = 1 * 1024 * 1024 # 1 MB block size
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RemoteStorage
==================
Forwards storage requests to a remote storage server.
"""
import io
import logging
import grpc
from buildgrid.utils import create_digest, gen_fetch_blob, gen_write_request_blob
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .storage_abc import StorageABC
class RemoteStorage(StorageABC):
    """Storage backend that forwards requests to a remote CAS server.

    Blob reads and writes go through the ByteStream API; existence checks
    and bulk uploads go through the ContentAddressableStorage API.
    """

    def __init__(self, channel, instance_name):
        """
        Args:
            channel: open gRPC channel to the remote CAS server.
            instance_name: remote instance name sent with every request.
        """
        self.logger = logging.getLogger(__name__)
        self._instance_name = instance_name
        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
        self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    def has_blob(self, digest):
        """Return True if the remote server holds the blob for ``digest``."""
        try:
            self.get_blob(digest)
            return True
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def get_blob(self, digest):
        """Fetch the blob for ``digest``; return it as a file-like object."""
        # Start from empty bytes, not None: the original None initializer
        # raised TypeError on the first chunk concatenation, and failed
        # len() for zero-chunk (empty) blobs.
        fetched_data = b''
        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
            fetched_data += data
        assert digest.size_bytes == len(fetched_data)
        return io.BytesIO(fetched_data)

    def begin_write(self, digest):
        """Return the ByteStream write-request generator for ``digest``."""
        return gen_write_request_blob(digest, self._instance_name)

    def commit_write(self, digest, write_session):
        """Send the pending write requests to the remote server.

        ByteStream.Write is a client-streaming RPC, so the stub must be
        handed the whole request iterator in a single call rather than
        one message per call.
        """
        self._stub_bs.Write(write_session)

    def missing_blobs(self, blobs):
        """Return the digests (computed from ``blobs``) missing remotely."""
        digests = [create_digest(blob) for blob in blobs]
        request = remote_execution_pb2.FindMissingBlobsRequest(
            instance_name=self._instance_name,
            blob_digests=digests)
        response = self._stub_cas.FindMissingBlobs(request)
        return response.missing_blob_digests

    def bulk_update_blobs(self, blobs):
        """Upload ``blobs`` in one batch; return the per-blob statuses."""
        requests = [
            remote_execution_pb2.BatchUpdateBlobsRequest.Request(
                digest=create_digest(blob), data=blob)
            for blob in blobs]
        # gRPC stub methods take a single request message, not keyword
        # arguments: wrap the per-blob requests first (same pattern as
        # the worker-side BatchUpdateBlobs caller).
        request = remote_execution_pb2.BatchUpdateBlobsRequest(
            instance_name=self._instance_name,
            requests=requests)
        response = self._stub_cas.BatchUpdateBlobs(request)
        return [entry.status for entry in response.responses]
......@@ -25,9 +25,9 @@ Contains scheduler, execution instance and an interface to the bots.
import logging
from .execution.execution_instance import ExecutionInstance
from .execution.instance import ExecutionInstance
from .scheduler import Scheduler
from .worker.bots_interface import BotsInterface
from .bots.instance import BotsInterface
class BuildGridInstance(ExecutionInstance, BotsInterface):
......