Compare revisions

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (10)
@@ -104,7 +104,7 @@ def work_buildbox(context, lease):
         output_tree = _cas_tree_maker(stub_bytestream, output_digest)

         with upload(context.cas_channel) as cas:
-            output_tree_digest = cas.send_message(output_tree)
+            output_tree_digest = cas.put_message(output_tree)

         output_directory = remote_execution_pb2.OutputDirectory()
         output_directory.tree_digest.CopyFrom(output_tree_digest)
...
@@ -27,8 +27,9 @@ from urllib.parse import urlparse
 import click
 import grpc

-from buildgrid.utils import merkle_maker, create_digest
+from buildgrid.client.cas import upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid.utils import merkle_maker

 from ..cli import pass_context
@@ -66,27 +67,31 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
 @cli.command('upload-files', short_help="Upload files to the CAS server.")
-@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
+@click.argument('files', nargs=-1, type=click.Path(exists=True, dir_okay=False), required=True)
 @pass_context
 def upload_files(context, files):
-    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-
-    requests = []
-    for file in files:
-        chunk = file.read()
-        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-            digest=create_digest(chunk), data=chunk))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-
-    context.logger.info("Sending: {}".format(request))
-    response = stub.BatchUpdateBlobs(request)
-    context.logger.info("Response: {}".format(response))
+    sent_digests, file_map = list(), dict()
+    with upload(context.channel, instance=context.instance_name) as cas:
+        for file_path in files:
+            context.logger.info("Queueing {}".format(file_path))
+
+            file_digest = cas.upload_file(file_path, queue=True)
+
+            assert file_digest.hash and file_digest.size_bytes
+
+            file_map[file_digest.hash] = file_path
+            sent_digests.append(file_digest)
+
+    for file_digest in sent_digests:
+        file_path = file_map[file_digest.hash]
+        if file_digest.ByteSize():
+            context.logger.info("{}: {}".format(file_path, file_digest.hash))
+        else:
+            context.logger.info("{}: FAILED".format(file_path))


 @cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
-@click.argument('directory', nargs=1, type=click.Path(), required=True)
+@click.argument('directory', nargs=1, type=click.Path(exists=True, file_okay=False), required=True)
 @pass_context
 def upload_dir(context, directory):
     context.logger.info("Uploading directory to cas")
...
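For reference, the rewritten `upload-files` command boils down to the queued-upload client usage sketched below. This is an illustration rather than part of the change; the endpoint address, instance name and file paths are made up.

import grpc

from buildgrid.client.cas import upload

channel = grpc.insecure_channel('localhost:50051')  # assumed CAS endpoint

with upload(channel, instance='main') as cas:
    # queue=True lets the uploader defer small blobs into a single batch
    # request; anything still pending is flushed when the context exits.
    digest_a = cas.upload_file('./hello.cc', queue=True)
    digest_b = cas.upload_file('./hello.h', queue=True)

print(digest_a.hash, digest_a.size_bytes)
print(digest_b.hash, digest_b.size_bytes)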
@@ -30,9 +30,10 @@ from urllib.parse import urlparse
 import click
 import grpc

-from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
+from buildgrid.client.cas import upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid.utils import merkle_maker, write_fetch_blob

 from ..cli import pass_context
@@ -119,46 +120,37 @@ def wait_execution(context, operation_name):
 @click.argument('input-root', nargs=1, type=click.Path(), required=True)
 @click.argument('commands', nargs=-1, type=click.STRING, required=True)
 @pass_context
-def command(context, input_root, commands, output_file, output_directory):
+def run_command(context, input_root, commands, output_file, output_directory):
     stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)

-    execute_command = remote_execution_pb2.Command()
+    output_executeables = list()
+    with upload(context.channel, instance=context.instance_name) as cas:
+        command = remote_execution_pb2.Command()

-    for arg in commands:
-        execute_command.arguments.extend([arg])
+        for arg in commands:
+            command.arguments.extend([arg])

-    output_executeables = []
-    for file, is_executeable in output_file:
-        execute_command.output_files.extend([file])
-        if is_executeable:
-            output_executeables.append(file)
+        for file, is_executeable in output_file:
+            command.output_files.extend([file])
+            if is_executeable:
+                output_executeables.append(file)

-    command_digest = create_digest(execute_command.SerializeToString())
-    context.logger.info(command_digest)
+        command_digest = cas.put_message(command, queue=True)

-    # TODO: Check for missing blobs
-    digest = None
-    for _, digest in merkle_maker(input_root):
-        pass
+        context.logger.info('Sent command: {}'.format(command_digest))

-    action = remote_execution_pb2.Action(command_digest=command_digest,
-                                         input_root_digest=digest,
-                                         do_not_cache=True)
+        # TODO: Check for missing blobs
+        input_root_digest = None
+        for _, input_root_digest in merkle_maker(input_root):
+            pass

-    action_digest = create_digest(action.SerializeToString())
+        action = remote_execution_pb2.Action(command_digest=command_digest,
+                                             input_root_digest=input_root_digest,
+                                             do_not_cache=True)

-    context.logger.info("Sending execution request...")
+        action_digest = cas.put_message(action, queue=True)

-    requests = []
-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=command_digest, data=execute_command.SerializeToString()))
-
-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=action_digest, data=action.SerializeToString()))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
+        context.logger.info("Sent action: {}".format(action_digest))

     request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                   action_digest=action_digest,
...
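The truncated remainder of `run_command` issues the Execute call from the digests uploaded above. As a rough, self-contained sketch of that flow (not part of the diff; the endpoint, instance name and `skip_cache_lookup` value are assumptions):

import grpc

from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2, remote_execution_pb2_grpc)

channel = grpc.insecure_channel('localhost:50051')  # assumed endpoint

with upload(channel, instance='main') as cas:
    command = remote_execution_pb2.Command()
    command.arguments.extend(['echo', 'hello'])
    command_digest = cas.put_message(command, queue=True)

    action = remote_execution_pb2.Action(command_digest=command_digest,
                                         do_not_cache=True)
    action_digest = cas.put_message(action, queue=True)

stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(instance_name='main',
                                              action_digest=action_digest,
                                              skip_cache_lookup=True)

# Execute() streams google.longrunning Operation messages until completion:
for operation in stub.Execute(request):
    print(operation.name, operation.done)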
@@ -7,7 +7,7 @@ server:
   tls-client-certs: null

 description: |
-  Just a CAS.
+  Just a CAS with some reference storage.

 instances:
   - name: main
@@ -24,3 +24,8 @@ instances:

     - !bytestream
       storage: *main-storage
+
+    - !reference-cache
+      storage: *main-storage
+      max_cached_refs: 256
+      allow_updates: true
@@ -23,6 +23,7 @@ import yaml
 from buildgrid.server.controller import ExecutionController
 from buildgrid.server.actioncache.storage import ActionCache
+from buildgrid.server.referencestorage.storage import ReferenceCache
 from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
 from buildgrid.server.cas.storage.disk import DiskStorage
 from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
@@ -126,10 +127,18 @@ class Action(YamlFactory):

     yaml_tag = u'!action-cache'

-    def __new__(cls, storage, max_cached_refs=0, allow_updates=True):
+    def __new__(cls, storage, max_cached_refs, allow_updates=True):
         return ActionCache(storage, max_cached_refs, allow_updates)


+class Reference(YamlFactory):
+
+    yaml_tag = u'!reference-cache'
+
+    def __new__(cls, storage, max_cached_refs, allow_updates=True):
+        return ReferenceCache(storage, max_cached_refs, allow_updates)
+
+
 class CAS(YamlFactory):

     yaml_tag = u'!cas'
@@ -160,9 +169,9 @@ def _parse_size(size):
 def get_parser():

-    yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
     yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
     yaml.SafeLoader.add_constructor(Action.yaml_tag, Action.from_yaml)
+    yaml.SafeLoader.add_constructor(Reference.yaml_tag, Reference.from_yaml)
     yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
     yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
     yaml.SafeLoader.add_constructor(S3.yaml_tag, S3.from_yaml)
...
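The parser hook-up above means a `!reference-cache` entry in the YAML configuration resolves to a `ReferenceCache` built on top of the instance's storage backend. A minimal sketch of the equivalent direct construction follows; the `LRUMemoryCache` size argument is an assumption, not something taken from this change.

from buildgrid.server.referencestorage.storage import ReferenceCache
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

# Assumed in-memory CAS backend; the YAML example uses whatever *main-storage
# points to instead.
storage = LRUMemoryCache(256 * 1024 * 1024)

# Mirrors the `max_cached_refs: 256` / `allow_updates: true` keys above:
reference_cache = ReferenceCache(storage, max_cached_refs=256, allow_updates=True)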
@@ -17,9 +17,29 @@ from contextlib import contextmanager
 import uuid
 import os

-from buildgrid.settings import HASH
+import grpc
+
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildgrid._protos.google.rpc import code_pb2
+from buildgrid.settings import HASH
+
+
+class CallCache:
+    """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
+
+    __calls = dict()
+
+    @classmethod
+    def mark_unimplemented(cls, channel, name):
+        if channel not in cls.__calls:
+            cls.__calls[channel] = set()
+        cls.__calls[channel].add(name)
+
+    @classmethod
+    def unimplemented(cls, channel, name):
+        if channel not in cls.__calls:
+            return False
+        return name in cls.__calls[channel]


 @contextmanager
@@ -28,7 +48,7 @@ def upload(channel, instance=None, u_uid=None):
     try:
         yield uploader
     finally:
-        uploader.flush()
+        uploader.close()


 class Uploader:
@@ -47,6 +67,7 @@ class Uploader:

     FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
     MAX_REQUEST_SIZE = 2 * 1024 * 1024
+    MAX_REQUEST_COUNT = 500

     def __init__(self, channel, instance=None, u_uid=None):
         """Initializes a new :class:`Uploader` instance.
@@ -68,8 +89,61 @@ class Uploader:
         self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)

         self.__requests = dict()
+        self.__request_count = 0
         self.__request_size = 0

+    # --- Public API ---
+
+    def put_blob(self, blob, digest=None, queue=False):
+        """Stores a blob into the remote CAS server.
+
+        If queuing is allowed (`queue=True`), the upload request **may** be
+        deferred. An explicit call to :method:`flush` can force the request to
+        be sent immediately (along with the rest of the queued batch).
+
+        Args:
+            blob (bytes): the blob's data.
+            digest (:obj:`Digest`, optional): the blob's digest.
+            queue (bool, optional): whether or not the upload request may be
+                queued and submitted as part of a batch upload request. Defaults
+                to False.
+
+        Returns:
+            :obj:`Digest`: the sent blob's digest.
+        """
+        if not queue or len(blob) > Uploader.FILE_SIZE_THRESHOLD:
+            blob_digest = self._send_blob(blob, digest=digest)
+        else:
+            blob_digest = self._queue_blob(blob, digest=digest)
+
+        return blob_digest
+
+    def put_message(self, message, digest=None, queue=False):
+        """Stores a message into the remote CAS server.
+
+        If queuing is allowed (`queue=True`), the upload request **may** be
+        deferred. An explicit call to :method:`flush` can force the request to
+        be sent immediately (along with the rest of the queued batch).
+
+        Args:
+            message (:obj:`Message`): the message object.
+            digest (:obj:`Digest`, optional): the message's digest.
+            queue (bool, optional): whether or not the upload request may be
+                queued and submitted as part of a batch upload request. Defaults
+                to False.
+
+        Returns:
+            :obj:`Digest`: the sent message's digest.
+        """
+        message_blob = message.SerializeToString()
+
+        if not queue or len(message_blob) > Uploader.FILE_SIZE_THRESHOLD:
+            message_digest = self._send_blob(message_blob, digest=digest)
+        else:
+            message_digest = self._queue_blob(message_blob, digest=digest)
+
+        return message_digest
+
     def upload_file(self, file_path, queue=True):
         """Stores a local file into the remote CAS storage.
@@ -79,7 +153,7 @@ class Uploader:

         Args:
             file_path (str): absolute or relative path to a local file.
-            queue (bool, optional): wheter or not the upload request may be
+            queue (bool, optional): whether or not the upload request may be
                 queued and submitted as part of a batch upload request. Defaults
                 to True.
@@ -96,11 +170,11 @@ class Uploader:
             file_bytes = bytes_steam.read()

         if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
-            blob_digest = self._send_blob(file_bytes)
+            file_digest = self._send_blob(file_bytes)
         else:
-            blob_digest = self._queue_blob(file_bytes)
+            file_digest = self._queue_blob(file_bytes)

-        return blob_digest
+        return file_digest

     def upload_directory(self, directory, queue=True):
         """Stores a :obj:`Directory` into the remote CAS storage.
@@ -126,50 +200,37 @@ class Uploader:
         else:
             return self._queue_blob(directory.SerializeToString())

-    def send_message(self, message):
-        """Stores a message into the remote CAS storage.
-
-        Args:
-            message (:obj:`Message`): a protobuf message object.
-
-        Returns:
-            :obj:`Digest`: The digest of the message.
-        """
-        return self._send_blob(message.SerializeToString())
-
     def flush(self):
         """Ensures any queued request gets sent."""
         if self.__requests:
-            self._send_batch()
+            self._send_blob_batch(self.__requests)

-    def _queue_blob(self, blob):
-        """Queues a memory block for later batch upload"""
-        blob_digest = remote_execution_pb2.Digest()
-        blob_digest.hash = HASH(blob).hexdigest()
-        blob_digest.size_bytes = len(blob)
-
-        if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
-            self._send_batch()
-
-        update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
-        update_request.digest.CopyFrom(blob_digest)
-        update_request.data = blob
-
-        update_request_size = update_request.ByteSize()
-        if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
-            self._send_batch()
-
-        self.__requests[update_request.digest.hash] = update_request
-        self.__request_size += update_request_size
-
-        return blob_digest
+            self.__requests.clear()
+            self.__request_count = 0
+            self.__request_size = 0

-    def _send_blob(self, blob):
+    def close(self):
+        """Closes the underlying connection stubs.
+
+        Note:
+            This will always send pending requests before closing connections,
+            if any.
+        """
+        self.flush()
+
+        self.__bytestream_stub = None
+        self.__cas_stub = None
+
+    # --- Private API ---
+
+    def _send_blob(self, blob, digest=None):
         """Sends a memory block using ByteStream.Write()"""
         blob_digest = remote_execution_pb2.Digest()
-        blob_digest.hash = HASH(blob).hexdigest()
-        blob_digest.size_bytes = len(blob)
+        if digest is not None:
+            blob_digest.CopyFrom(digest)
+        else:
+            blob_digest.hash = HASH(blob).hexdigest()
+            blob_digest.size_bytes = len(blob)

         if self.instance_name is not None:
             resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
                                       blob_digest.hash, str(blob_digest.size_bytes)])
@@ -204,18 +265,64 @@ class Uploader:

         return blob_digest

-    def _send_batch(self):
+    def _queue_blob(self, blob, digest=None):
+        """Queues a memory block for later batch upload"""
+        blob_digest = remote_execution_pb2.Digest()
+        if digest is not None:
+            blob_digest.CopyFrom(digest)
+        else:
+            blob_digest.hash = HASH(blob).hexdigest()
+            blob_digest.size_bytes = len(blob)
+
+        if self.__request_size + blob_digest.size_bytes > Uploader.MAX_REQUEST_SIZE:
+            self.flush()
+        elif self.__request_count >= Uploader.MAX_REQUEST_COUNT:
+            self.flush()
+
+        self.__requests[blob_digest.hash] = (blob, blob_digest)
+        self.__request_count += 1
+        self.__request_size += blob_digest.size_bytes
+
+        return blob_digest
+
+    def _send_blob_batch(self, batch):
         """Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
-        batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
-        batch_request.requests.extend(self.__requests.values())
-        if self.instance_name is not None:
-            batch_request.instance_name = self.instance_name
+        batch_fetched = False
+        written_digests = list()

-        batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
+        # First, try BatchUpdateBlobs(), if not already known not being implemented:
+        if not CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'):
+            batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
+            if self.instance_name is not None:
+                batch_request.instance_name = self.instance_name

-        for response in batch_response.responses:
-            assert response.digest.hash in self.__requests
-            assert response.status.code is 0
+            for blob, digest in batch.values():
+                request = batch_request.requests.add()
+                request.digest.CopyFrom(digest)
+                request.data = blob

-        self.__requests.clear()
-        self.__request_size = 0
+            try:
+                batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
+                for response in batch_response.responses:
+                    assert response.digest.hash in batch
+
+                    written_digests.append(response.digest)
+                    if response.status.code != code_pb2.OK:
+                        response.digest.Clear()
+
+                batch_fetched = True
+
+            except grpc.RpcError as e:
+                status_code = e.code()
+                if status_code == grpc.StatusCode.UNIMPLEMENTED:
+                    CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
+                else:
+                    assert False
+
+        # Fallback to Write() if no BatchUpdateBlobs():
+        if not batch_fetched:
+            for blob, digest in batch.values():
+                written_digests.append(self._send_blob(blob, digest=digest))
+
+        return written_digests
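Taken together, the new public API of `Uploader` can be exercised as sketched below (not part of the diff; the endpoint address and payloads are invented). Queued blobs are grouped into a single `BatchUpdateBlobs()` call bounded by `MAX_REQUEST_SIZE` and `MAX_REQUEST_COUNT`, and the uploader falls back to `ByteStream.Write()` when the batch endpoint is unimplemented.

import grpc

from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

channel = grpc.insecure_channel('localhost:50051')  # assumed CAS endpoint

with upload(channel, instance='main') as cas:
    # Small queued blobs are deferred and batched; close() flushes them on exit.
    blob_digest = cas.put_blob(b'some-data', queue=True)

    # Messages are serialized and follow the same path:
    action = remote_execution_pb2.Action(do_not_cache=True)
    action_digest = cas.put_message(action, queue=True)

print(blob_digest.hash, action_digest.hash)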
@@ -25,9 +25,12 @@ import logging

 import grpc

-from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
+from buildgrid.client.cas import upload
 from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid._protos.google.rpc import code_pb2
+from buildgrid._protos.google.rpc import status_pb2
+from buildgrid.utils import gen_fetch_blob

 from .storage_abc import StorageABC
@@ -36,7 +39,10 @@ class RemoteStorage(StorageABC):

     def __init__(self, channel, instance_name):
         self.logger = logging.getLogger(__name__)
-        self._instance_name = instance_name
+
+        self.instance_name = instance_name
+        self.channel = channel
+
         self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
         self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
@@ -50,7 +56,7 @@ class RemoteStorage(StorageABC):
         fetched_data = io.BytesIO()
         length = 0

-        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
+        for data in gen_fetch_blob(self._stub_bs, digest, self.instance_name):
             length += fetched_data.write(data)

         if length:
@@ -71,16 +77,14 @@ class RemoteStorage(StorageABC):
         return None

     def begin_write(self, digest):
-        return io.BytesIO(digest.SerializeToString())
+        return io.BytesIO()

     def commit_write(self, digest, write_session):
-        write_session.seek(0)
-
-        for request in gen_write_request_blob(write_session, digest, self._instance_name):
-            self._stub_bs.Write(request)
+        with upload(self.channel, instance=self.instance_name) as cas:
+            cas.put_blob(write_session.getvalue())

     def missing_blobs(self, blobs):
-        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)
         for blob in blobs:
             request_digest = request.blob_digests.add()
@@ -92,19 +96,12 @@ class RemoteStorage(StorageABC):
         return [x for x in response.missing_blob_digests]

     def bulk_update_blobs(self, blobs):
-        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
-
-        for digest, data in blobs:
-            reqs = request.requests.add()
-            reqs.digest.CopyFrom(digest)
-            reqs.data = data
-
-        response = self._stub_cas.BatchUpdateBlobs(request)
-
-        responses = response.responses
-
-        # Check everything was sent back, even if order changed
-        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
-            ([x.digest for x in responses].sort(key=lambda x: x.hash))
-
-        return [x.status for x in responses]
+        sent_digests = list()
+        with upload(self.channel, instance=self.instance_name) as cas:
+            for digest, blob in blobs:
+                sent_digests.append(cas.put_blob(blob, digest=digest, queue=True))
+
+        assert len(sent_digests) == len(blobs)
+
+        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
+                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]
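With the change above, `RemoteStorage.bulk_update_blobs()` delegates to the same client and reports one `google.rpc.Status` per input blob. A usage sketch follows; the module path for `RemoteStorage` and the endpoint address are assumptions.

import grpc

from buildgrid._protos.google.rpc import code_pb2
from buildgrid.server.cas.storage.remote import RemoteStorage  # assumed module path
from buildgrid.utils import create_digest

channel = grpc.insecure_channel('localhost:50051')  # assumed CAS endpoint
storage = RemoteStorage(channel, 'main')

blobs = [(create_digest(b'data'), b'data')]

for (digest, _), status in zip(blobs, storage.bulk_update_blobs(blobs)):
    print(digest.hash, 'stored' if status.code == code_pb2.OK else 'failed')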
@@ -15,7 +15,6 @@
 from operator import attrgetter
 import os
-import uuid

 from buildgrid.settings import HASH
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@@ -34,32 +33,6 @@ def gen_fetch_blob(stub, digest, instance_name=""):
         yield response.data


-def gen_write_request_blob(digest_bytes, digest, instance_name=""):
-    """ Generates a bytestream write request
-    """
-    resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
-                                 'blobs', digest.hash, str(digest.size_bytes))
-
-    offset = 0
-    finished = False
-    remaining = digest.size_bytes
-
-    while not finished:
-        chunk_size = min(remaining, 64 * 1024)
-        remaining -= chunk_size
-        finished = remaining <= 0
-
-        request = bytestream_pb2.WriteRequest()
-        request.resource_name = resource_name
-        request.write_offset = offset
-        request.data = digest_bytes.read(chunk_size)
-        request.finish_write = finished
-
-        yield request
-
-        offset += chunk_size
-
-
 def write_fetch_directory(root_directory, stub, digest, instance_name=None):
     """Locally replicates a directory from CAS.
@@ -280,8 +253,12 @@ def tree_maker(directory_path, cas=None):
     tree.children.extend(child_directories)
     tree.root.CopyFrom(directory)

+    # Ensure that we've uploaded the tree structure first
+    if cas is not None:
+        cas.flush()
+
     if cas is not None:
-        tree_digest = cas.send_message(tree)
+        tree_digest = cas.put_message(tree)
     else:
         tree_digest = create_digest(tree.SerializeToString())
...
@@ -89,6 +89,7 @@ tests_require = [
     'coverage == 4.4.0',
     'moto',
     'pep8',
+    'psutil',
     'pytest == 3.6.4',
     'pytest-cov >= 2.6.0',
     'pytest-pep8',
...
#include <iostream>

int main()
{
    std::cout << "Hello, World!" << std::endl;
    return 0;
}

#include <stdio.h>

#include "hello.h"

int main()
{
    printf("%s\n", HELLO_WORLD);
    return 0;
}

#define HELLO_WORLD "Hello, World!"
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=redefined-outer-name
import multiprocessing
import os
import grpc
import pytest
from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from ..utils.cas import serve_cas, kill_process_tree
INTANCES = ['', 'instance']
BLOBS = [(b'',), (b'test-string',), (b'test', b'string')]
MESSAGES = [
    (remote_execution_pb2.Directory(),),
    (remote_execution_pb2.SymlinkNode(name='name', target='target'),),
    (remote_execution_pb2.Action(do_not_cache=True),
     remote_execution_pb2.ActionResult(exit_code=12))
]
DATA_DIR = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'data')
FILES = [
    (os.path.join(DATA_DIR, 'void'),),
    (os.path.join(DATA_DIR, 'hello.cc'),),
    (os.path.join(DATA_DIR, 'hello', 'hello.c'),
     os.path.join(DATA_DIR, 'hello', 'hello.h'))]
DIRECTORIES = [
    (remote_execution_pb2.Directory(),),
    (remote_execution_pb2.Directory(
        files=[remote_execution_pb2.FileNode(name='helloc.c'),
               remote_execution_pb2.FileNode(name='helloc.h')]),)]

def run_in_subprocess(function, *arguments):
    queue = multiprocessing.Queue()
    # Use subprocess to avoid creation of gRPC threads in main process
    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
    process = multiprocessing.Process(target=function,
                                      args=(queue, *arguments))

    try:
        process.start()

        result = queue.get()
        process.join()
    except KeyboardInterrupt:
        kill_process_tree(process.pid)
        raise

    return result

@pytest.mark.parametrize('blobs', BLOBS)
@pytest.mark.parametrize('instance', INTANCES)
def test_blob_upload(instance, blobs):
    # Actual test function, to be run in a subprocess:
    def __test_blob_upload(queue, remote, instance, blobs):
        # Open a channel to the remote CAS server:
        channel = grpc.insecure_channel(remote)

        digests = list()
        with upload(channel, instance) as client:
            if len(blobs) > 1:
                for blob in blobs:
                    digest = client.put_blob(blob, queue=True)
                    digests.append(digest.SerializeToString())
            else:
                digest = client.put_blob(blobs[0], queue=False)
                digests.append(digest.SerializeToString())

        queue.put(digests)

    # Start a minimal CAS server in a subprocess:
    with serve_cas([instance]) as server:
        digests = run_in_subprocess(__test_blob_upload,
                                    server.remote, instance, blobs)

        for blob, digest_blob in zip(blobs, digests):
            digest = remote_execution_pb2.Digest()
            digest.ParseFromString(digest_blob)

            assert server.has(digest)
            assert server.compare_blobs(digest, blob)

@pytest.mark.parametrize('messages', MESSAGES)
@pytest.mark.parametrize('instance', INTANCES)
def test_message_upload(instance, messages):
    # Actual test function, to be run in a subprocess:
    def __test_message_upload(queue, remote, instance, messages):
        # Open a channel to the remote CAS server:
        channel = grpc.insecure_channel(remote)

        digests = list()
        with upload(channel, instance) as client:
            if len(messages) > 1:
                for message in messages:
                    digest = client.put_message(message, queue=True)
                    digests.append(digest.SerializeToString())
            else:
                digest = client.put_message(messages[0], queue=False)
                digests.append(digest.SerializeToString())

        queue.put(digests)

    # Start a minimal CAS server in a subprocess:
    with serve_cas([instance]) as server:
        digests = run_in_subprocess(__test_message_upload,
                                    server.remote, instance, messages)

        for message, digest_blob in zip(messages, digests):
            digest = remote_execution_pb2.Digest()
            digest.ParseFromString(digest_blob)

            assert server.has(digest)
            assert server.compare_messages(digest, message)

@pytest.mark.parametrize('file_paths', FILES)
@pytest.mark.parametrize('instance', INTANCES)
def test_file_upload(instance, file_paths):
    # Actual test function, to be run in a subprocess:
    def __test_file_upload(queue, remote, instance, file_paths):
        # Open a channel to the remote CAS server:
        channel = grpc.insecure_channel(remote)

        digests = list()
        with upload(channel, instance) as client:
            if len(file_paths) > 1:
                for file_path in file_paths:
                    digest = client.upload_file(file_path, queue=True)
                    digests.append(digest.SerializeToString())
            else:
                digest = client.upload_file(file_paths[0], queue=False)
                digests.append(digest.SerializeToString())

        queue.put(digests)

    # Start a minimal CAS server in a subprocess:
    with serve_cas([instance]) as server:
        digests = run_in_subprocess(__test_file_upload,
                                    server.remote, instance, file_paths)

        for file_path, digest_blob in zip(file_paths, digests):
            digest = remote_execution_pb2.Digest()
            digest.ParseFromString(digest_blob)

            assert server.has(digest)
            assert server.compare_files(digest, file_path)

@pytest.mark.parametrize('directories', DIRECTORIES)
@pytest.mark.parametrize('instance', INTANCES)
def test_directory_upload(instance, directories):
    # Actual test function, to be run in a subprocess:
    def __test_directory_upload(queue, remote, instance, directories):
        # Open a channel to the remote CAS server:
        channel = grpc.insecure_channel(remote)

        digests = list()
        with upload(channel, instance) as client:
            if len(directories) > 1:
                for directory in directories:
                    digest = client.upload_directory(directory, queue=True)
                    digests.append(digest.SerializeToString())
            else:
                digest = client.upload_directory(directories[0], queue=False)
                digests.append(digest.SerializeToString())

        queue.put(digests)

    # Start a minimal CAS server in a subprocess:
    with serve_cas([instance]) as server:
        digests = run_in_subprocess(__test_directory_upload,
                                    server.remote, instance, directories)

        for directory, digest_blob in zip(directories, digests):
            digest = remote_execution_pb2.Digest()
            digest.ParseFromString(digest_blob)

            assert server.has(digest)
            assert server.compare_messages(digest, directory)

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from contextlib import contextmanager
import multiprocessing
import os
import signal
import tempfile
import grpc
import psutil
import pytest_cov
from buildgrid.server.cas.service import ByteStreamService
from buildgrid.server.cas.service import ContentAddressableStorageService
from buildgrid.server.cas.instance import ByteStreamInstance
from buildgrid.server.cas.instance import ContentAddressableStorageInstance
from buildgrid.server.cas.storage.disk import DiskStorage
@contextmanager
def serve_cas(instances):
    server = Server(instances)
    try:
        yield server
    finally:
        server.quit()


def kill_process_tree(pid):
    proc = psutil.Process(pid)
    children = proc.children(recursive=True)

    def kill_proc(p):
        try:
            p.kill()
        except psutil.AccessDenied:
            # Ignore this error, it can happen with
            # some setuid bwrap processes.
            pass

    # Bloody Murder
    for child in children:
        kill_proc(child)

    kill_proc(proc)

class Server:

    def __init__(self, instances):
        self.instances = instances

        self.__storage_path = tempfile.TemporaryDirectory()
        self.__storage = DiskStorage(self.__storage_path.name)

        self.__queue = multiprocessing.Queue()
        self.__process = multiprocessing.Process(
            target=Server.serve,
            args=(self.__queue, self.instances, self.__storage_path.name))
        self.__process.start()

        self.port = self.__queue.get()
        self.remote = 'localhost:{}'.format(self.port)

    @classmethod
    def serve(cls, queue, instances, storage_path):
        pytest_cov.embed.cleanup_on_sigterm()

        bs_instances, cas_instances = dict(), dict()
        for name in instances:
            storage = DiskStorage(storage_path)

            bs_instances[name] = ByteStreamInstance(storage)
            cas_instances[name] = ContentAddressableStorageInstance(storage)

        # Use max_workers default from Python 3.5+
        max_workers = (os.cpu_count() or 1) * 5
        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
        port = server.add_insecure_port('localhost:0')

        ByteStreamService(server, bs_instances)
        ContentAddressableStorageService(server, cas_instances)

        server.start()
        queue.put(port)

        signal.pause()

    def has(self, digest):
        return self.__storage.has_blob(digest)

    def compare_blobs(self, digest, blob):
        if not self.__storage.has_blob(digest):
            return False

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return blob == stored_blob

    def compare_messages(self, digest, message):
        if not self.__storage.has_blob(digest):
            return False

        message_blob = message.SerializeToString()

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return message_blob == stored_blob

    def compare_files(self, digest, file_path):
        if not self.__storage.has_blob(digest):
            return False

        with open(file_path, 'rb') as file_bytes:
            file_blob = file_bytes.read()

        stored_blob = self.__storage.get_blob(digest)
        stored_blob = stored_blob.read()

        return file_blob == stored_blob

    def quit(self):
        if self.__process:
            self.__process.terminate()
            self.__process.join()

        self.__storage_path.cleanup()