Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (10)
@@ -83,15 +83,21 @@ def work_temp_directory(context, lease):
action_result = remote_execution_pb2.ActionResult()
with upload(context.cas_channel, instance=instance_name) as cas:
with upload(context.cas_channel, instance=instance_name) as uploader:
output_files, output_directories = [], []
for output_path in command.output_files:
file_path = os.path.join(working_directory, output_path)
# Missing outputs should simply be omitted in ActionResult:
if not os.path.isfile(file_path):
continue
output_file = output_file_maker(file_path, working_directory, cas=cas)
action_result.output_files.extend([output_file])
file_digest = uploader.upload_file(file_path, queue=True)
output_file = output_file_maker(file_path, working_directory,
file_digest)
output_files.append(output_file)
action_result.output_files.extend(output_files)
for output_path in command.output_directories:
directory_path = os.path.join(working_directory, output_path)
@@ -99,10 +105,12 @@ def work_temp_directory(context, lease):
if not os.path.isdir(directory_path):
continue
# OutputDirectory.path should be relative to the working directory:
output_directory = output_directory_maker(directory_path, working_directory, cas=cas)
tree_digest = uploader.upload_tree(directory_path, queue=True)
output_directory = output_directory_maker(directory_path, working_directory,
tree_digest)
output_directories.append(output_directory)
action_result.output_directories.extend([output_directory])
action_result.output_directories.extend(output_directories)
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
......
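For reference, a minimal, self-contained sketch of the result-collection pattern this hunk moves to: declared outputs are uploaded through the CAS client and their digests recorded in the ActionResult. The helper name `collect_outputs` and its arguments are hypothetical; `upload`, `upload_file`, `upload_tree`, `output_file_maker` and `output_directory_maker` are the functions touched by this changeset::

    import os

    from buildgrid.client.cas import upload
    from buildgrid.utils import output_file_maker, output_directory_maker
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


    def collect_outputs(cas_channel, instance_name, command, working_directory):
        # Hypothetical helper, not part of the changeset: gathers the outputs
        # declared by a Command into an ActionResult while uploading them.
        action_result = remote_execution_pb2.ActionResult()

        with upload(cas_channel, instance=instance_name) as uploader:
            output_files = []
            for output_path in command.output_files:
                file_path = os.path.join(working_directory, output_path)
                if not os.path.isfile(file_path):
                    continue  # missing outputs are simply omitted
                file_digest = uploader.upload_file(file_path, queue=True)
                output_files.append(
                    output_file_maker(file_path, working_directory, file_digest))
            action_result.output_files.extend(output_files)

            output_directories = []
            for output_path in command.output_directories:
                directory_path = os.path.join(working_directory, output_path)
                if not os.path.isdir(directory_path):
                    continue
                tree_digest = uploader.upload_tree(directory_path, queue=True)
                output_directories.append(
                    output_directory_maker(directory_path, working_directory, tree_digest))
            action_result.output_directories.extend(output_directories)

        return action_result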
@@ -21,14 +21,15 @@ Request work to be executed and monitor status of jobs.
"""
import logging
import os
import sys
from urllib.parse import urlparse
import click
import grpc
from buildgrid.utils import merkle_maker, create_digest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid.client.cas import upload
from buildgrid.utils import merkle_tree_maker
from ..cli import pass_context
@@ -66,41 +67,48 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
@cli.command('upload-files', short_help="Upload files to the CAS server.")
@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
@click.argument('files', nargs=-1, type=click.Path(exists=True, dir_okay=False), required=True)
@pass_context
def upload_files(context, files):
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
sent_digests, files_map = [], {}
with upload(context.channel, instance=context.instance_name) as uploader:
for file_path in files:
context.logger.debug("Queueing {}".format(file_path))
requests = []
for file in files:
chunk = file.read()
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=create_digest(chunk), data=chunk))
file_digest = uploader.upload_file(file_path, queue=True)
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
files_map[file_digest.hash] = file_path
sent_digests.append(file_digest)
context.logger.info("Sending: {}".format(request))
response = stub.BatchUpdateBlobs(request)
context.logger.info("Response: {}".format(response))
for file_digest in sent_digests:
file_path = files_map[file_digest.hash]
if os.path.isabs(file_path):
file_path = os.path.relpath(file_path)
if file_digest.ByteSize():
print("[OK] {}: {}".format(file_path, file_digest.hash))
else:
print("[KO] {}!".format(file_path))
@cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
@click.argument('directory', nargs=1, type=click.Path(), required=True)
@click.argument('directory', nargs=1, type=click.Path(exists=True, file_okay=False), required=True)
@pass_context
def upload_dir(context, directory):
context.logger.info("Uploading directory to cas")
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
requests = []
for chunk, file_digest in merkle_maker(directory):
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=file_digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
context.logger.info("Request:\n{}".format(request))
response = stub.BatchUpdateBlobs(request)
context.logger.info("Response:\n{}".format(response))
sent_digests, nodes_map = [], {}
with upload(context.channel, instance=context.instance_name) as uploader:
for node, blob, path in merkle_tree_maker(directory):
context.logger.debug("Queueing {}".format(path))
node_digest = uploader.put_blob(blob, digest=node.digest, queue=True)
nodes_map[node.digest.hash] = path
sent_digests.append(node_digest)
for node_digest in sent_digests:
node_path = nodes_map[node_digest.hash]
if os.path.isabs(node_path):
node_path = os.path.relpath(node_path, start=directory)
if node_digest.ByteSize():
print("[OK] {}: {}".format(node_path, node_digest.hash))
else:
print("[KO] {}!".format(node_path))
@@ -30,9 +30,10 @@ from urllib.parse import urlparse
import click
import grpc
from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid.utils import write_fetch_blob
from ..cli import pass_context
@@ -85,7 +86,7 @@ def request_dummy(context, number, wait_for_completion):
action_digest=action_digest,
skip_cache_lookup=True)
responses = list()
responses = []
for _ in range(0, number):
responses.append(stub.Execute(request))
@@ -105,46 +106,37 @@ def request_dummy(context, number, wait_for_completion):
@click.argument('input-root', nargs=1, type=click.Path(), required=True)
@click.argument('commands', nargs=-1, type=click.STRING, required=True)
@pass_context
def command(context, input_root, commands, output_file, output_directory):
def run_command(context, input_root, commands, output_file, output_directory):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
execute_command = remote_execution_pb2.Command()
for arg in commands:
execute_command.arguments.extend([arg])
output_executeables = []
for file, is_executeable in output_file:
execute_command.output_files.extend([file])
if is_executeable:
output_executeables.append(file)
with upload(context.channel, instance=context.instance_name) as uploader:
command = remote_execution_pb2.Command()
command_digest = create_digest(execute_command.SerializeToString())
context.logger.info(command_digest)
for arg in commands:
command.arguments.extend([arg])
# TODO: Check for missing blobs
digest = None
for _, digest in merkle_maker(input_root):
pass
for file, is_executeable in output_file:
command.output_files.extend([file])
if is_executeable:
output_executeables.append(file)
action = remote_execution_pb2.Action(command_digest=command_digest,
input_root_digest=digest,
do_not_cache=True)
command_digest = uploader.put_message(command, queue=True)
action_digest = create_digest(action.SerializeToString())
context.logger.info('Sent command: {}'.format(command_digest))
context.logger.info("Sending execution request...")
# TODO: Check for missing blobs
input_root_digest = uploader.upload_directory(input_root)
context.logger.info('Sent input: {}'.format(input_root_digest))
requests = []
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=command_digest, data=execute_command.SerializeToString()))
action = remote_execution_pb2.Action(command_digest=command_digest,
input_root_digest=input_root_digest,
do_not_cache=True)
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=action_digest, data=action.SerializeToString()))
action_digest = uploader.put_message(action, queue=True)
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
context.logger.info("Sent action: {}".format(action_digest))
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
action_digest=action_digest,
......
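Putting the pieces of the reworked command together, a hedged sketch of the full request flow: upload the Command, the input root and the Action through the uploader, then ask the Execution service to run the action. The endpoint, instance name, arguments and paths below are placeholders::

    import grpc

    from buildgrid.client.cas import upload
    from buildgrid._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    # Hypothetical endpoint, instance name and input root, for illustration only:
    channel = grpc.insecure_channel('localhost:50051')
    instance_name = 'main'

    with upload(channel, instance=instance_name) as uploader:
        command = remote_execution_pb2.Command(arguments=['echo', 'hello'],
                                               output_files=['out.txt'])
        command_digest = uploader.put_message(command, queue=True)

        input_root_digest = uploader.upload_directory('/path/to/input-root')

        action = remote_execution_pb2.Action(command_digest=command_digest,
                                             input_root_digest=input_root_digest,
                                             do_not_cache=True)
        action_digest = uploader.put_message(action, queue=True)

        # Make sure all queued blobs reach the server before requesting execution:
        uploader.flush()

    stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
                                                  action_digest=action_digest,
                                                  skip_cache_lookup=True)
    for operation in stub.Execute(request):
        print(operation)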
@@ -17,13 +17,35 @@ from contextlib import contextmanager
import uuid
import os
from buildgrid.settings import HASH
import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.google.rpc import code_pb2
from buildgrid.settings import HASH
from buildgrid.utils import merkle_tree_maker
class _CallCache:
"""Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
__calls = {}
@classmethod
def mark_unimplemented(cls, channel, name):
if channel not in cls.__calls:
cls.__calls[channel] = set()
cls.__calls[channel].add(name)
@classmethod
def unimplemented(cls, channel, name):
if channel not in cls.__calls:
return False
return name in cls.__calls[channel]
@contextmanager
def upload(channel, instance=None, u_uid=None):
"""Context manager generator for the :class:`Uploader` class."""
uploader = Uploader(channel, instance=instance, u_uid=u_uid)
try:
yield uploader
@@ -37,8 +59,10 @@ class Uploader:
The :class:`Uploader` class comes with a generator factory function that can
be used together with the `with` statement for context management::
with upload(channel, instance='build') as cas:
cas.upload_file('/path/to/local/file')
from buildgrid.client.cas import upload
with upload(channel, instance='build') as uploader:
uploader.upload_file('/path/to/local/file')
Attributes:
FILE_SIZE_THRESHOLD (int): maximum size for a queueable file.
@@ -47,6 +71,7 @@ class Uploader:
FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
MAX_REQUEST_SIZE = 2 * 1024 * 1024
MAX_REQUEST_COUNT = 500
def __init__(self, channel, instance=None, u_uid=None):
"""Initializes a new :class:`Uploader` instance.
@@ -67,19 +92,72 @@ class Uploader:
self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
self.__requests = dict()
self.__requests = {}
self.__request_count = 0
self.__request_size = 0
# --- Public API ---
def put_blob(self, blob, digest=None, queue=False):
"""Stores a blob into the remote CAS server.
If queuing is allowed (`queue=True`), the upload request **may** be
deferred. An explicit call to :func:`~flush` can force the request to be
sent immediately (along with the rest of the queued batch).
Args:
blob (bytes): the blob's data.
digest (:obj:`Digest`, optional): the blob's digest.
queue (bool, optional): whether or not the upload request may be
queued and submitted as part of a batch upload request. Defaults
to False.
Returns:
:obj:`Digest`: the sent blob's digest.
"""
if not queue or len(blob) > Uploader.FILE_SIZE_THRESHOLD:
blob_digest = self._send_blob(blob, digest=digest)
else:
blob_digest = self._queue_blob(blob, digest=digest)
return blob_digest
def put_message(self, message, digest=None, queue=False):
"""Stores a message into the remote CAS server.
If queuing is allowed (`queue=True`), the upload request **may** be
deferred. An explicit call to :func:`~flush` can force the request to be
sent immediately (along with the rest of the queued batch).
Args:
message (:obj:`Message`): the message object.
digest (:obj:`Digest`, optional): the message's digest.
queue (bool, optional): whether or not the upload request may be
queued and submitted as part of a batch upload request. Defaults
to False.
Returns:
:obj:`Digest`: the sent message's digest.
"""
message_blob = message.SerializeToString()
if not queue or len(message_blob) > Uploader.FILE_SIZE_THRESHOLD:
message_digest = self._send_blob(message_blob, digest=digest)
else:
message_digest = self._queue_blob(message_blob, digest=digest)
return message_digest
def upload_file(self, file_path, queue=True):
"""Stores a local file into the remote CAS storage.
If queuing is allowed (`queue=True`), the upload request **may** be
deferred. An explicit call to :method:`flush` can force the request to be
deferred. An explicit call to :func:`~flush` can force the request to be
sent immediately (along with the rest of the queued batch).
Args:
file_path (str): absolute or relative path to a local file.
queue (bool, optional): wheter or not the upload request may be
queue (bool, optional): whether or not the upload request may be
queued and submitted as part of a batch upload request. Defaults
to True.
@@ -87,7 +165,8 @@
:obj:`Digest`: The digest of the file's content.
Raises:
OSError: If `file_path` does not exist or is not readable.
FileNotFoundError: If `file_path` does not exist.
PermissionError: If `file_path` is not readable.
"""
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
@@ -96,53 +175,112 @@ class Uploader:
file_bytes = bytes_steam.read()
if not queue or len(file_bytes) > Uploader.FILE_SIZE_THRESHOLD:
blob_digest = self._send_blob(file_bytes)
file_digest = self._send_blob(file_bytes)
else:
blob_digest = self._queue_blob(file_bytes)
file_digest = self._queue_blob(file_bytes)
return blob_digest
return file_digest
def upload_directory(self, directory, queue=True):
"""Stores a :obj:`Directory` into the remote CAS storage.
def upload_directory(self, directory_path, queue=True):
"""Stores a local folder into the remote CAS storage.
If queuing is allowed (`queue=True`), the upload request **may** be
deferred. An explicit call to :method:`flush` can force the request to be
deferred. An explicit call to :func:`~flush` can force the request to be
sent immediately (along with the rest of the queued batch).
Args:
directory (:obj:`Directory`): a :obj:`Directory` object.
queue (bool, optional): whether or not the upload request may be
directory_path (str): absolute or relative path to a local folder.
queue (bool, optional): whether or not the upload requests may be
queued and submitted as part of a batch upload request. Defaults
to True.
Returns:
:obj:`Digest`: The digest of the :obj:`Directory`.
:obj:`Digest`: The digest of the top :obj:`Directory`.
Raises:
FileNotFoundError: If `directory_path` does not exist.
PermissionError: If `directory_path` is not readable.
"""
if not isinstance(directory, remote_execution_pb2.Directory):
raise TypeError
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
last_directory_node = None
if not queue:
return self._send_blob(directory.SerializeToString())
for node, blob, _ in merkle_tree_maker(directory_path):
if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
last_directory_node = node
self._send_blob(blob, digest=node.digest)
else:
return self._queue_blob(directory.SerializeToString())
for node, blob, _ in merkle_tree_maker(directory_path):
if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
last_directory_node = node
self._queue_blob(blob, digest=node.digest)
def put_message(self, message):
"""Stores a message into the remote CAS storage.
return last_directory_node.digest
Message is sent immediately, upload is never deferred.
def upload_tree(self, directory_path, queue=True):
"""Stores a local folder into the remote CAS storage as a :obj:`Tree`.
If queuing is allowed (`queue=True`), the upload request **may** be
deferred. An explicit call to :func:`~flush` can force the request to be
sent immediately (along with the rest of the queued batch).
Args:
message (:obj:`Message`): a protobuf message object.
directory_path (str): absolute or relative path to a local folder.
queue (bool, optional): whether or not the upload requests may be
queued and submitted as part of a batch upload request. Defaults
to True.
Returns:
:obj:`Digest`: The digest of the message.
:obj:`Digest`: The digest of the :obj:`Tree`.
Raises:
FileNotFoundError: If `directory_path` does not exist.
PermissionError: If `directory_path` is not readable.
"""
return self._send_blob(message.SerializeToString())
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
directories = []
if not queue:
for node, blob, _ in merkle_tree_maker(directory_path):
if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
# TODO: Get the Directory object from merkle_tree_maker():
directory = remote_execution_pb2.Directory()
directory.ParseFromString(blob)
directories.append(directory)
self._send_blob(blob, digest=node.digest)
else:
for node, blob, _ in merkle_tree_maker(directory_path):
if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
# TODO: Get the Directory object from merkle_tree_maker():
directory = remote_execution_pb2.Directory()
directory.ParseFromString(blob)
directories.append(directory)
self._queue_blob(blob, digest=node.digest)
tree = remote_execution_pb2.Tree()
tree.root.CopyFrom(directories[-1])
tree.children.extend(directories[:-1])
return self.put_message(tree, queue=queue)
def flush(self):
"""Ensures any queued request gets sent."""
if self.__requests:
self._send_batch()
self._send_blob_batch(self.__requests)
self.__requests.clear()
self.__request_count = 0
self.__request_size = 0
def close(self):
"""Closes the underlying connection stubs.
@@ -156,34 +294,16 @@ class Uploader:
self.__bytestream_stub = None
self.__cas_stub = None
def _queue_blob(self, blob):
"""Queues a memory block for later batch upload"""
blob_digest = remote_execution_pb2.Digest()
blob_digest.hash = HASH(blob).hexdigest()
blob_digest.size_bytes = len(blob)
if self.__request_size + len(blob) > Uploader.MAX_REQUEST_SIZE:
self._send_batch()
update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request()
update_request.digest.CopyFrom(blob_digest)
update_request.data = blob
update_request_size = update_request.ByteSize()
if self.__request_size + update_request_size > Uploader.MAX_REQUEST_SIZE:
self._send_batch()
self.__requests[update_request.digest.hash] = update_request
self.__request_size += update_request_size
# --- Private API ---
return blob_digest
def _send_blob(self, blob):
def _send_blob(self, blob, digest=None):
"""Sends a memory block using ByteStream.Write()"""
blob_digest = remote_execution_pb2.Digest()
blob_digest.hash = HASH(blob).hexdigest()
blob_digest.size_bytes = len(blob)
if digest is not None:
blob_digest.CopyFrom(digest)
else:
blob_digest.hash = HASH(blob).hexdigest()
blob_digest.size_bytes = len(blob)
if self.instance_name is not None:
resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
blob_digest.hash, str(blob_digest.size_bytes)])
@@ -218,18 +338,68 @@ class Uploader:
return blob_digest
def _send_batch(self):
def _queue_blob(self, blob, digest=None):
"""Queues a memory block for later batch upload"""
blob_digest = remote_execution_pb2.Digest()
if digest is not None:
blob_digest.CopyFrom(digest)
else:
blob_digest.hash = HASH(blob).hexdigest()
blob_digest.size_bytes = len(blob)
if self.__request_size + blob_digest.size_bytes > Uploader.MAX_REQUEST_SIZE:
self.flush()
elif self.__request_count >= Uploader.MAX_REQUEST_COUNT:
self.flush()
self.__requests[blob_digest.hash] = (blob, blob_digest)
self.__request_count += 1
self.__request_size += blob_digest.size_bytes
return blob_digest
def _send_blob_batch(self, batch):
"""Sends queued data using ContentAddressableStorage.BatchUpdateBlobs()"""
batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
batch_request.requests.extend(self.__requests.values())
if self.instance_name is not None:
batch_request.instance_name = self.instance_name
batch_fetched = False
written_digests = []
batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
# First, try BatchUpdateBlobs(), if not already known not being implemented:
if not _CallCache.unimplemented(self.channel, 'BatchUpdateBlobs'):
batch_request = remote_execution_pb2.BatchUpdateBlobsRequest()
if self.instance_name is not None:
batch_request.instance_name = self.instance_name
for response in batch_response.responses:
assert response.digest.hash in self.__requests
assert response.status.code is 0
for blob, digest in batch.values():
request = batch_request.requests.add()
request.digest.CopyFrom(digest)
request.data = blob
self.__requests.clear()
self.__request_size = 0
try:
batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
for response in batch_response.responses:
assert response.digest.hash in batch
written_digests.append(response.digest)
if response.status.code != code_pb2.OK:
response.digest.Clear()
batch_fetched = True
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNIMPLEMENTED:
_CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
written_digests.clear()
batch_fetched = False
else:
assert False
# Fallback to Write() if no BatchUpdateBlobs():
if not batch_fetched:
for blob, digest in batch.values():
written_digests.append(self._send_blob(blob, digest=digest))
return written_digests
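The new `_send_blob_batch()` couples batching with a capability probe: `BatchUpdateBlobs()` is tried first, an UNIMPLEMENTED answer is remembered in `_CallCache` so the call is never retried on that channel, and the upload falls back to one `ByteStream.Write()` per blob. A simplified, standalone sketch of that decision logic (the helper and its callback parameters are hypothetical, not part of this changeset)::

    import grpc

    # Mirrors the role of _CallCache above: remember, per channel, which RPCs
    # the remote answered with UNIMPLEMENTED so they are not retried each time.
    _unimplemented_calls = {}


    def send_blobs(channel, blobs, batch_update, single_write):
        # Hypothetical helper sketching _send_blob_batch()'s fallback logic:
        # `batch_update` stands in for ContentAddressableStorage.BatchUpdateBlobs()
        # and `single_write` for a per-blob ByteStream.Write() upload.
        written_digests = []
        batch_sent = False

        # First, try the batch RPC, unless it is already known to be unimplemented:
        if 'BatchUpdateBlobs' not in _unimplemented_calls.get(channel, set()):
            try:
                written_digests = batch_update(blobs)
                batch_sent = True
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.UNIMPLEMENTED:
                    _unimplemented_calls.setdefault(channel, set()).add('BatchUpdateBlobs')
                else:
                    raise

        # Fall back to one streamed write per blob:
        if not batch_sent:
            written_digests = [single_write(blob) for blob in blobs]

        return written_digests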
@@ -25,9 +25,13 @@ import logging
import grpc
from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
from buildgrid.client.cas import upload
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc import status_pb2
from buildgrid.utils import gen_fetch_blob
from buildgrid.settings import HASH
from .storage_abc import StorageABC
@@ -36,7 +40,10 @@ class RemoteStorage(StorageABC):
def __init__(self, channel, instance_name):
self.logger = logging.getLogger(__name__)
self._instance_name = instance_name
self.instance_name = instance_name
self.channel = channel
self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
@@ -50,16 +57,12 @@ class RemoteStorage(StorageABC):
fetched_data = io.BytesIO()
length = 0
for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
for data in gen_fetch_blob(self._stub_bs, digest, self.instance_name):
length += fetched_data.write(data)
if length:
assert digest.size_bytes == length
fetched_data.seek(0)
return fetched_data
else:
return None
assert digest.size_bytes == length
fetched_data.seek(0)
return fetched_data
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
@@ -71,16 +74,14 @@ class RemoteStorage(StorageABC):
return None
def begin_write(self, digest):
return io.BytesIO(digest.SerializeToString())
return io.BytesIO()
def commit_write(self, digest, write_session):
write_session.seek(0)
for request in gen_write_request_blob(write_session, digest, self._instance_name):
self._stub_bs.Write(request)
with upload(self.channel, instance=self.instance_name) as uploader:
uploader.put_blob(write_session.getvalue())
def missing_blobs(self, blobs):
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)
for blob in blobs:
request_digest = request.blob_digests.add()
@@ -92,19 +93,15 @@ class RemoteStorage(StorageABC):
return [x for x in response.missing_blob_digests]
def bulk_update_blobs(self, blobs):
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
for digest, data in blobs:
reqs = request.requests.add()
reqs.digest.CopyFrom(digest)
reqs.data = data
response = self._stub_cas.BatchUpdateBlobs(request)
responses = response.responses
# Check everything was sent back, even if order changed
assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
([x.digest for x in responses].sort(key=lambda x: x.hash))
return [x.status for x in responses]
sent_digests = []
with upload(self.channel, instance=self.instance_name) as uploader:
for digest, blob in blobs:
if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
sent_digests.append(remote_execution_pb2.Digest())
else:
sent_digests.append(uploader.put_blob(blob, digest=digest, queue=True))
assert len(sent_digests) == len(blobs)
return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]
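With these changes `RemoteStorage` routes all writes through the CAS uploader and validates blobs against their digests before sending them. A short usage sketch, assuming a reachable remote and an arbitrary instance name::

    import grpc

    from buildgrid.server.cas.storage.remote import RemoteStorage
    from buildgrid.utils import create_digest

    # Hypothetical endpoint and instance name, for illustration only:
    channel = grpc.insecure_channel('localhost:50051')
    storage = RemoteStorage(channel, 'main')

    blob = b'test-data'
    digest = create_digest(blob)

    # Single write: buffer locally, then push the bytes through the CAS uploader:
    session = storage.begin_write(digest)
    session.write(blob)
    storage.commit_write(digest, session)

    # Bulk update: a blob that does not match its digest is rejected locally and
    # reported with a non-OK google.rpc Status instead of being uploaded:
    statuses = storage.bulk_update_blobs([(digest, blob), (digest, b'mismatch')])
    assert statuses[0].code == 0 and statuses[1].code != 0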
@@ -15,7 +15,6 @@
from operator import attrgetter
import os
import uuid
from buildgrid.settings import HASH
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@@ -34,32 +33,6 @@ def gen_fetch_blob(stub, digest, instance_name=""):
yield response.data
def gen_write_request_blob(digest_bytes, digest, instance_name=""):
""" Generates a bytestream write request
"""
resource_name = os.path.join(instance_name, 'uploads', str(uuid.uuid4()),
'blobs', digest.hash, str(digest.size_bytes))
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, 64 * 1024)
remaining -= chunk_size
finished = remaining <= 0
request = bytestream_pb2.WriteRequest()
request.resource_name = resource_name
request.write_offset = offset
request.data = digest_bytes.read(chunk_size)
request.finish_write = finished
yield request
offset += chunk_size
def write_fetch_directory(root_directory, stub, digest, instance_name=None):
"""Locally replicates a directory from CAS.
@@ -137,252 +110,170 @@ def create_digest(bytes_to_digest):
bytes_to_digest (bytes): byte data to digest.
Returns:
:obj:`Digest`: The gRPC :obj:`Digest` for the given byte data.
:obj:`Digest`: The :obj:`Digest` for the given byte data.
"""
return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
size_bytes=len(bytes_to_digest))
def merkle_maker(directory):
""" Walks thorugh given directory, yielding the binary and digest
"""
directory_pb2 = remote_execution_pb2.Directory()
for (dir_path, dir_names, file_names) in os.walk(directory):
for file_name in file_names:
file_path = os.path.join(dir_path, file_name)
chunk = read_file(file_path)
file_digest = create_digest(chunk)
directory_pb2.files.extend([file_maker(file_path, file_digest)])
yield chunk, file_digest
for inner_dir in dir_names:
inner_dir_path = os.path.join(dir_path, inner_dir)
yield from merkle_maker(inner_dir_path)
directory_string = directory_pb2.SerializeToString()
def read_file(file_path):
"""Loads raw file content in memory.
yield directory_string, create_digest(directory_string)
Args:
file_path (str): path to the target file.
Returns:
bytes: Raw file's content until EOF.
def file_maker(file_path, file_digest):
""" Creates a File Node
Raises:
OSError: If `file_path` does not exist or is not readable.
"""
_, file_name = os.path.split(file_path)
return remote_execution_pb2.FileNode(name=file_name,
digest=file_digest,
is_executable=os.access(file_path, os.X_OK))
with open(file_path, 'rb') as byte_file:
return byte_file.read()
def directory_maker(directory_path, child_directories=None, cas=None, upload_directories=True):
"""Creates a :obj:`Directory` from a local directory and possibly upload it.
def write_file(file_path, content):
"""Dumps raw memory content to a file.
Args:
directory_path (str): absolute or relative path to a local directory.
child_directories (list): output list of of children :obj:`Directory`
objects.
cas (:obj:`Uploader`): a CAS client uploader.
upload_directories (bool): whether or not to upload the :obj:`Directory`
objects along with the files.
file_path (str): path to the target file.
content (bytes): raw file's content.
Returns:
:obj:`Directory`, :obj:`Digest`: Tuple of a new gRPC :obj:`Directory`
for the local directory pointed by `directory_path` and the digest
for that object.
Raises:
OSError: If `file_path` does not exist or is not writable.
"""
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
files, directories, symlinks = list(), list(), list()
for directory_entry in os.scandir(directory_path):
# Create a FileNode and corresponding BatchUpdateBlobsRequest:
if directory_entry.is_file(follow_symlinks=False):
if cas is not None:
node_digest = cas.upload_file(directory_entry.path)
else:
node_digest = create_digest(read_file(directory_entry.path))
node = remote_execution_pb2.FileNode()
node.name = directory_entry.name
node.digest.CopyFrom(node_digest)
node.is_executable = os.access(directory_entry.path, os.X_OK)
files.append(node)
# Create a DirectoryNode and corresponding BatchUpdateBlobsRequest:
elif directory_entry.is_dir(follow_symlinks=False):
_, node_digest = directory_maker(directory_entry.path,
child_directories=child_directories,
upload_directories=upload_directories,
cas=cas)
node = remote_execution_pb2.DirectoryNode()
node.name = directory_entry.name
node.digest.CopyFrom(node_digest)
directories.append(node)
# Create a SymlinkNode if necessary;
elif os.path.islink(directory_entry.path):
node_target = os.readlink(directory_entry.path)
with open(file_path, 'wb') as byte_file:
byte_file.write(content)
byte_file.flush()
node = remote_execution_pb2.SymlinkNode()
node.name = directory_entry.name
node.target = node_target
symlinks.append(node)
def merkle_tree_maker(directory_path):
"""Walks a local folder tree, generating :obj:`FileNode` and
:obj:`DirectoryNode`.
files.sort(key=attrgetter('name'))
directories.sort(key=attrgetter('name'))
symlinks.sort(key=attrgetter('name'))
Args:
directory_path (str): absolute or relative path to a local directory.
directory = remote_execution_pb2.Directory()
directory.files.extend(files)
directory.directories.extend(directories)
directory.symlinks.extend(symlinks)
Yields:
:obj:`Message`, bytes, str: a tuple of either a :obj:`FileNode` or
:obj:`DirectoryNode` message, the corresponding blob and the
corresponding node path.
"""
directory_name = os.path.basename(directory_path)
if child_directories is not None:
child_directories.append(directory)
# Actual generator, yields recursively FileNodes and DirectoryNodes:
def __merkle_tree_maker(directory_path, directory_name):
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
if cas is not None and upload_directories:
directory_digest = cas.upload_directory(directory)
else:
directory_digest = create_digest(directory.SerializeToString())
directory = remote_execution_pb2.Directory()
return directory, directory_digest
files, directories, symlinks = [], [], []
for directory_entry in os.scandir(directory_path):
node_name, node_path = directory_entry.name, directory_entry.path
if directory_entry.is_file(follow_symlinks=False):
node_blob = read_file(directory_entry.path)
node_digest = create_digest(node_blob)
def tree_maker(directory_path, cas=None):
"""Creates a :obj:`Tree` from a local directory and possibly upload it.
node = remote_execution_pb2.FileNode()
node.name = node_name
node.digest.CopyFrom(node_digest)
node.is_executable = os.access(node_path, os.X_OK)
If `cas` is specified, the local directory content will be uploaded/stored
in remote CAS (the :obj:`Tree` message won't).
files.append(node)
Args:
directory_path (str): absolute or relative path to a local directory.
cas (:obj:`Uploader`): a CAS client uploader.
yield node, node_blob, node_path
Returns:
:obj:`Tree`, :obj:`Digest`: Tuple of a new gRPC :obj:`Tree` for the
local directory pointed by `directory_path` and the digest for that
object.
"""
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
elif directory_entry.is_dir(follow_symlinks=False):
node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)
child_directories = list()
directory, _ = directory_maker(directory_path,
child_directories=child_directories,
upload_directories=False,
cas=cas)
directories.append(node)
tree = remote_execution_pb2.Tree()
tree.children.extend(child_directories)
tree.root.CopyFrom(directory)
yield node, node_blob, node_path
# Ensure that we've uploaded the tree structure first
if cas is not None:
cas.flush()
tree_digest = cas.put_message(tree)
else:
tree_digest = create_digest(tree.SerializeToString())
# Create a SymlinkNode;
elif os.path.islink(directory_entry.path):
node_target = os.readlink(directory_entry.path)
return tree, tree_digest
node = remote_execution_pb2.SymlinkNode()
node.name = directory_entry.name
node.target = node_target
symlinks.append(node)
def read_file(file_path):
"""Loads raw file content in memory.
files.sort(key=attrgetter('name'))
directories.sort(key=attrgetter('name'))
symlinks.sort(key=attrgetter('name'))
Args:
file_path (str): path to the target file.
directory.files.extend(files)
directory.directories.extend(directories)
directory.symlinks.extend(symlinks)
Returns:
bytes: Raw file's content until EOF.
node_blob = directory.SerializeToString()
node_digest = create_digest(node_blob)
Raises:
OSError: If `file_path` does not exist or is not readable.
"""
with open(file_path, 'rb') as byte_file:
return byte_file.read()
node = remote_execution_pb2.DirectoryNode()
node.name = directory_name
node.digest.CopyFrom(node_digest)
return node, node_blob, directory_path
def write_file(file_path, content):
"""Dumps raw memory content to a file.
node, node_blob, node_path = yield from __merkle_tree_maker(directory_path,
directory_name)
Args:
file_path (str): path to the target file.
content (bytes): raw file's content.
Raises:
OSError: If `file_path` does not exist or is not writable.
"""
with open(file_path, 'wb') as byte_file:
byte_file.write(content)
byte_file.flush()
yield node, node_blob, node_path
def output_file_maker(file_path, input_path, cas=None):
def output_file_maker(file_path, input_path, file_digest):
"""Creates an :obj:`OutputFile` from a local file and possibly upload it.
If `cas` is specified, the local file will be uploaded/stored in remote CAS
(the :obj:`OutputFile` message won't).
Note:
`file_path` **must** point inside or be relative to `input_path`.
Args:
file_path (str): absolute or relative path to a local file.
input_path (str): absolute or relative path to the input root directory.
cas (:obj:`Uploader`): a CAS client uploader.
file_digest (:obj:`Digest`): the underlying file's digest.
Returns:
:obj:`OutputFile`: a new gRPC :obj:`OutputFile` object for the file
pointed by `file_path`.
:obj:`OutputFile`: a new :obj:`OutputFile` object for the file pointed
by `file_path`.
"""
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
if not os.path.isabs(input_path):
input_path = os.path.abspath(input_path)
if cas is not None:
file_digest = cas.upload_file(file_path)
else:
file_digest = create_digest(read_file(file_path))
output_file = remote_execution_pb2.OutputFile()
output_file.digest.CopyFrom(file_digest)
# OutputFile.path should be relative to the working direcory:
# OutputFile.path should be relative to the working directory
output_file.path = os.path.relpath(file_path, start=input_path)
output_file.is_executable = os.access(file_path, os.X_OK)
return output_file
def output_directory_maker(directory_path, working_path, cas=None):
def output_directory_maker(directory_path, working_path, tree_digest):
"""Creates an :obj:`OutputDirectory` from a local directory.
If `cas` is specified, the local directory content will be uploaded/stored
in remote CAS (the :obj:`OutputDirectory` message won't).
Note:
`directory_path` **must** point inside or be relative to `input_path`.
Args:
directory_path (str): absolute or relative path to a local directory.
working_path (str): absolute or relative path to the working directory.
cas (:obj:`Uploader`): a CAS client uploader.
tree_digest (:obj:`Digest`): the underlying folder tree's digest.
Returns:
:obj:`OutputDirectory`: a new gRPC :obj:`OutputDirectory` for the
directory pointed by `directory_path`.
:obj:`OutputDirectory`: a new :obj:`OutputDirectory` for the directory
pointed by `directory_path`.
"""
if not os.path.isabs(directory_path):
directory_path = os.path.abspath(directory_path)
if not os.path.isabs(working_path):
working_path = os.path.abspath(working_path)
_, tree_digest = tree_maker(directory_path, cas=cas)
output_directory = remote_execution_pb2.OutputDirectory()
output_directory.tree_digest.CopyFrom(tree_digest)
output_directory.path = os.path.relpath(directory_path, start=working_path)
......
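The new `merkle_tree_maker()` drives most of the changes above, so its traversal order is worth spelling out: nodes are yielded bottom-up, every `DirectoryNode` after its content, which is why callers such as `upload_directory()` can treat the last `DirectoryNode` as the input root. A small illustrative walk, with a placeholder path::

    from buildgrid.utils import merkle_tree_maker
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    last_directory_node = None

    # Hypothetical local path, for illustration only:
    for node, _, path in merkle_tree_maker('/path/to/input-root'):
        if node.DESCRIPTOR is remote_execution_pb2.FileNode.DESCRIPTOR:
            print('file:      {} ({} bytes)'.format(path, node.digest.size_bytes))
        elif node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
            print('directory: {}'.format(path))
            last_directory_node = node

    # Directories are yielded after their content, so the last DirectoryNode
    # describes the input root; its digest is the Merkle root of the whole tree:
    root_digest = last_directory_node.digest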
@@ -89,6 +89,7 @@ tests_require = [
'coverage == 4.4.0',
'moto',
'pep8',
'psutil',
'pytest == 3.6.4',
'pytest-cov >= 2.6.0',
'pytest-pep8',
......
#include <iostream>
int main()
{
std::cout << "Hello, World!" << std::endl;
return 0;
}
#include <stdio.h>
#include "hello.h"
int main()
{
printf("%s\n", HELLO_WORLD);
return 0;
}
#define HELLO_WORLD "Hello, World!"
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=redefined-outer-name
import os
import grpc
import pytest
from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.utils import create_digest
from ..utils.cas import serve_cas, run_in_subprocess
INTANCES = ['', 'instance']
BLOBS = [(b'',), (b'test-string',), (b'test', b'string')]
MESSAGES = [
(remote_execution_pb2.Directory(),),
(remote_execution_pb2.SymlinkNode(name='name', target='target'),),
(remote_execution_pb2.Action(do_not_cache=True),
remote_execution_pb2.ActionResult(exit_code=12))
]
DATA_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'data')
FILES = [
(os.path.join(DATA_DIR, 'void'),),
(os.path.join(DATA_DIR, 'hello.cc'),),
(os.path.join(DATA_DIR, 'hello', 'hello.c'),
os.path.join(DATA_DIR, 'hello', 'hello.h'))]
DIRECTORIES = [
(os.path.join(DATA_DIR, 'hello'),),
(os.path.join(DATA_DIR, 'hello'), DATA_DIR)]
@pytest.mark.parametrize('blobs', BLOBS)
@pytest.mark.parametrize('instance', INTANCES)
def test_upload_blob(instance, blobs):
# Actual test function, to be run in a subprocess:
def __test_upload_blob(queue, remote, instance, blobs):
# Open a channel to the remote CAS server:
channel = grpc.insecure_channel(remote)
digests = []
with upload(channel, instance) as uploader:
if len(blobs) > 1:
for blob in blobs:
digest = uploader.put_blob(blob, queue=True)
digests.append(digest.SerializeToString())
else:
digest = uploader.put_blob(blobs[0], queue=False)
digests.append(digest.SerializeToString())
queue.put(digests)
# Start a minimal CAS server in a subprocess:
with serve_cas([instance]) as server:
digests = run_in_subprocess(__test_upload_blob,
server.remote, instance, blobs)
for blob, digest_blob in zip(blobs, digests):
digest = remote_execution_pb2.Digest()
digest.ParseFromString(digest_blob)
assert server.has(digest)
assert server.compare_blobs(digest, blob)
@pytest.mark.parametrize('messages', MESSAGES)
@pytest.mark.parametrize('instance', INTANCES)
def test_upload_message(instance, messages):
# Actual test function, to be run in a subprocess:
def __test_upload_message(queue, remote, instance, messages):
# Open a channel to the remote CAS server:
channel = grpc.insecure_channel(remote)
digests = []
with upload(channel, instance) as uploader:
if len(messages) > 1:
for message in messages:
digest = uploader.put_message(message, queue=True)
digests.append(digest.SerializeToString())
else:
digest = uploader.put_message(messages[0], queue=False)
digests.append(digest.SerializeToString())
queue.put(digests)
# Start a minimal CAS server in a subprocess:
with serve_cas([instance]) as server:
digests = run_in_subprocess(__test_upload_message,
server.remote, instance, messages)
for message, digest_blob in zip(messages, digests):
digest = remote_execution_pb2.Digest()
digest.ParseFromString(digest_blob)
assert server.has(digest)
assert server.compare_messages(digest, message)
@pytest.mark.parametrize('file_paths', FILES)
@pytest.mark.parametrize('instance', INTANCES)
def test_upload_file(instance, file_paths):
# Actual test function, to be run in a subprocess:
def __test_upload_file(queue, remote, instance, file_paths):
# Open a channel to the remote CAS server:
channel = grpc.insecure_channel(remote)
digests = []
with upload(channel, instance) as uploader:
if len(file_paths) > 1:
for file_path in file_paths:
digest = uploader.upload_file(file_path, queue=True)
digests.append(digest.SerializeToString())
else:
digest = uploader.upload_file(file_paths[0], queue=False)
digests.append(digest.SerializeToString())
queue.put(digests)
# Start a minimal CAS server in a subprocess:
with serve_cas([instance]) as server:
digests = run_in_subprocess(__test_upload_file,
server.remote, instance, file_paths)
for file_path, digest_blob in zip(file_paths, digests):
digest = remote_execution_pb2.Digest()
digest.ParseFromString(digest_blob)
assert server.has(digest)
assert server.compare_files(digest, file_path)
@pytest.mark.parametrize('directory_paths', DIRECTORIES)
@pytest.mark.parametrize('instance', INTANCES)
def test_upload_directory(instance, directory_paths):
# Actual test function, to be run in a subprocess:
def __test_upload_directory(queue, remote, instance, directory_paths):
# Open a channel to the remote CAS server:
channel = grpc.insecure_channel(remote)
digests = []
with upload(channel, instance) as uploader:
if len(directory_paths) > 1:
for directory_path in directory_paths:
digest = uploader.upload_directory(directory_path, queue=True)
digests.append(digest.SerializeToString())
else:
digest = uploader.upload_directory(directory_paths[0], queue=False)
digests.append(digest.SerializeToString())
queue.put(digests)
# Start a minimal CAS server in a subprocess:
with serve_cas([instance]) as server:
digests = run_in_subprocess(__test_upload_directory,
server.remote, instance, directory_paths)
for directory_path, digest_blob in zip(directory_paths, digests):
digest = remote_execution_pb2.Digest()
digest.ParseFromString(digest_blob)
assert server.compare_directories(digest, directory_path)
@pytest.mark.parametrize('directory_paths', DIRECTORIES)
@pytest.mark.parametrize('instance', INTANCES)
def test_upload_tree(instance, directory_paths):
# Actual test function, to be run in a subprocess:
def __test_upload_tree(queue, remote, instance, directory_paths):
# Open a channel to the remote CAS server:
channel = grpc.insecure_channel(remote)
digests = []
with upload(channel, instance) as uploader:
if len(directory_paths) > 1:
for directory_path in directory_paths:
digest = uploader.upload_tree(directory_path, queue=True)
digests.append(digest.SerializeToString())
else:
digest = uploader.upload_tree(directory_paths[0], queue=False)
digests.append(digest.SerializeToString())
queue.put(digests)
# Start a minimal CAS server in a subprocess:
with serve_cas([instance]) as server:
digests = run_in_subprocess(__test_upload_tree,
server.remote, instance, directory_paths)
for directory_path, digest_blob in zip(directory_paths, digests):
digest = remote_execution_pb2.Digest()
digest.ParseFromString(digest_blob)
assert server.has(digest)
tree = remote_execution_pb2.Tree()
tree.ParseFromString(server.get(digest))
directory_digest = create_digest(tree.root.SerializeToString())
assert server.compare_directories(directory_digest, directory_path)
@@ -19,220 +19,285 @@
import tempfile
from unittest import mock
import boto3
import grpc
from grpc._server import _Context
import pytest
from moto import mock_s3
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas import service
from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
from buildgrid.server.cas.storage import remote
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server.cas.storage.remote import RemoteStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.settings import HASH
from ..utils.cas import serve_cas, run_in_subprocess
context = mock.create_autospec(_Context)
server = mock.create_autospec(grpc.server)
abc = b"abc"
abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
defg = b"defg"
defg_digest = Digest(hash=HASH(defg).hexdigest(), size_bytes=4)
hijk = b"hijk"
hijk_digest = Digest(hash=HASH(hijk).hexdigest(), size_bytes=4)
def write(storage, digest, blob):
session = storage.begin_write(digest)
session.write(blob)
storage.commit_write(digest, session)
class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
def __init__(self):
storage = LRUMemoryCache(256)
super().__init__(storage)
# Mock a CAS server with LRUStorage to return "calls" made to it
class MockStubServer:
def __init__(self):
instances = {"": MockCASStorage(), "dna": MockCASStorage()}
self._requests = []
with mock.patch.object(service, 'bytestream_pb2_grpc'):
self._bs_service = service.ByteStreamService(server)
for k, v in instances.items():
self._bs_service.add_instance(k, v)
with mock.patch.object(service, 'remote_execution_pb2_grpc'):
self._cas_service = service.ContentAddressableStorageService(server)
for k, v in instances.items():
self._cas_service.add_instance(k, v)
def Read(self, request):
yield from self._bs_service.Read(request, context)
def Write(self, request):
self._requests.append(request)
if request.finish_write:
response = self._bs_service.Write(self._requests, context)
self._requests = []
return response
return None
def FindMissingBlobs(self, request):
return self._cas_service.FindMissingBlobs(request, context)
def BatchUpdateBlobs(self, request):
return self._cas_service.BatchUpdateBlobs(request, context)
BLOBS = [(b'abc', b'defg', b'hijk', b'')]
BLOBS_DIGESTS = [tuple([remote_execution_pb2.Digest(hash=HASH(blob).hexdigest(),
size_bytes=len(blob)) for blob in blobs])
for blobs in BLOBS]
# Instances of MockCASStorage
@pytest.fixture(params=["", "dna"])
def instance(params):
return {params, MockCASStorage()}
# General tests for all storage providers
@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
@pytest.fixture(params=['lru', 'disk', 's3', 'lru_disk', 'disk_s3', 'remote'])
def any_storage(request):
if request.param == "lru":
if request.param == 'lru':
yield LRUMemoryCache(256)
elif request.param == "disk":
elif request.param == 'disk':
with tempfile.TemporaryDirectory() as path:
yield DiskStorage(path)
elif request.param == "s3":
elif request.param == 's3':
with mock_s3():
boto3.resource('s3').create_bucket(Bucket="testing")
yield S3Storage("testing")
elif request.param == "lru_disk":
boto3.resource('s3').create_bucket(Bucket='testing')
yield S3Storage('testing')
elif request.param == 'lru_disk':
# LRU cache with a uselessly small limit, so requests always fall back
with tempfile.TemporaryDirectory() as path:
yield WithCacheStorage(LRUMemoryCache(1), DiskStorage(path))
elif request.param == "disk_s3":
elif request.param == 'disk_s3':
# Disk-based cache of S3, but we don't delete files, so requests
# are always handled by the cache
with tempfile.TemporaryDirectory() as path:
with mock_s3():
boto3.resource('s3').create_bucket(Bucket="testing")
yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
elif request.param == "remote":
with mock.patch.object(remote, 'bytestream_pb2_grpc'):
with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
mock_server = MockStubServer()
storage = remote.RemoteStorage(None, "")
storage._stub_bs = mock_server
storage._stub_cas = mock_server
yield storage
def test_initially_empty(any_storage):
assert not any_storage.has_blob(abc_digest)
assert not any_storage.has_blob(defg_digest)
assert not any_storage.has_blob(hijk_digest)
def test_basic_write_read(any_storage):
assert not any_storage.has_blob(abc_digest)
write(any_storage, abc_digest, abc)
assert any_storage.has_blob(abc_digest)
assert any_storage.get_blob(abc_digest).read() == abc
# Try writing the same digest again (since it's valid to do that)
write(any_storage, abc_digest, abc)
assert any_storage.has_blob(abc_digest)
assert any_storage.get_blob(abc_digest).read() == abc
def test_bulk_write_read(any_storage):
missing_digests = any_storage.missing_blobs([abc_digest, defg_digest, hijk_digest])
assert len(missing_digests) == 3
assert abc_digest in missing_digests
assert defg_digest in missing_digests
assert hijk_digest in missing_digests
boto3.resource('s3').create_bucket(Bucket='testing')
yield WithCacheStorage(DiskStorage(path), S3Storage('testing'))
elif request.param == 'remote':
with serve_cas(['testing']) as server:
yield server.remote
bulk_update_results = any_storage.bulk_update_blobs([(abc_digest, abc), (defg_digest, defg),
(hijk_digest, b'????')])
assert len(bulk_update_results) == 3
assert bulk_update_results[0].code == 0
assert bulk_update_results[1].code == 0
assert bulk_update_results[2].code != 0
missing_digests = any_storage.missing_blobs([abc_digest, defg_digest, hijk_digest])
assert missing_digests == [hijk_digest]
assert any_storage.get_blob(abc_digest).read() == abc
assert any_storage.get_blob(defg_digest).read() == defg
def test_nonexistent_read(any_storage):
assert any_storage.get_blob(abc_digest) is None
def write(storage, digest, blob):
session = storage.begin_write(digest)
session.write(blob)
storage.commit_write(digest, session)
# Tests for special behavior of individual storage providers
@pytest.mark.parametrize('blobs_digests', zip(BLOBS, BLOBS_DIGESTS))
def test_initially_empty(any_storage, blobs_digests):
_, digests = blobs_digests
# Actual test function, failing on assertions:
def __test_initially_empty(any_storage, digests):
for digest in digests:
assert not any_storage.has_blob(digest)
# Helper test function for remote storage, to be run in a subprocess:
def __test_remote_initially_empty(queue, remote, serialized_digests):
channel = grpc.insecure_channel(remote)
remote_storage = RemoteStorage(channel, 'testing')
digests = []
for data in serialized_digests:
digest = remote_execution_pb2.Digest()
digest.ParseFromString(data)
digests.append(digest)
try:
__test_initially_empty(remote_storage, digests)
except AssertionError:
queue.put(False)
else:
queue.put(True)
if isinstance(any_storage, str):
serialized_digests = [digest.SerializeToString() for digest in digests]
assert run_in_subprocess(__test_remote_initially_empty,
any_storage, serialized_digests)
else:
__test_initially_empty(any_storage, digests)
@pytest.mark.parametrize('blobs_digests', zip(BLOBS, BLOBS_DIGESTS))
def test_basic_write_read(any_storage, blobs_digests):
blobs, digests = blobs_digests
# Actual test function, failing on assertions:
def __test_basic_write_read(any_storage, blobs, digests):
for blob, digest in zip(blobs, digests):
assert not any_storage.has_blob(digest)
write(any_storage, digest, blob)
assert any_storage.has_blob(digest)
assert any_storage.get_blob(digest).read() == blob
# Try writing the same digest again (since it's valid to do that)
write(any_storage, digest, blob)
assert any_storage.has_blob(digest)
assert any_storage.get_blob(digest).read() == blob
# Helper test function for remote storage, to be run in a subprocess:
def __test_remote_basic_write_read(queue, remote, blobs, serialized_digests):
channel = grpc.insecure_channel(remote)
remote_storage = RemoteStorage(channel, 'testing')
digests = []
for data in serialized_digests:
digest = remote_execution_pb2.Digest()
digest.ParseFromString(data)
digests.append(digest)
try:
__test_basic_write_read(remote_storage, blobs, digests)
except AssertionError:
queue.put(False)
else:
queue.put(True)
if isinstance(any_storage, str):
serialized_digests = [digest.SerializeToString() for digest in digests]
assert run_in_subprocess(__test_remote_basic_write_read,
any_storage, blobs, serialized_digests)
else:
__test_basic_write_read(any_storage, blobs, digests)
@pytest.mark.parametrize('blobs_digests', zip(BLOBS, BLOBS_DIGESTS))
def test_bulk_write_read(any_storage, blobs_digests):
blobs, digests = blobs_digests
# Actual test function, failing on assertions:
def __test_bulk_write_read(any_storage, blobs, digests):
missing_digests = any_storage.missing_blobs(digests)
assert len(missing_digests) == len(digests)
for digest in digests:
assert digest in missing_digests
faulty_blobs = list(blobs)
faulty_blobs[-1] = b'this-is-not-matching'
results = any_storage.bulk_update_blobs(list(zip(digests, faulty_blobs)))
assert len(results) == len(digests)
for result, blob, digest in zip(results[:-1], faulty_blobs[:-1], digests[:-1]):
assert result.code == 0
assert any_storage.get_blob(digest).read() == blob
assert results[-1].code != 0
missing_digests = any_storage.missing_blobs(digests)
assert len(missing_digests) == 1
assert missing_digests[0] == digests[-1]
# Helper test function for remote storage, to be run in a subprocess:
def __test_remote_bulk_write_read(queue, remote, blobs, serialized_digests):
channel = grpc.insecure_channel(remote)
remote_storage = RemoteStorage(channel, 'testing')
digests = []
for data in serialized_digests:
digest = remote_execution_pb2.Digest()
digest.ParseFromString(data)
digests.append(digest)
try:
__test_bulk_write_read(remote_storage, blobs, digests)
except AssertionError:
queue.put(False)
else:
queue.put(True)
if isinstance(any_storage, str):
serialized_digests = [digest.SerializeToString() for digest in digests]
assert run_in_subprocess(__test_remote_bulk_write_read,
any_storage, blobs, serialized_digests)
else:
__test_bulk_write_read(any_storage, blobs, digests)
@pytest.mark.parametrize('blobs_digests', zip(BLOBS, BLOBS_DIGESTS))
def test_nonexistent_read(any_storage, blobs_digests):
_, digests = blobs_digests
# Actual test function, failing on assertions:
def __test_nonexistent_read(any_storage, digests):
for digest in digests:
assert any_storage.get_blob(digest) is None
# Helper test function for remote storage, to be run in a subprocess:
def __test_remote_nonexistent_read(queue, remote, serialized_digests):
channel = grpc.insecure_channel(remote)
remote_storage = RemoteStorage(channel, 'testing')
digests = []
for data in serialized_digests:
digest = remote_execution_pb2.Digest()
digest.ParseFromString(data)
digests.append(digest)
try:
__test_nonexistent_read(remote_storage, digests)
except AssertionError:
queue.put(False)
else:
queue.put(True)
if isinstance(any_storage, str):
serialized_digests = [digest.SerializeToString() for digest in digests]
assert run_in_subprocess(__test_remote_nonexistent_read,
any_storage, serialized_digests)
else:
__test_nonexistent_read(any_storage, digests)
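# The LRU test below assumes an LRUMemoryCache capacity of 8 bytes, so the
# three small test blobs cannot all fit at once; it also relies on
# has_blob()/get_blob() refreshing an entry's recency, which is what the
# eviction comments inside the test are checking.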
@pytest.mark.parametrize('blobs_digests', [(BLOBS[0], BLOBS_DIGESTS[0])])
def test_lru_eviction(blobs_digests):
blobs, digests = blobs_digests
blob1, blob2, blob3, *_ = blobs
digest1, digest2, digest3, *_ = digests
lru = LRUMemoryCache(8)
write(lru, digest1, blob1)
write(lru, digest2, blob2)
assert lru.has_blob(digest1)
assert lru.has_blob(digest2)
write(lru, digest3, blob3)
# Check that the LRU evicted blob1 (it was written first)
assert not lru.has_blob(digest1)
assert lru.has_blob(digest2)
assert lru.has_blob(digest3)
assert lru.get_blob(digest2).read() == blob2
write(lru, digest1, blob1)
# Check that the LRU evicted blob3 (since we just read blob2)
assert lru.has_blob(digest1)
assert lru.has_blob(digest2)
assert not lru.has_blob(digest3)
assert lru.has_blob(digest2)
write(lru, digest3, blob1)
# Check that the LRU evicted blob1 (since we just checked blob2)
assert not lru.has_blob(digest1)
assert lru.has_blob(digest2)
assert lru.has_blob(digest3)
@pytest.mark.parametrize('blobs_digests', [(BLOBS[0], BLOBS_DIGESTS[0])])
def test_with_cache(blobs_digests):
blobs, digests = blobs_digests
blob1, blob2, blob3, *_ = blobs
digest1, digest2, digest3, *_ = digests
cache = LRUMemoryCache(256)
fallback = LRUMemoryCache(256)
with_cache_storage = WithCacheStorage(cache, fallback)
assert not with_cache_storage.has_blob(digest1)
write(with_cache_storage, digest1, blob1)
assert cache.has_blob(digest1)
assert fallback.has_blob(digest1)
assert with_cache_storage.get_blob(digest1).read() == blob1
# Even if a blob is in cache, we still need to check if the fallback
# has it.
write(cache, digest2, blob2)
assert not with_cache_storage.has_blob(digest2)
write(fallback, digest2, blob2)
assert with_cache_storage.has_blob(digest2)
# When a blob is in the fallback but not the cache, reading it should
# put it into the cache.
write(fallback, digest3, blob3)
assert with_cache_storage.get_blob(digest3).read() == blob3
assert cache.has_blob(digest3)
assert cache.get_blob(digest3).read() == blob3
assert cache.has_blob(digest3)
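# A minimal sketch of the composition exercised above, assuming the same
# constructors used in these tests (the DiskStorage path is hypothetical):
#
#     cache = LRUMemoryCache(256)
#     fallback = DiskStorage('/var/cache/cas')
#     storage = WithCacheStorage(cache, fallback)
#     # writes go to both layers; reads are served from the cache when
#     # possible and re-populate it from the fallback otherwise.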
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from contextlib import contextmanager
import multiprocessing
import os
import signal
import tempfile
import grpc
import psutil
import pytest_cov
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server.cas.service import ByteStreamService
from buildgrid.server.cas.service import ContentAddressableStorageService
from buildgrid.server.cas.instance import ByteStreamInstance
from buildgrid.server.cas.instance import ContentAddressableStorageInstance
from buildgrid.server.cas.storage.disk import DiskStorage
@contextmanager
def serve_cas(instances):
server = Server(instances)
try:
yield server
finally:
server.quit()
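# Typical usage (a sketch): the context manager yields a running Server whose
# `remote` attribute ('localhost:<port>') can be dialled from a test:
#
#     with serve_cas(['testing']) as server:
#         channel = grpc.insecure_channel(server.remote)
#         ...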
def kill_process_tree(pid):
proc = psutil.Process(pid)
children = proc.children(recursive=True)
def kill_proc(p):
try:
p.kill()
except psutil.AccessDenied:
# Ignore this error, it can happen with
# some setuid bwrap processes.
pass
# Bloody Murder
for child in children:
kill_proc(child)
kill_proc(proc)
def run_in_subprocess(function, *arguments):
queue = multiprocessing.Queue()
# Use subprocess to avoid creation of gRPC threads in main process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
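# The target function receives the queue as its first argument and is
# expected to put a single result on it; queue.get() below blocks until
# that result is available.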
process = multiprocessing.Process(target=function,
args=(queue, *arguments))
try:
process.start()
result = queue.get()
process.join()
except KeyboardInterrupt:
kill_process_tree(process.pid)
raise
return result
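# Test double for a CAS endpoint: runs a gRPC ByteStream +
# ContentAddressableStorage server in a subprocess, backed by DiskStorage in
# a temporary directory, and exposes compare_* helpers to inspect what the
# server actually stored.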
class Server:
def __init__(self, instances):
self.instances = instances
self.__storage_path = tempfile.TemporaryDirectory()
self.__storage = DiskStorage(self.__storage_path.name)
self.__queue = multiprocessing.Queue()
self.__process = multiprocessing.Process(
target=Server.serve,
args=(self.__queue, self.instances, self.__storage_path.name))
self.__process.start()
self.port = self.__queue.get()
self.remote = 'localhost:{}'.format(self.port)
@classmethod
def serve(cls, queue, instances, storage_path):
pytest_cov.embed.cleanup_on_sigterm()
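# cleanup_on_sigterm() makes pytest-cov flush coverage data for this
# subprocess when quit() terminates it with SIGTERM.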
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
port = server.add_insecure_port('localhost:0')
storage = DiskStorage(storage_path)
bs_service = ByteStreamService(server)
cas_service = ContentAddressableStorageService(server)
for name in instances:
bs_service.add_instance(name, ByteStreamInstance(storage))
cas_service.add_instance(name, ContentAddressableStorageInstance(storage))
server.start()
queue.put(port)
signal.pause()
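# The helpers below read through a second DiskStorage handle opened by
# __init__ over the same temporary directory, so they observe whatever the
# serving subprocess has written.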
def has(self, digest):
return self.__storage.has_blob(digest)
def get(self, digest):
return self.__storage.get_blob(digest).read()
def compare_blobs(self, digest, blob):
if not self.__storage.has_blob(digest):
return False
stored_blob = self.__storage.get_blob(digest)
stored_blob = stored_blob.read()
return blob == stored_blob
def compare_messages(self, digest, message):
if not self.__storage.has_blob(digest):
return False
message_blob = message.SerializeToString()
stored_blob = self.__storage.get_blob(digest)
stored_blob = stored_blob.read()
return message_blob == stored_blob
def compare_files(self, digest, file_path):
if not self.__storage.has_blob(digest):
return False
with open(file_path, 'rb') as file_bytes:
file_blob = file_bytes.read()
stored_blob = self.__storage.get_blob(digest)
stored_blob = stored_blob.read()
return file_blob == stored_blob
def compare_directories(self, digest, directory_path):
if not self.__storage.has_blob(digest):
return False
elif not os.path.isdir(directory_path):
return False
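# Recursively compare the Directory message stored under `digest` against
# the on-disk tree at `path`: file, sub-directory and symlink entries must
# all match by name, and file contents are checked via compare_files().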
def __compare_folders(digest, path):
directory = remote_execution_pb2.Directory()
directory.ParseFromString(self.__storage.get_blob(digest).read())
files, directories, symlinks = [], [], []
for entry in os.scandir(path):
if entry.is_file(follow_symlinks=False):
files.append(entry.name)
elif entry.is_dir(follow_symlinks=False):
directories.append(entry.name)
elif os.path.islink(entry.path):
symlinks.append(entry.name)
assert len(files) == len(directory.files)
assert len(directories) == len(directory.directories)
assert len(symlinks) == len(directory.symlinks)
for file_node in directory.files:
file_path = os.path.join(path, file_node.name)
assert file_node.name in files
assert os.path.isfile(file_path)
assert not os.path.islink(file_path)
if file_node.is_executable:
assert os.access(file_path, os.X_OK)
assert self.compare_files(file_node.digest, file_path)
for directory_node in directory.directories:
directory_path = os.path.join(path, directory_node.name)
assert directory_node.name in directories
assert os.path.exists(directory_path)
assert not os.path.islink(directory_path)
assert __compare_folders(directory_node.digest, directory_path)
for symlink_node in directory.symlinks:
symlink_path = os.path.join(path, symlink_node.name)
assert symlink_node.name in symlinks
assert os.path.islink(symlink_path)
assert os.readlink(symlink_path) == symlink_node.target
return True
return __compare_folders(digest, directory_path)
def quit(self):
if self.__process:
self.__process.terminate()
self.__process.join()
self.__storage_path.cleanup()