Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (49)
Showing with 1145 additions and 370 deletions
@@ -19,35 +19,32 @@ import tempfile

 from google.protobuf import any_pb2

-from buildgrid.settings import HASH_LENGTH
-from buildgrid.client.cas import upload
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid.client.cas import download, upload
 from buildgrid._exceptions import BotError
-from buildgrid.utils import read_file, write_file, parse_to_pb2_from_fetch
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildgrid.settings import HASH_LENGTH
+from buildgrid.utils import read_file, write_file


 def work_buildbox(context, lease):
     """Executes a lease for a build action, using buildbox.
     """
-    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
     local_cas_directory = context.local_cas
-    # instance_name = context.parent
     logger = context.logger

     action_digest = remote_execution_pb2.Digest()
     lease.payload.Unpack(action_digest)

-    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
-                                     stub_bytestream, action_digest)
-
-    command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
-                                      stub_bytestream, action.command_digest)
-
-    environment = dict()
-    for variable in command.environment_variables:
-        if variable.name not in ['PWD']:
-            environment[variable.name] = variable.value
+    with download(context.cas_channel) as downloader:
+        action = downloader.get_message(action_digest,
+                                        remote_execution_pb2.Action())
+
+        assert action.command_digest.hash
+
+        command = downloader.get_message(action.command_digest,
+                                         remote_execution_pb2.Command())

     if command.working_directory:
         working_directory = command.working_directory

@@ -80,6 +77,12 @@ def work_buildbox(context, lease):
         if context.cas_server_cert:
             command_line.append('--server-cert={}'.format(context.cas_server_cert))

+        command_line.append('--clearenv')
+        for variable in command.environment_variables:
+            command_line.append('--setenv')
+            command_line.append(variable.name)
+            command_line.append(variable.value)
+
         command_line.append(context.fuse_dir)
         command_line.extend(command.arguments)

@@ -116,10 +119,11 @@ def work_buildbox(context, lease):
             # TODO: Have BuildBox helping us creating the Tree instance here
             # See https://gitlab.com/BuildStream/buildbox/issues/7 for details
-            output_tree = _cas_tree_maker(stub_bytestream, output_digest)
+            with download(context.cas_channel) as downloader:
+                output_tree = _cas_tree_maker(downloader, output_digest)

-            with upload(context.cas_channel) as cas:
-                output_tree_digest = cas.send_message(output_tree)
+            with upload(context.cas_channel) as uploader:
+                output_tree_digest = uploader.put_message(output_tree)

             output_directory = remote_execution_pb2.OutputDirectory()
             output_directory.tree_digest.CopyFrom(output_tree_digest)

@@ -135,24 +139,28 @@ def work_buildbox(context, lease):
     return lease


-def _cas_tree_maker(stub_bytestream, directory_digest):
+def _cas_tree_maker(cas, directory_digest):
     # Generates and stores a Tree for a given Directory. This is very inefficient
     # and only temporary. See https://gitlab.com/BuildStream/buildbox/issues/7.
     output_tree = remote_execution_pb2.Tree()

-    def list_directories(parent_directory):
-        directory_list = list()
+    def __cas_tree_maker(cas, parent_directory):
+        digests, directories = [], []
         for directory_node in parent_directory.directories:
-            directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
-                                                stub_bytestream, directory_node.digest)
-            directory_list.extend(list_directories(directory))
-            directory_list.append(directory)
+            directories.append(remote_execution_pb2.Directory())
+            digests.append(directory_node.digest)
+
+        cas.get_messages(digests, directories)
+
+        for directory in directories[:]:
+            directories.extend(__cas_tree_maker(cas, directory))
+
+        return directories

-        return directory_list
+    root_directory = cas.get_message(directory_digest,
+                                     remote_execution_pb2.Directory())

-    root_directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
-                                             stub_bytestream, directory_digest)
-    output_tree.children.extend(list_directories(root_directory))
+    output_tree.children.extend(__cas_tree_maker(cas, root_directory))
     output_tree.root.CopyFrom(root_directory)

     return output_tree
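The download()/upload() helpers introduced here replace the raw ByteStream stubs. Judging only by the calls visible in this diff, their context managers expose get_message()/get_messages() for fetching protobuf messages and put_message() for storing them. A minimal sketch of that assumed usage (the endpoint is a placeholder):

    # Sketch of the CAS client helpers as used above; the method names come
    # from this diff, everything else is illustrative.
    import grpc

    from buildgrid.client.cas import download, upload
    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    channel = grpc.insecure_channel('localhost:50051')  # placeholder endpoint

    with upload(channel) as uploader:
        # Serialize and store a message, getting its digest back:
        action_digest = uploader.put_message(remote_execution_pb2.Action(do_not_cache=True))

    with download(channel) as downloader:
        # Fetch and deserialize it again in one call:
        action = downloader.get_message(action_digest, remote_execution_pb2.Action())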
@@ -19,40 +19,39 @@ import tempfile

 from google.protobuf import any_pb2

-from buildgrid.client.cas import upload
+from buildgrid.client.cas import download, upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
-from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
 from buildgrid.utils import output_file_maker, output_directory_maker


-def work_temp_directory(context, lease):
+def work_host_tools(context, lease):
     """Executes a lease for a build action, using host tools.
     """
-    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
     instance_name = context.parent
     logger = context.logger

     action_digest = remote_execution_pb2.Digest()
     lease.payload.Unpack(action_digest)

-    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
-                                     stub_bytestream, action_digest, instance_name)
-
     with tempfile.TemporaryDirectory() as temp_directory:
-        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
-                                          stub_bytestream, action.command_digest, instance_name)
+        with download(context.cas_channel, instance=instance_name) as downloader:
+            action = downloader.get_message(action_digest,
+                                            remote_execution_pb2.Action())
+
+            assert action.command_digest.hash
+
+            command = downloader.get_message(action.command_digest,
+                                             remote_execution_pb2.Command())

-        write_fetch_directory(temp_directory, stub_bytestream,
-                              action.input_root_digest, instance_name)
+            downloader.download_directory(action.input_root_digest, temp_directory)

         environment = os.environ.copy()
         for variable in command.environment_variables:
             if variable.name not in ['PATH', 'PWD']:
                 environment[variable.name] = variable.value

-        command_line = list()
+        command_line = []
         for argument in command.arguments:
             command_line.append(argument.strip())

@@ -94,15 +93,21 @@ def work_temp_directory(context, lease):
         logger.debug("Command stdout: [{}]".format(stdout))
         logger.debug("Command exit code: [{}]".format(returncode))

-        with upload(context.cas_channel, instance=instance_name) as cas:
+        with upload(context.cas_channel, instance=instance_name) as uploader:
+            output_files, output_directories = [], []
+
             for output_path in command.output_files:
                 file_path = os.path.join(working_directory, output_path)
                 # Missing outputs should simply be omitted in ActionResult:
                 if not os.path.isfile(file_path):
                     continue

-                output_file = output_file_maker(file_path, working_directory, cas=cas)
-                action_result.output_files.extend([output_file])
+                file_digest = uploader.upload_file(file_path, queue=True)
+                output_file = output_file_maker(file_path, working_directory,
+                                                file_digest)
+                output_files.append(output_file)
+
+            action_result.output_files.extend(output_files)

             for output_path in command.output_directories:
                 directory_path = os.path.join(working_directory, output_path)

@@ -110,10 +115,12 @@ def work_temp_directory(context, lease):
                 if not os.path.isdir(directory_path):
                     continue

-                # OutputDirectory.path should be relative to the working direcory:
-                output_directory = output_directory_maker(directory_path, working_directory, cas=cas)
+                tree_digest = uploader.upload_tree(directory_path, queue=True)
+                output_directory = output_directory_maker(directory_path, working_directory,
+                                                          tree_digest)
+                output_directories.append(output_directory)

-                action_result.output_directories.extend([output_directory])
+            action_result.output_directories.extend(output_directories)

         action_result_any = any_pb2.Any()
         action_result_any.Pack(action_result)
...
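Note the queue=True keyword on upload_file() and upload_tree(): from this usage, digests appear to be computed and returned immediately while the blob transfer itself is deferred and batched until the upload context exits. A sketch of the assumed pattern (output_paths is a hypothetical list):

    # Assumed semantics of queue=True, inferred from this diff: returned
    # digests are usable at once, queued blobs flush when the context closes.
    from buildgrid.client.cas import upload

    with upload(context.cas_channel, instance=instance_name) as uploader:
        queued = [uploader.upload_file(path, queue=True)
                  for path in output_paths]  # output_paths: hypothetical
    # Exiting the 'with' block sends any remaining queued blobs to CAS.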
@@ -31,7 +31,7 @@ import grpc
 from buildgrid.bot import bot, bot_interface
 from buildgrid.bot.bot_session import BotSession, Device, Worker

-from ..bots import buildbox, dummy, temp_directory
+from ..bots import buildbox, dummy, host
 from ..cli import pass_context

@@ -52,16 +52,19 @@ from ..cli import pass_context
               help="Public CAS client certificate for TLS (PEM-encoded)")
 @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
               help="Public CAS server certificate for TLS (PEM-encoded)")
+@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
+              help="Time period for bot updates to the server in seconds.")
 @click.option('--parent', type=click.STRING, default='main', show_default=True,
               help="Targeted farm resource.")
 @pass_context
-def cli(context, parent, remote, client_key, client_cert, server_cert,
+def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
         remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
     # Setup the remote execution server channel:
     url = urlparse(remote)

     context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
     context.remote_url = remote
+    context.update_period = update_period
     context.parent = parent

     if url.scheme == 'http':

@@ -135,26 +138,26 @@ def cli(context, parent, remote, client_key, client_cert, server_cert,
 @pass_context
 def run_dummy(context):
     """
-    Simple dummy client. Creates a session, accepts leases, does fake work and
-    updates the server.
+    Creates a session, accepts leases, does fake work and updates the server.
     """
     try:
-        b = bot.Bot(context.bot_session)
+        b = bot.Bot(context.bot_session, context.update_period)
         b.session(dummy.work_dummy,
                   context)
     except KeyboardInterrupt:
         pass


-@cli.command('temp-directory', short_help="Runs commands in temp directory and uploads results.")
+@cli.command('host-tools', short_help="Runs commands using the host's tools.")
 @pass_context
-def run_temp_directory(context):
-    """ Downloads files and command from CAS and runs
-    in a temp directory, uploading result back to CAS
+def run_host_tools(context):
+    """
+    Downloads inputs from CAS, runs build commands using host-tools and uploads
+    result back to CAS.
     """
     try:
-        b = bot.Bot(context.bot_session)
-        b.session(temp_directory.work_temp_directory,
+        b = bot.Bot(context.bot_session, context.update_period)
+        b.session(host.work_host_tools,
                   context)
     except KeyboardInterrupt:
         pass

@@ -168,13 +171,13 @@ def run_temp_directory(context):
 @pass_context
 def run_buildbox(context, local_cas, fuse_dir):
     """
-    Uses BuildBox to run commands.
+    Uses BuildBox to run build commands.
     """
     context.local_cas = local_cas
     context.fuse_dir = fuse_dir

     try:
-        b = bot.Bot(context.bot_session)
+        b = bot.Bot(context.bot_session, context.update_period)
         b.session(buildbox.work_buildbox,
                   context)
     except KeyboardInterrupt:
...
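The new --update-period option flows through the click context into the bot.Bot constructor as its second argument. A sketch of the assumed wiring (bot_session and context are placeholders):

    # The second constructor argument is the update period in seconds
    # (default 0.5, per the click option above); bot_session and context
    # are placeholders here.
    from buildgrid.bot import bot
    from buildgrid._app.bots import host  # renamed from temp_directory above

    b = bot.Bot(bot_session, 0.5)
    b.session(host.work_host_tools, context)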
@@ -21,14 +21,16 @@ Request work to be executed and monitor status of jobs.
 """

 import logging
+import os
 import sys
 from urllib.parse import urlparse

 import click
 import grpc

-from buildgrid.utils import merkle_maker, create_digest
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid.client.cas import download, upload
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildgrid.utils import create_digest, merkle_tree_maker, read_file

 from ..cli import pass_context

@@ -68,56 +70,145 @@ def cli(context, remote, instance_name, client_key, client_cert, server_cert):
 @cli.command('upload-dummy', short_help="Upload a dummy action. Should be used with `execute dummy-request`")
 @pass_context
 def upload_dummy(context):
-    context.logger.info("Uploading dummy action...")
     action = remote_execution_pb2.Action(do_not_cache=True)
-    action_digest = create_digest(action.SerializeToString())
+    with upload(context.channel, instance=context.instance_name) as uploader:
+        action_digest = uploader.put_message(action)

-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name)
-    request.requests.add(digest=action_digest,
-                         data=action.SerializeToString())
-
-    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-    response = stub.BatchUpdateBlobs(request)
-    context.logger.info(response)
+    if action_digest.ByteSize():
+        click.echo('Success: Pushed digest "{}/{}"'
+                   .format(action_digest.hash, action_digest.size_bytes))
+    else:
+        click.echo("Error: Failed pushing empty message.", err=True)


-@cli.command('upload-files', short_help="Upload files to the CAS server.")
-@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
+@cli.command('upload-file', short_help="Upload files to the CAS server.")
+@click.argument('file_path', nargs=-1, type=click.Path(exists=True, dir_okay=False), required=True)
+@click.option('--verify', is_flag=True, show_default=True,
+              help="Check uploaded files integrity.")
 @pass_context
-def upload_files(context, files):
-    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-
-    requests = []
-    for file in files:
-        chunk = file.read()
-        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-            digest=create_digest(chunk), data=chunk))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-
-    context.logger.info("Sending: {}".format(request))
-    response = stub.BatchUpdateBlobs(request)
-    context.logger.info("Response: {}".format(response))
+def upload_file(context, file_path, verify):
+    sent_digests, files_map = [], {}
+    with upload(context.channel, instance=context.instance_name) as uploader:
+        for path in file_path:
+            if not os.path.isabs(path):
+                path = os.path.abspath(path)
+            context.logger.debug("Queueing {}".format(path))
+
+            file_digest = uploader.upload_file(path, queue=True)
+
+            files_map[file_digest.hash] = path
+            sent_digests.append(file_digest)
+
+    for file_digest in sent_digests:
+        file_path = os.path.relpath(files_map[file_digest.hash])
+        if verify and file_digest.size_bytes != os.stat(file_path).st_size:
+            click.echo('Error: Failed to verify "{}"'.format(file_path), err=True)
+        elif file_digest.ByteSize():
+            click.echo('Success: Pushed "{}" with digest "{}/{}"'
+                       .format(file_path, file_digest.hash, file_digest.size_bytes))
+        else:
+            click.echo('Error: Failed pushing "{}"'.format(file_path), err=True)


 @cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
-@click.argument('directory', nargs=1, type=click.Path(), required=True)
+@click.argument('directory-path', nargs=1, type=click.Path(exists=True, file_okay=False), required=True)
+@click.option('--verify', is_flag=True, show_default=True,
+              help="Check uploaded directory's integrity.")
 @pass_context
-def upload_dir(context, directory):
-    context.logger.info("Uploading directory to cas")
-    stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
-
-    requests = []
-
-    for chunk, file_digest in merkle_maker(directory):
-        requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-            digest=file_digest, data=chunk))
-
-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-
-    context.logger.info("Request:\n{}".format(request))
-    response = stub.BatchUpdateBlobs(request)
-    context.logger.info("Response:\n{}".format(response))
+def upload_directory(context, directory_path, verify):
+    sent_digests, nodes_map = [], {}
+    with upload(context.channel, instance=context.instance_name) as uploader:
+        for node, blob, path in merkle_tree_maker(directory_path):
+            if not os.path.isabs(path):
+                path = os.path.abspath(path)
+            context.logger.debug("Queueing {}".format(path))
+
+            node_digest = uploader.put_blob(blob, digest=node.digest, queue=True)
+
+            nodes_map[node.digest.hash] = path
+            sent_digests.append(node_digest)
+
+    for node_digest in sent_digests:
+        node_path = nodes_map[node_digest.hash]
+        if not os.path.isabs(directory_path):
+            node_path = os.path.relpath(node_path)
+        if verify and (os.path.isfile(node_path) and
+                       node_digest.size_bytes != os.stat(node_path).st_size):
+            click.echo('Error: Failed to verify "{}"'.format(node_path), err=True)
+        elif node_digest.ByteSize():
+            click.echo('Success: Pushed "{}" with digest "{}/{}"'
+                       .format(node_path, node_digest.hash, node_digest.size_bytes))
+        else:
+            click.echo('Error: Failed pushing "{}"'.format(node_path), err=True)
+
+
+def _create_digest(digest_string):
+    digest_hash, digest_size = digest_string.split('/')
+
+    digest = remote_execution_pb2.Digest()
+    digest.hash = digest_hash
+    digest.size_bytes = int(digest_size)
+
+    return digest
+
+
+@cli.command('download-file', short_help="Download a file from the CAS server.")
+@click.argument('digest-string', nargs=1, type=click.STRING, required=True)
+@click.argument('file-path', nargs=1, type=click.Path(exists=False), required=True)
+@click.option('--verify', is_flag=True, show_default=True,
+              help="Check downloaded file's integrity.")
+@pass_context
+def download_file(context, digest_string, file_path, verify):
+    if os.path.exists(file_path):
+        click.echo('Error: Invalid value for "file-path": ' +
+                   'Path "{}" already exists.'.format(file_path), err=True)
+        return
+
+    digest = _create_digest(digest_string)
+    with download(context.channel, instance=context.instance_name) as downloader:
+        downloader.download_file(digest, file_path)
+
+    if verify:
+        file_digest = create_digest(read_file(file_path))
+        if file_digest != digest:
+            click.echo('Error: Failed to verify "{}"'.format(file_path), err=True)
+            return
+
+    if os.path.isfile(file_path):
+        click.echo('Success: Pulled "{}" from digest "{}/{}"'
+                   .format(file_path, digest.hash, digest.size_bytes))
+    else:
+        click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
+
+
+@cli.command('download-dir', short_help="Download a directory from the CAS server.")
+@click.argument('digest-string', nargs=1, type=click.STRING, required=True)
+@click.argument('directory-path', nargs=1, type=click.Path(exists=False), required=True)
+@click.option('--verify', is_flag=True, show_default=True,
+              help="Check downloaded directory's integrity.")
+@pass_context
+def download_directory(context, digest_string, directory_path, verify):
+    if os.path.exists(directory_path):
+        if not os.path.isdir(directory_path) or os.listdir(directory_path):
+            click.echo('Error: Invalid value for "directory-path": ' +
+                       'Path "{}" already exists.'.format(directory_path), err=True)
+            return
+
+    digest = _create_digest(digest_string)
+    with download(context.channel, instance=context.instance_name) as downloader:
+        downloader.download_directory(digest, directory_path)
+
+    if verify:
+        last_directory_node = None
+        for node, _, _ in merkle_tree_maker(directory_path):
+            if node.DESCRIPTOR is remote_execution_pb2.DirectoryNode.DESCRIPTOR:
+                last_directory_node = node
+        if last_directory_node.digest != digest:
+            click.echo('Error: Failed to verify "{}"'.format(directory_path), err=True)
+            return
+
+    if os.path.isdir(directory_path):
+        click.echo('Success: Pulled "{}" from digest "{}/{}"'
+                   .format(directory_path, digest.hash, digest.size_bytes))
+    else:
+        click.echo('Error: Failed pulling "{}"'.format(directory_path), err=True)
...
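The new download commands identify blobs with a single "hash/size_bytes" string, which _create_digest() splits back into a Digest message. For instance, the digest of an empty blob (SHA-256 of the empty string, size 0) would be written as:

    digest = _create_digest(
        'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/0')

    assert digest.hash.startswith('e3b0c442')
    assert digest.size_bytes == 0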
@@ -20,7 +20,6 @@ Execute command
 Request work to be executed and monitor status of jobs.
 """

-import errno
 import logging
 import os
 import stat

@@ -30,9 +29,9 @@ from urllib.parse import urlparse
 import click
 import grpc

-from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
+from buildgrid.client.cas import download, upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid.utils import create_digest

 from ..cli import pass_context

@@ -87,7 +86,7 @@ def request_dummy(context, number, wait_for_completion):
                                                   action_digest=action_digest,
                                                   skip_cache_lookup=True)

-    responses = list()
+    responses = []
     for _ in range(0, number):
         responses.append(stub.Execute(request))

@@ -116,54 +115,43 @@ def request_dummy(context, number, wait_for_completion):
 @click.argument('input-root', nargs=1, type=click.Path(), required=True)
 @click.argument('commands', nargs=-1, type=click.STRING, required=True)
 @pass_context
-def command(context, input_root, commands, output_file, output_directory):
+def run_command(context, input_root, commands, output_file, output_directory):
     stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)

-    execute_command = remote_execution_pb2.Command()
-
-    for arg in commands:
-        execute_command.arguments.extend([arg])
-
     output_executeables = []
-    for file, is_executeable in output_file:
-        execute_command.output_files.extend([file])
-        if is_executeable:
-            output_executeables.append(file)
+    with upload(context.channel, instance=context.instance_name) as uploader:
+        command = remote_execution_pb2.Command()

-    command_digest = create_digest(execute_command.SerializeToString())
-    context.logger.info(command_digest)
+        for arg in commands:
+            command.arguments.extend([arg])

-    # TODO: Check for missing blobs
-    digest = None
-    for _, digest in merkle_maker(input_root):
-        pass
+        for file, is_executeable in output_file:
+            command.output_files.extend([file])
+            if is_executeable:
+                output_executeables.append(file)

-    action = remote_execution_pb2.Action(command_digest=command_digest,
-                                         input_root_digest=digest,
-                                         do_not_cache=True)
+        command_digest = uploader.put_message(command, queue=True)

-    action_digest = create_digest(action.SerializeToString())
+        context.logger.info('Sent command: {}'.format(command_digest))

-    context.logger.info("Sending execution request...")
+        # TODO: Check for missing blobs
+        input_root_digest = uploader.upload_directory(input_root)

-    requests = []
-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=command_digest, data=execute_command.SerializeToString()))
+        context.logger.info('Sent input: {}'.format(input_root_digest))

-    requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
-        digest=action_digest, data=action.SerializeToString()))
+        action = remote_execution_pb2.Action(command_digest=command_digest,
+                                             input_root_digest=input_root_digest,
+                                             do_not_cache=True)

-    request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
-                                                           requests=requests)
-    remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
+        action_digest = uploader.put_message(action, queue=True)
+
+        context.logger.info("Sent action: {}".format(action_digest))

     request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
                                                   action_digest=action_digest,
                                                   skip_cache_lookup=True)
+
     response = stub.Execute(request)

-    stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
-
     stream = None
     for stream in response:
         context.logger.info(stream)

@@ -171,21 +159,16 @@ def command(context, input_root, commands, output_file, output_directory):
     execute_response = remote_execution_pb2.ExecuteResponse()
     stream.response.Unpack(execute_response)

-    for output_file_response in execute_response.result.output_files:
-        path = os.path.join(output_directory, output_file_response.path)
-
-        if not os.path.exists(os.path.dirname(path)):
-            try:
-                os.makedirs(os.path.dirname(path))
-            except OSError as exc:
-                if exc.errno != errno.EEXIST:
-                    raise
+    with download(context.channel, instance=context.instance_name) as downloader:
+        for output_file_response in execute_response.result.output_files:
+            path = os.path.join(output_directory, output_file_response.path)

-        with open(path, 'wb+') as f:
-            write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
+            if not os.path.exists(os.path.dirname(path)):
+                os.makedirs(os.path.dirname(path), exist_ok=True)

-        if output_file_response.path in output_executeables:
-            st = os.stat(path)
-            os.chmod(path, st.st_mode | stat.S_IXUSR)
+            downloader.download_file(output_file_response.digest, path)
+
+            if output_file_response.path in output_executeables:
+                st = os.stat(path)
+                os.chmod(path, st.st_mode | stat.S_IXUSR)
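The errno-checking pattern removed here and the new exist_ok=True call are equivalent on Python 3.2 and later; side by side (directory is a placeholder path):

    import errno
    import os

    # Removed pattern:
    try:
        os.makedirs(directory)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise

    # Replacement used above, equivalent since Python 3.2:
    os.makedirs(directory, exist_ok=True)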
@@ -101,8 +101,7 @@ def lists(context):
 @pass_context
 def wait(context, operation_name):
     stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)

-    request = remote_execution_pb2.WaitExecutionRequest(instance_name=context.instance_name,
-                                                        name=operation_name)
+    request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)

     response = stub.WaitExecution(request)
...
 server:
   - !channel
-    port: 50051
+    port: 50052
     insecure_mode: true
     # credentials:
     #   tls-server-key: null
...
@@ -35,6 +35,9 @@ from ..cli import Context

 class YamlFactory(yaml.YAMLObject):
+    """ Base class for contructing maps or scalars from tags.
+    """
+
     @classmethod
     def from_yaml(cls, loader, node):
         if isinstance(node, yaml.ScalarNode):

@@ -47,6 +50,21 @@ class YamlFactory(yaml.YAMLObject):

 class Channel(YamlFactory):
+    """Creates a GRPC channel.
+
+    The :class:`Channel` class returns a `grpc.Channel` and is generated from the tag ``!channel``.
+    Creates either a secure or insecure channel.
+
+    Args:
+        port (int): A port for the channel.
+        insecure_mode (bool): If ``True``, generates an insecure channel, even if there are
+            credentials. Defaults to ``True``.
+        credentials (dict, optional): A dictionary in the form::
+
+            tls-server-key: /path/to/server-key
+            tls-server-cert: /path/to/server-cert
+            tls-client-certs: /path/to/client-certs
+    """

     yaml_tag = u'!channel'

@@ -69,6 +87,13 @@ class Channel(YamlFactory):

 class ExpandPath(YamlFactory):
+    """Returns a string of the user's path after expansion.
+
+    The :class:`ExpandPath` class returns a string and is generated from the tag ``!expand-path``.
+
+    Args:
+        path (str): Can be used with strings such as: ``~/dir/to/something`` or ``$HOME/certs``
+    """

     yaml_tag = u'!expand-path'

@@ -79,14 +104,30 @@ class ExpandPath(YamlFactory):

 class Disk(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.storage.disk.DiskStorage` using the tag ``!disk-storage``.
+
+    Args:
+        path (str): Path to directory to storage.
+    """

     yaml_tag = u'!disk-storage'

     def __new__(cls, path):
+        """Creates a new disk
+
+        Args:
+            path (str): Some path
+        """
         return DiskStorage(path)


 class LRU(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.storage.lru_memory_cache.LRUMemoryCache` using the tag ``!lru-storage``.
+
+    Args:
+        size (int): Size e.g ``10kb``. Size parsed with :meth:`buildgrid._app.settings.parser._parse_size`.
+    """

     yaml_tag = u'!lru-storage'

@@ -95,6 +136,12 @@ class LRU(YamlFactory):

 class S3(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.storage.s3.S3Storage` using the tag ``!s3-storage``.
+
+    Args:
+        bucket (str): Name of bucket
+        endpoint (str): URL of endpoint.
+    """

     yaml_tag = u'!s3-storage'

@@ -103,6 +150,18 @@ class S3(YamlFactory):

 class Remote(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.storage.remote.RemoteStorage`
+    using the tag ``!remote-storage``.
+
+    Args:
+        url (str): URL to remote storage. If used with ``https``, needs credentials.
+        instance_name (str): Instance of the remote to connect to.
+        credentials (dict, optional): A dictionary in the form::
+
+            tls-client-key: /path/to/client-key
+            tls-client-cert: /path/to/client-cert
+            tls-server-cert: /path/to/server-cert
+    """

     yaml_tag = u'!remote-storage'

@@ -144,6 +203,18 @@ class Remote(YamlFactory):

 class WithCache(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.storage.with_cache.WithCacheStorage`
+    using the tag ``!with-cache-storage``.
+
+    Args:
+        url (str): URL to remote storage. If used with ``https``, needs credentials.
+        instance_name (str): Instance of the remote to connect to.
+        credentials (dict, optional): A dictionary in the form::
+
+            tls-client-key: /path/to/certs
+            tls-client-cert: /path/to/certs
+            tls-server-cert: /path/to/certs
+    """

     yaml_tag = u'!with-cache-storage'

@@ -152,6 +223,13 @@ class WithCache(YamlFactory):

 class Execution(YamlFactory):
+    """Generates :class:`buildgrid.server.execution.service.ExecutionService`
+    using the tag ``!execution``.
+
+    Args:
+        storage(:class:`buildgrid.server.cas.storage.storage_abc.StorageABC`): Instance of storage to use.
+        action_cache(:class:`Action`): Instance of action cache to use.
+    """

     yaml_tag = u'!execution'

@@ -160,6 +238,14 @@ class Execution(YamlFactory):

 class Action(YamlFactory):
+    """Generates :class:`buildgrid.server.actioncache.service.ActionCacheService`
+    using the tag ``!action-cache``.
+
+    Args:
+        storage(:class:`buildgrid.server.cas.storage.storage_abc.StorageABC`): Instance of storage to use.
+        max_cached_refs(int): Max number of cached actions.
+        allow_updates(bool): Allow updates pushed to CAS. Defaults to ``True``.
+    """

     yaml_tag = u'!action-cache'

@@ -168,6 +254,14 @@ class Action(YamlFactory):

 class Reference(YamlFactory):
+    """Generates :class:`buildgrid.server.referencestorage.service.ReferenceStorageService`
+    using the tag ``!reference-cache``.
+
+    Args:
+        storage(:class:`buildgrid.server.cas.storage.storage_abc.StorageABC`): Instance of storage to use.
+        max_cached_refs(int): Max number of cached actions.
+        allow_updates(bool): Allow updates pushed to CAS. Defauled to ``True``.
+    """

     yaml_tag = u'!reference-cache'

@@ -176,6 +270,12 @@ class Reference(YamlFactory):

 class CAS(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.service.ContentAddressableStorageService`
+    using the tag ``!cas``.
+
+    Args:
+        storage(:class:`buildgrid.server.cas.storage.storage_abc.StorageABC`): Instance of storage to use.
+    """

     yaml_tag = u'!cas'

@@ -184,6 +284,12 @@ class CAS(YamlFactory):

 class ByteStream(YamlFactory):
+    """Generates :class:`buildgrid.server.cas.service.ByteStreamService`
+    using the tag ``!bytestream``.
+
+    Args:
+        storage(:class:`buildgrid.server.cas.storage.storage_abc.StorageABC`): Instance of storage to use.
+    """

     yaml_tag = u'!bytestream'
...
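All of these factories follow the same YAMLObject pattern: PyYAML dispatches on the tag and from_yaml() builds the configured object from either a scalar or a mapping node. A simplified sketch of the assumed base-class behaviour (only the ScalarNode check is visible in the diff; the mapping branch is inferred):

    import yaml

    class ExampleFactory(yaml.YAMLObject):
        yaml_tag = u'!example'

        @classmethod
        def from_yaml(cls, loader, node):
            if isinstance(node, yaml.ScalarNode):
                # Tags on plain values, e.g. `!expand-path ~/certs`:
                return cls(loader.construct_scalar(node))
            # Tags on mappings become keyword arguments (assumed):
            return cls(**loader.construct_mapping(node, deep=True))

        def __init__(self, *args, **kwargs):
            self.args, self.kwargs = args, kwargs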
@@ -50,3 +50,36 @@ class ServerError(BgdError):
 class BotError(BgdError):
     def __init__(self, message, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
+
+
+class InvalidArgumentError(BgdError):
+    """A bad argument was passed, such as a name which doesn't exist."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
+
+
+class NotFoundError(BgdError):
+    """Requested resource not found."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
+
+
+class OutOfSyncError(BgdError):
+    """The worker is out of sync with the server, such as having a differing
+    number of leases."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
+
+
+class OutOfRangeError(BgdError):
+    """ByteStream service read data out of range."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
+
+
+class FailedPreconditionError(BgdError):
+    """One or more errors occurred in setting up the action requested, such as
+    a missing input or command or no worker being available. The client may be
+    able to fix the errors and retry."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
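Elsewhere in this diff the gRPC services translate these domain exceptions into status codes in their except-blocks. Collected here purely for reference, the mapping those handlers imply:

    import grpc

    # Mapping implied by the except-blocks in the service classes below.
    ERROR_TO_STATUS_CODE = {
        'InvalidArgumentError': grpc.StatusCode.INVALID_ARGUMENT,
        'NotFoundError': grpc.StatusCode.NOT_FOUND,
        'OutOfSyncError': grpc.StatusCode.DATA_LOSS,
        'OutOfRangeError': grpc.StatusCode.OUT_OF_RANGE,
    }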
@@ -154,15 +154,15 @@ class BotSession:
             lease = await loop.run_in_executor(None, self._work, self._context, lease)

         except grpc.RpcError as e:
-            self.logger.error("Connection error thrown: [{}]".format(e))
-            lease.status.code = e.code()
+            self.logger.error("RPC error thrown: [{}]".format(e))
+            lease.status.CopyFrom(e.code())

         except BotError as e:
             self.logger.error("Internal bot error thrown: [{}]".format(e))
             lease.status.code = code_pb2.INTERNAL

         except Exception as e:
-            self.logger.error("Connection error thrown: [{}]".format(e))
+            self.logger.error("Exception thrown: [{}]".format(e))
             lease.status.code = code_pb2.INTERNAL

         self.logger.debug("Work complete: [{}]".format(lease.id))
...
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from .._exceptions import BgdError, ErrorDomain


class InvalidArgumentError(BgdError):
    """A bad argument was passed, such as a name which doesn't exist.
    """
    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)


class NotFoundError(BgdError):
    """Requested resource not found.
    """
    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)


class OutofSyncError(BgdError):
    """The worker is out of sync with the server, such as having a differing number of leases.
    """
    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)


class OutOfRangeError(BgdError):
    """ ByteStream service read data out of range
    """
    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)


class FailedPreconditionError(BgdError):
    """ One or more errors occurred in setting up the action requested, such as a missing input
    or command or no worker being available. The client may be able to fix the errors and retry.
    """
    def __init__(self, message, detail=None, reason=None):
        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
@@ -24,11 +24,10 @@ import logging

 import grpc

+from buildgrid._exceptions import InvalidArgumentError, NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc

-from .._exceptions import InvalidArgumentError, NotFoundError
-

 class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

@@ -53,7 +52,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
             context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

         except NotFoundError as e:
-            self.logger.error(e)
+            self.logger.debug(e)
             context.set_code(grpc.StatusCode.NOT_FOUND)

         return remote_execution_pb2.ActionResult()
...
@@ -23,7 +23,8 @@ Instance of the Remote Workers interface.
 import logging
 import uuid

-from .._exceptions import InvalidArgumentError, OutofSyncError
+from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
+
 from ..job import LeaseState

@@ -65,7 +66,9 @@ class BotsInterface:
         self._bot_sessions[name] = bot_session
         self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))

-        for lease in self._scheduler.create_leases():
+        # For now, one lease at a time.
+        lease = self._scheduler.create_lease()
+        if lease:
             bot_session.leases.extend([lease])

         return bot_session

@@ -82,8 +85,11 @@ class BotsInterface:
         del bot_session.leases[:]
         bot_session.leases.extend(leases)

-        for lease in self._scheduler.create_leases():
-            bot_session.leases.extend([lease])
+        # For now, one lease at a time
+        if not bot_session.leases:
+            lease = self._scheduler.create_lease()
+            if lease:
+                bot_session.leases.extend([lease])

         self._bot_sessions[name] = bot_session
         return bot_session

@@ -108,7 +114,7 @@ class BotsInterface:
                 # TODO: Lease was rejected
                 raise NotImplementedError("'Not Accepted' is unsupported")
             else:
-                raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
+                raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))

         elif server_state == LeaseState.ACTIVE:

@@ -121,17 +127,17 @@ class BotsInterface:
                 return None

             else:
-                raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
+                raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))

         elif server_state == LeaseState.COMPLETED:
-            raise OutofSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
+            raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))

         elif server_state == LeaseState.CANCELLED:
             raise NotImplementedError("Cancelled states not supported yet")

         else:
             # Sould never get here
-            raise OutofSyncError("State now allowed: {}".format(server_state))
+            raise OutOfSyncError("State now allowed: {}".format(server_state))

         return client_lease
...
@@ -25,11 +25,10 @@ import grpc

 from google.protobuf.empty_pb2 import Empty

+from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc

-from .._exceptions import InvalidArgumentError, OutofSyncError
-

 class BotsService(bots_pb2_grpc.BotsServicer):

@@ -73,7 +72,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):
             context.set_details(str(e))
             context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

-        except OutofSyncError as e:
+        except OutOfSyncError as e:
             self.logger.error(e)
             context.set_details(str(e))
             context.set_code(grpc.StatusCode.DATA_LOSS)
...
@@ -19,11 +19,10 @@ Storage Instances
 Instances of CAS and ByteStream
 """

+from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 from buildgrid._protos.google.bytestream import bytestream_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
+from buildgrid.settings import HASH

-from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
-from ...settings import HASH
-

 class ContentAddressableStorageInstance:
...
@@ -26,12 +26,11 @@ import logging

 import grpc

+from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc

-from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
-

 class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

@@ -47,8 +46,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def FindMissingBlobs(self, request, context):
         try:
+            self.logger.debug("FindMissingBlobs request: [{}]".format(request))
             instance = self._get_instance(request.instance_name)
-            return instance.find_missing_blobs(request.blob_digests)
+            response = instance.find_missing_blobs(request.blob_digests)
+            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
+            return response

         except InvalidArgumentError as e:
             self.logger.error(e)

@@ -59,8 +61,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def BatchUpdateBlobs(self, request, context):
         try:
+            self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
             instance = self._get_instance(request.instance_name)
-            return instance.batch_update_blobs(request.requests)
+            response = instance.batch_update_blobs(request.requests)
+            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
+            return response

         except InvalidArgumentError as e:
             self.logger.error(e)

@@ -69,6 +74,18 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
         return remote_execution_pb2.BatchReadBlobsResponse()

+    def BatchReadBlobs(self, request, context):
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        return remote_execution_pb2.BatchReadBlobsResponse()
+
+    def GetTree(self, request, context):
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        return iter([remote_execution_pb2.GetTreeResponse()])
+
     def _get_instance(self, instance_name):
         try:
             return self._instances[instance_name]

@@ -91,6 +108,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
     def Read(self, request, context):
         try:
+            self.logger.debug("Read request: [{}]".format(request))
             path = request.resource_name.split("/")
             instance_name = path[0]

@@ -130,10 +148,13 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             context.set_code(grpc.StatusCode.OUT_OF_RANGE)
             yield bytestream_pb2.ReadResponse()

+        self.logger.debug("Read finished.")
+
     def Write(self, requests, context):
         try:
             requests, request_probe = tee(requests, 2)
             first_request = next(request_probe)
+            self.logger.debug("First write request: [{}]".format(first_request))

             path = first_request.resource_name.split("/")

@@ -153,7 +174,9 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
                 raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))

             instance = self._get_instance(instance_name)
-            return instance.write(requests)
+            response = instance.write(requests)
+            self.logger.debug("Write response: [{}]".format(response))
+            return response

         except NotImplementedError as e:
             self.logger.error(e)

@@ -172,6 +195,12 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
         return bytestream_pb2.WriteResponse()

+    def QueryWriteStatus(self, request, context):
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        return bytestream_pb2.QueryWriteStatusResponse()
+
     def _get_instance(self, instance_name):
         try:
             return self._instances[instance_name]
...
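Write() probes the first request of the incoming iterator with itertools.tee so it can parse the resource name before handing the whole stream on. The same peek pattern in isolation:

    from itertools import tee

    def peek_first(iterable):
        # Duplicate the iterator; consuming one copy leaves the other intact.
        items, probe = tee(iterable, 2)
        first = next(probe)
        return first, items

    first, requests = peek_first(iter(['request-0', 'request-1']))
    assert first == 'request-0'
    assert list(requests) == ['request-0', 'request-1']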
@@ -21,7 +21,6 @@ A CAS storage provider that stores files as blobs on disk.
 """

 import os
-import pathlib
 import tempfile

 from .storage_abc import StorageABC

@@ -30,28 +29,41 @@ from .storage_abc import StorageABC
 class DiskStorage(StorageABC):

     def __init__(self, path):
-        self._path = pathlib.Path(path)
-        os.makedirs(str(self._path / "temp"), exist_ok=True)
+        if not os.path.isabs(path):
+            self.__root_path = os.path.abspath(path)
+        else:
+            self.__root_path = path
+        self.__cas_path = os.path.join(self.__root_path, 'cas')
+
+        self.objects_path = os.path.join(self.__cas_path, 'objects')
+        self.temp_path = os.path.join(self.__root_path, 'tmp')
+
+        os.makedirs(self.objects_path, exist_ok=True)
+        os.makedirs(self.temp_path, exist_ok=True)

     def has_blob(self, digest):
-        return (self._path / (digest.hash + "_" + str(digest.size_bytes))).exists()
+        return os.path.exists(self._get_object_path(digest))

     def get_blob(self, digest):
         try:
-            return (self._path / (digest.hash + "_" + str(digest.size_bytes))).open('rb')
+            return open(self._get_object_path(digest), 'rb')
         except FileNotFoundError:
             return None

-    def begin_write(self, _digest):
-        return tempfile.NamedTemporaryFile("wb", dir=str(self._path / "temp"))
+    def begin_write(self, digest):
+        return tempfile.NamedTemporaryFile("wb", dir=self.temp_path)

     def commit_write(self, digest, write_session):
-        # Atomically move the temporary file into place.
-        path = self._path / (digest.hash + "_" + str(digest.size_bytes))
-        os.replace(write_session.name, str(path))
+        object_path = self._get_object_path(digest)
+
         try:
-            write_session.close()
-        except FileNotFoundError:
-            # We moved the temporary file to a new location, so when Python
-            # tries to delete its old location, it'll fail.
+            os.makedirs(os.path.dirname(object_path), exist_ok=True)
+            os.link(write_session.name, object_path)
+        except FileExistsError:
+            # Object is already there!
             pass
+
+        write_session.close()
+
+    def _get_object_path(self, digest):
+        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])
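The new on-disk layout shards objects by the first two characters of their hash instead of flat "hash_size" files, and commit_write() now hard-links the temporary file into place, tolerating FileExistsError when another writer got there first. A worked example of the path computation (the root is a placeholder):

    import os

    objects_path = '/var/lib/buildgrid/cas/objects'  # placeholder root

    digest_hash = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

    # Mirrors DiskStorage._get_object_path(): two-character fan-out directory.
    object_path = os.path.join(objects_path, digest_hash[:2], digest_hash[2:])
    assert object_path.endswith(
        'e3/b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')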
@@ -23,11 +23,11 @@ Forwwards storage requests to a remote storage.
 import io
 import logging

-import grpc
-
-from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
-from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
+from buildgrid.client.cas import download, upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid._protos.google.rpc import code_pb2
+from buildgrid._protos.google.rpc import status_pb2
+from buildgrid.settings import HASH

 from .storage_abc import StorageABC

@@ -36,8 +36,10 @@ class RemoteStorage(StorageABC):

     def __init__(self, channel, instance_name):
         self.logger = logging.getLogger(__name__)
-        self._instance_name = instance_name
-        self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
+
+        self.instance_name = instance_name
+        self.channel = channel
+
         self._stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

     def has_blob(self, digest):

@@ -46,41 +48,22 @@ class RemoteStorage(StorageABC):
         return False

     def get_blob(self, digest):
-        try:
-            fetched_data = io.BytesIO()
-            length = 0
-
-            for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
-                length += fetched_data.write(data)
-
-            if length:
-                assert digest.size_bytes == length
-                fetched_data.seek(0)
-                return fetched_data
-            else:
-                return None
-
-        except grpc.RpcError as e:
-            if e.code() == grpc.StatusCode.NOT_FOUND:
-                pass
-            else:
-                self.logger.error(e.details())
-                raise
-
-        return None
+        with download(self.channel, instance=self.instance_name) as downloader:
+            blob = downloader.get_blob(digest)
+            if blob is not None:
+                return io.BytesIO(blob)
+            else:
+                return None

     def begin_write(self, digest):
-        return io.BytesIO(digest.SerializeToString())
+        return io.BytesIO()

     def commit_write(self, digest, write_session):
-        write_session.seek(0)
-
-        for request in gen_write_request_blob(write_session, digest, self._instance_name):
-            self._stub_bs.Write(request)
+        with upload(self.channel, instance=self.instance_name) as uploader:
+            uploader.put_blob(write_session.getvalue())

     def missing_blobs(self, blobs):
-        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self._instance_name)
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=self.instance_name)
         for blob in blobs:
             request_digest = request.blob_digests.add()

@@ -92,19 +75,15 @@ class RemoteStorage(StorageABC):
         return [x for x in response.missing_blob_digests]

     def bulk_update_blobs(self, blobs):
-        request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self._instance_name)
-
-        for digest, data in blobs:
-            reqs = request.requests.add()
-            reqs.digest.CopyFrom(digest)
-            reqs.data = data
-
-        response = self._stub_cas.BatchUpdateBlobs(request)
-
-        responses = response.responses
-
-        # Check everything was sent back, even if order changed
-        assert ([x.digest for x in request.requests].sort(key=lambda x: x.hash)) == \
-            ([x.digest for x in responses].sort(key=lambda x: x.hash))
-
-        return [x.status for x in responses]
+        sent_digests = []
+        with upload(self.channel, instance=self.instance_name) as uploader:
+            for digest, blob in blobs:
+                if len(blob) != digest.size_bytes or HASH(blob).hexdigest() != digest.hash:
+                    sent_digests.append(remote_execution_pb2.Digest())
+                else:
+                    sent_digests.append(uploader.put_blob(blob, digest=digest, queue=True))
+
+        assert len(sent_digests) == len(blobs)
+
+        return [status_pb2.Status(code=code_pb2.OK) if d.ByteSize() > 0
+                else status_pb2.Status(code=code_pb2.UNKNOWN) for d in sent_digests]
...
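bulk_update_blobs() now validates each blob against its digest locally (size plus HASH, presumably SHA-256 given the hexdigest() call) before queueing it for upload. The check in isolation:

    import hashlib

    HASH = hashlib.sha256  # assumption: buildgrid.settings.HASH is SHA-256

    def blob_matches(digest_hash, digest_size, blob):
        # Same validation bulk_update_blobs() performs before uploading.
        return len(blob) == digest_size and HASH(blob).hexdigest() == digest_hash

    assert blob_matches(hashlib.sha256(b'abc').hexdigest(), 3, b'abc')
    assert not blob_matches(hashlib.sha256(b'abc').hexdigest(), 4, b'abcd')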
@@ -21,10 +21,10 @@ An instance of the Remote Execution Service.

 import logging

+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action

 from ..job import Job
-from .._exceptions import InvalidArgumentError, FailedPreconditionError


 class ExecutionInstance:
......