
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (3)
Showing 289 additions and 292 deletions
@@ -29,7 +29,7 @@ def work_temp_directory(context, lease):
then uploads results back to CAS
"""
instance_name = context.instance_name
parent = context.parent
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
action_digest = remote_execution_pb2.Digest()
@@ -37,12 +37,12 @@ def work_temp_directory(context, lease):
action = remote_execution_pb2.Action()
action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, parent)
with tempfile.TemporaryDirectory() as temp_dir:
command = remote_execution_pb2.Command()
command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, parent)
arguments = "cd {} &&".format(temp_dir)
@@ -51,7 +51,7 @@ def work_temp_directory(context, lease):
context.logger.info(arguments)
write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, parent)
proc = subprocess.Popen(arguments,
shell=True,
@@ -75,7 +75,7 @@ def work_temp_directory(context, lease):
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=parent,
requests=requests)
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
......
@@ -35,7 +35,7 @@ from ..cli import pass_context
@click.group(name='bot', short_help="Create and register bot clients.")
@click.option('--parent', type=click.STRING, default='bgd_test', show_default=True,
@click.option('--parent', type=click.STRING, default='main', show_default=True,
help="Targeted farm resource.")
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="Remote server's port number.")
@@ -49,6 +49,7 @@ def cli(context, host, port, parent):
context.logger = logging.getLogger(__name__)
context.logger.info("Starting on port {}".format(port))
context.channel = channel
context.parent = parent
worker = Worker()
worker.add_device(Device())
@@ -75,14 +76,11 @@ def run_dummy(context):
@cli.command('temp-directory', short_help="Runs commands in temp directory and uploads results.")
@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
help="Targeted farm instance name.")
@pass_context
def run_temp_directory(context, instance_name):
def run_temp_directory(context):
""" Downloads files and command from CAS and runs
in a temp directory, uploading result back to CAS
"""
context.instance_name = instance_name
try:
b = bot.Bot(context.bot_session)
b.session(temp_directory.work_temp_directory,
......
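The change above illustrates the pattern used throughout these commits: options shared by every subcommand move up to the Click group and are stashed on the shared context object. A minimal standalone sketch of that pattern (module layout and names here are illustrative, not BuildGrid's actual cli module):

import click

class Context:
    """Shared state handed from the group down to its subcommands."""

pass_context = click.make_pass_decorator(Context, ensure=True)

@click.group(name='bot')
@click.option('--parent', type=click.STRING, default='main', show_default=True,
              help="Targeted farm resource.")
@pass_context
def cli(context, parent):
    # Store the option once, at group level...
    context.parent = parent

@cli.command('temp-directory')
@pass_context
def run_temp_directory(context):
    # ...and read it back in any subcommand, with no per-command flag.
    click.echo("Using parent: {}".format(context.parent))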
@@ -31,25 +31,26 @@ from ..cli import pass_context
@click.group(name='cas', short_help="Interact with the CAS server.")
@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
help="Targeted farm instance name.")
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="Remote server's port number.")
@click.option('--host', type=click.STRING, default='localhost', show_default=True,
help="Remote server's hostname.")
@pass_context
def cli(context, host, port):
def cli(context, instance_name, host, port):
context.logger = logging.getLogger(__name__)
context.logger.info("Starting on port {}".format(port))
context.instance_name = instance_name
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
context.port = port
@cli.command('upload-files', short_help="Upload files to the CAS server.")
@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
help="Targeted farm instance name.")
@click.argument('files', nargs=-1, type=click.File('rb'), required=True)
@pass_context
def upload_files(context, files, instance_name):
def upload_files(context, files):
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
requests = []
@@ -58,7 +59,7 @@ def upload_files(context, files, instance_name):
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=create_digest(chunk), data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
context.logger.info("Sending: {}".format(request))
@@ -67,11 +68,9 @@ def upload_files(context, files, instance_name):
@cli.command('upload-dir', short_help="Upload a directory to the CAS server.")
@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
help="Targeted farm instance name.")
@click.argument('directory', nargs=1, type=click.Path(), required=True)
@pass_context
def upload_dir(context, directory, instance_name):
def upload_dir(context, directory):
context.logger.info("Uploading directory to cas")
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
@@ -81,7 +80,7 @@ def upload_dir(context, directory, instance_name):
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=file_digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
context.logger.info("Request:\n{}".format(request))
......
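With the option hoisted to the group, the instance name is now given once, before the subcommand. Hypothetical invocations (assuming BuildGrid's bgd entry point and illustrative file paths):

bgd cas --instance-name main upload-files ./foo.txt ./bar.txt
bgd cas --instance-name main upload-dir ./build-output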
@@ -36,34 +36,35 @@ from ..cli import pass_context
@click.group(name='execute', short_help="Execute simple operations.")
@click.option('--instance-name', type=click.STRING, default='main',
show_default=True, help="Targeted farm instance name.")
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="Remote server's port number.")
@click.option('--host', type=click.STRING, default='localhost', show_default=True,
help="Remote server's hostname.")
@pass_context
def cli(context, host, port):
def cli(context, instance_name, host, port):
context.logger = logging.getLogger(__name__)
context.logger.info("Starting on port {}".format(port))
context.instance_name = instance_name
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
context.port = port
@cli.command('request-dummy', short_help="Send a dummy action.")
@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
help="Targeted farm instance name.")
@click.option('--number', type=click.INT, default=1, show_default=True,
help="Number of request to send.")
@click.option('--wait-for-completion', is_flag=True,
help="Stream updates until jobs are completed.")
@pass_context
def request_dummy(context, number, instance_name, wait_for_completion):
def request_dummy(context, number, wait_for_completion):
action_digest = remote_execution_pb2.Digest()
context.logger.info("Sending execution request...")
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
action_digest=action_digest,
skip_cache_lookup=True)
@@ -98,7 +99,7 @@ def list_operations(context):
context.logger.info("Getting list of operations")
stub = operations_pb2_grpc.OperationsStub(context.channel)
request = operations_pb2.ListOperationsRequest()
request = operations_pb2.ListOperationsRequest(name=context.instance_name)
response = stub.ListOperations(request)
@@ -115,7 +116,8 @@ def list_operations(context):
@pass_context
def wait_execution(context, operation_name):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
request = remote_execution_pb2.WaitExecutionRequest(instance_name=context.instance_name,
name=operation_name)
response = stub.WaitExecution(request)
@@ -124,8 +126,6 @@ def wait_execution(context, operation_name):
@cli.command('command', short_help="Send a command to be executed.")
@click.option('--instance-name', type=click.STRING, default='testing', show_default=True,
help="Targeted farm instance name.")
@click.option('--output-file', nargs=2, type=(click.STRING, click.BOOL), multiple=True,
help="Tuple of expected output file and is-executeable flag.")
@click.option('--output-directory', default='testing', show_default=True,
@@ -133,7 +133,7 @@ def wait_execution(context, operation_name):
@click.argument('input-root', nargs=1, type=click.Path(), required=True)
@click.argument('commands', nargs=-1, type=click.STRING, required=True)
@pass_context
def command(context, input_root, commands, output_file, output_directory, instance_name):
def command(context, input_root, commands, output_file, output_directory):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
execute_command = remote_execution_pb2.Command()
@@ -170,11 +170,11 @@ def command(context, input_root, commands, output_file, output_directory, instan
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=action_digest, data=action.SerializeToString()))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=context.instance_name,
requests=requests)
remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
action_digest=action_digest,
skip_cache_lookup=True)
response = stub.Execute(request)
@@ -201,7 +201,7 @@ def command(context, input_root, commands, output_file, output_directory, instan
raise
with open(path, 'wb+') as f:
write_fetch_blob(f, stub, output_file_response.digest, instance_name)
write_fetch_blob(f, stub, output_file_response.digest, context.instance_name)
if output_file_response.path in output_executeables:
st = os.stat(path)
......
@@ -25,7 +25,7 @@ import logging
import click
from buildgrid.server import build_grid_server
from buildgrid.server import buildgrid_server
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
@@ -45,6 +45,7 @@ def cli(context):
@cli.command('start', short_help="Setup a new server instance.")
@click.argument('instances', nargs=-1, type=click.STRING)
@click.option('--port', type=click.INT, default='50051', show_default=True,
help="The port number to be listened.")
@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
@@ -67,7 +68,9 @@ def cli(context):
@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
help="For --cas=disk, the folder to store CAS blobs in.")
@pass_context
def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
def start(context, instances, port, max_cached_actions, allow_uar, cas, **cas_args):
""" Starts a BuildGrid server.
"""
context.logger.info("Starting on port {}".format(port))
cas_storage = _make_cas_storage(context, cas, cas_args)
@@ -79,9 +82,13 @@ def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
else:
action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
server = build_grid_server.BuildGridServer(port,
cas_storage=cas_storage,
action_cache=action_cache)
if instances is None:
instances = ['main']
server = buildgrid_server.BuildGridServer(port,
instances,
cas_storage=cas_storage,
action_cache=action_cache)
loop = asyncio.get_event_loop()
try:
server.start()
......
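For reference, a sketch of driving the updated server constructor directly, mirroring the CLI path above; the None storage and cache arguments are placeholders for whatever _make_cas_storage and the action-cache block produce:

from buildgrid.server import buildgrid_server

# Two named instances; an empty or missing list falls back to a single 'main'.
server = buildgrid_server.BuildGridServer(50051, ['main', 'dev'],
                                          cas_storage=None,
                                          action_cache=None)
server.start()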
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ActionCache
===========
Implements a simple in-memory action cache.
The action cache maps Actions to their corresponding ActionResults. An
ActionResult may be found in cache, for any given Action, if that action has
already been executed.
Note:
Actions and ActionResults are referenced by their Digests and the mapping is
stored in-memory.
"""
import collections
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
class ActionCache:
"""In-memory Action to ActionResult associative array.
"""
def __init__(self, storage, max_cached_actions):
"""Initialises a new ActionCache instance.
Args:
storage (StorageABC): storage backend instance to be used.
max_cached_actions (int): maximum number of entries to cache.
"""
self._storage = storage
self._max_cached_actions = max_cached_actions
self._digest_map = collections.OrderedDict()
def get_action_result(self, action_digest):
"""Retrieves the cached ActionResult for the given Action digest.
Args:
action_digest (Digest): digest of the Action to query.
Returns:
The cached ActionResult matching the given Action digest, or None if
nothing has been cached yet for that Action.
"""
key = (action_digest.hash, action_digest.size_bytes)
if key in self._digest_map:
action_result = self._storage.get_message(self._digest_map[key],
re_pb2.ActionResult)
if action_result is not None:
if self._blobs_still_exist(action_result):
self._digest_map.move_to_end(key)
return action_result
del self._digest_map[key]
return None
def put_action_result(self, action_digest, action_result):
"""Stores an ActionResult in cache for the given Action digest.
If the cache size limit has been reached, the oldest cache entries will
be dropped before insertion so that the cache size never exceeds the
maximum number of entries allowed.
Args:
action_digest (Digest): digest of the Action to select.
action_result (ActionResult): result object to store.
"""
if self._max_cached_actions == 0:
return
while len(self._digest_map) >= self._max_cached_actions:
self._digest_map.popitem(last=False)
key = (action_digest.hash, action_digest.size_bytes)
action_result_digest = self._storage.put_message(action_result)
self._digest_map[key] = action_result_digest
def _blobs_still_exist(self, action_result):
"""Checks CAS for ActionResult output blobs existance.
Args:
action_result (ActionResult): ActionResult to search referenced
output blobs for.
Returns:
True if all referenced blobs are present in CAS, False otherwise.
"""
blobs_needed = []
for output_file in action_result.output_files:
blobs_needed.append(output_file.digest)
for output_directory in action_result.output_directories:
blobs_needed.append(output_directory.tree_digest)
tree = self._storage.get_message(output_directory.tree_digest,
re_pb2.Tree)
if tree is None:
return False
for file_node in tree.root.files:
blobs_needed.append(file_node.digest)
for child in tree.children:
for file_node in child.files:
blobs_needed.append(file_node.digest)
if action_result.stdout_digest.hash and not action_result.stdout_raw:
blobs_needed.append(action_result.stdout_digest)
if action_result.stderr_digest.hash and not action_result.stderr_raw:
blobs_needed.append(action_result.stderr_digest)
missing = self._storage.missing_blobs(blobs_needed)
return len(missing) == 0
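A quick usage sketch of the cache above, assuming the LRUMemoryCache backend from this repository (whose storage interface supplies the get_message, put_message and missing_blobs calls the cache relies on):

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache

storage = LRUMemoryCache(1024 * 1024)  # 1 MiB of in-memory blob storage
cache = ActionCache(storage, max_cached_actions=50)

action_digest = remote_execution_pb2.Digest(hash='abc123', size_bytes=7)
action_result = remote_execution_pb2.ActionResult()

cache.put_action_result(action_digest, action_result)
# A hit, as long as every blob the result references is still in storage:
assert cache.get_action_result(action_digest) is not None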
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BuildGrid Instance
==================
An instance of the BuildGrid server.
Contains scheduler, execution instance and an interface to the bots.
"""
import logging
from .execution.execution_instance import ExecutionInstance
from .scheduler import Scheduler
from .worker.bots_interface import BotsInterface
class BuildGridInstance(ExecutionInstance, BotsInterface):
def __init__(self, action_cache=None, cas_storage=None):
scheduler = Scheduler(action_cache)
self.logger = logging.getLogger(__name__)
ExecutionInstance.__init__(self, scheduler, cas_storage)
BotsInterface.__init__(self, scheduler)
def stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
while not operation.done:
yield operation
operation = message_queue.get()
yield operation
def cancel_operation(self, name):
# TODO: Cancel leases
raise NotImplementedError("Cancelled operations not supported")
@@ -29,35 +29,23 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc
from .buildgrid_instance import BuildGridInstance
from .cas.bytestream_service import ByteStreamService
from .cas.content_addressable_storage_service import ContentAddressableStorageService
from .execution.action_cache_service import ActionCacheService
from .execution.execution_service import ExecutionService
from .execution.operations_service import OperationsService
from .execution.execution_instance import ExecutionInstance
from .scheduler import Scheduler
from .worker.bots_service import BotsService
from .worker.bots_interface import BotsInterface
class BuildGridServer:
def __init__(self, port='50051', max_workers=10, cas_storage=None, action_cache=None):
def __init__(self, port='50051', instances=None, max_workers=10, action_cache=None, cas_storage=None):
port = '[::]:{0}'.format(port)
scheduler = Scheduler(action_cache)
bots_interface = BotsInterface(scheduler)
execution_instance = ExecutionInstance(scheduler, cas_storage)
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
self._server.add_insecure_port(port)
bots_pb2_grpc.add_BotsServicer_to_server(BotsService(bots_interface),
self._server)
remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(execution_instance),
self._server)
operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(execution_instance),
self._server)
if cas_storage is not None:
cas_service = ContentAddressableStorageService(cas_storage)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
@@ -69,6 +57,20 @@ class BuildGridServer:
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
self._server)
buildgrid_instances = {}
if not instances:
buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
else:
for name in instances:
buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
self._server)
remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
self._server)
operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
self._server)
def start(self):
self._server.start()
......
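Distilled, the restructuring above replaces the single hard-wired instance with a name-to-instance map that every servicer shares and consults per request:

buildgrid_instances = {}
for name in (instances or ['main']):
    buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)

# Each service resolves request.instance_name against the same map:
execution_service = ExecutionService(buildgrid_instances)
operations_service = OperationsService(buildgrid_instances)
bots_service = BotsService(buildgrid_instances)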
@@ -56,12 +56,14 @@ class ExecutionInstance:
def get_operation(self, name):
operation = self._scheduler.jobs.get(name)
if operation is None:
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
else:
return operation.get_operation()
def list_operations(self, name, list_filter, page_size, page_token):
def list_operations(self, list_filter, page_size, page_token):
# TODO: Pages
# Spec says number of pages and length of a page are optional
return self._scheduler.get_operations()
@@ -72,10 +74,6 @@
except KeyError:
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
def cancel_operation(self, name):
# TODO: Cancel leases
raise NotImplementedError("Cancelled operations not supported")
def register_message_client(self, name, queue):
try:
self._scheduler.register_client(name, queue)
......
@@ -35,23 +35,23 @@ from .._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def __init__(self, instance):
def __init__(self, instances):
self.logger = logging.getLogger(__name__)
self._instance = instance
self._instances = instances
def Execute(self, request, context):
# Ignore request.instance_name for now
# Have only one instance
try:
message_queue = queue.Queue()
operation = self._instance.execute(request.action_digest,
request.skip_cache_lookup,
message_queue)
instance = self._get_instance(request.instance_name)
operation = instance.execute(request.action_digest,
request.skip_cache_lookup,
message_queue)
context.add_callback(partial(self._remove_client, operation.name, message_queue))
context.add_callback(partial(instance.unregister_message_client,
operation.name, message_queue))
yield from self._stream_operation_updates(message_queue,
operation.name)
yield from instance.stream_operation_updates(message_queue,
operation.name)
except InvalidArgumentError as e:
self.logger.error(e)
@@ -69,13 +69,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
try:
message_queue = queue.Queue()
operation_name = request.name
instance = self._get_instance(request.instance_name)
self._instance.register_message_client(operation_name, message_queue)
instance.register_message_client(operation_name, message_queue)
context.add_callback(partial(self._remove_client, operation_name, message_queue))
context.add_callback(partial(instance.unregister_message_client,
operation_name, message_queue))
yield from self._stream_operation_updates(message_queue,
operation_name)
yield from instance.stream_operation_updates(message_queue,
operation_name)
except InvalidArgumentError as e:
self.logger.error(e)
@@ -83,12 +85,9 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
yield operations_pb2.Operation()
def _remove_client(self, operation_name, message_queue):
self._instance.unregister_message_client(operation_name, message_queue)
def _get_instance(self, name):
try:
return self._instances[name]
def _stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
while not operation.done:
yield operation
operation = message_queue.get()
yield operation
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
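From the client side, every request must now carry an instance name that the server actually hosts; unknown names come back as INVALID_ARGUMENT. A sketch against a local server built from this diff:

import grpc

from buildgrid._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2, remote_execution_pb2_grpc)

channel = grpc.insecure_channel('localhost:50051')
stub = remote_execution_pb2_grpc.ExecutionStub(channel)

request = remote_execution_pb2.ExecuteRequest(
    instance_name='main',  # must match an instance the server was started with
    action_digest=remote_execution_pb2.Digest(),
    skip_cache_lookup=True)

for operation in stub.Execute(request):  # Execute streams operation updates
    print(operation.name, operation.done)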
@@ -23,6 +23,8 @@ import logging
import grpc
from google.protobuf.empty_pb2 import Empty
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
from .._exceptions import InvalidArgumentError
@@ -30,42 +32,102 @@ from .._exceptions import InvalidArgumentError
class OperationsService(operations_pb2_grpc.OperationsServicer):
def __init__(self, instance):
self._instance = instance
def __init__(self, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
def GetOperation(self, request, context):
try:
return self._instance.get_operation(request.name)
name = request.name
operation_name = self._get_operation_name(name)
instance = self._get_instance(name)
operation = instance.get_operation(operation_name)
operation.name = name
return operation
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return operations_pb2.Operation()
return operations_pb2.Operation()
def ListOperations(self, request, context):
return self._instance.list_operations(request.name,
request.filter,
try:
# Name should be the collection name
# Or in this case, the instance_name
name = request.name
instance = self._get_instance(name)
result = instance.list_operations(request.filter,
request.page_size,
request.page_token)
for operation in result.operations:
operation.name = "{}/{}".format(name, operation.name)
return result
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return operations_pb2.ListOperationsResponse()
def DeleteOperation(self, request, context):
try:
return self._instance.delete_operation(request.name)
name = request.name
operation_name = self._get_operation_name(name)
instance = self._get_instance(name)
instance.delete_operation(operation_name)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return operations_pb2.Operation()
return Empty()
def CancelOperation(self, request, context):
try:
return self._instance.cancel_operation(request.name)
name = request.name
operation_name = self._get_operation_name(name)
instance = self._get_instance(name)
instance.cancel_operation(operation_name)
except NotImplementedError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return operations_pb2.Operation()
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return Empty()
def _get_operation_name(self, name):
return name.split("/")[-1]
def _get_instance(self, name):
try:
names = name.split("/")
# Operation name should be in format:
# {instance/name}/{operation_id}
instance_name = ''.join(names[0:-1])
if not instance_name:
return self._instances[name]
return self._instances[instance_name]
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
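The naming convention here is {instance_name}/{operation_id}: the last '/'-separated component is the operation id and everything before it names the instance. Note that ''.join(names[0:-1]) above concatenates without a separator, so an instance name that itself contains '/' would not round-trip; a separator-preserving sketch of the intended split:

def split_operation_name(name):
    # Everything before the last '/' is the instance; the tail is the id.
    *instance_parts, operation_id = name.split("/")
    return "/".join(instance_parts), operation_id

assert split_operation_name("main/1234") == ("main", "1234")
assert split_operation_name("farm/main/1234") == ("farm/main", "1234")
assert split_operation_name("1234") == ("", "1234")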
@@ -54,7 +54,8 @@ class BotsInterface:
pass
# Bot session name, selected by the server
name = str(uuid.uuid4())
name = "{}/{}".format(parent, str(uuid.uuid4()))
bot_session.name = name
self._bot_ids[name] = bot_id
......
@@ -33,14 +33,17 @@ from .._exceptions import InvalidArgumentError, OutofSyncError
class BotsService(bots_pb2_grpc.BotsServicer):
def __init__(self, instance):
self._instance = instance
def __init__(self, instances):
self._instances = instances
self.logger = logging.getLogger(__name__)
def CreateBotSession(self, request, context):
try:
return self._instance.create_bot_session(request.parent,
request.bot_session)
parent = request.parent
instance = self._get_instance(request.parent)
return instance.create_bot_session(parent,
request.bot_session)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
@@ -50,8 +53,15 @@ class BotsService(bots_pb2_grpc.BotsServicer):
def UpdateBotSession(self, request, context):
try:
return self._instance.update_bot_session(request.name,
request.bot_session)
names = request.name.split("/")
# Operation name should be in format:
# {instance/name}/{uuid}
instance_name = ''.join(names[0:-1])
instance = self._get_instance(instance_name)
return instance.update_bot_session(request.name,
request.bot_session)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
@@ -72,3 +82,10 @@
def PostBotEventTemp(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return Empty()
def _get_instance(self, name):
try:
return self._instances[name]
except KeyError:
raise InvalidArgumentError("Instance doesn't exist on server: {}".format(name))
@@ -18,7 +18,6 @@
# pylint: disable=redefined-outer-name
import copy
import uuid
from unittest import mock
import grpc
@@ -27,7 +26,7 @@ import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid.server import scheduler, job
from buildgrid.server import job, buildgrid_instance
from buildgrid.server.job import LeaseState
from buildgrid.server.worker import bots_interface, bots_service
@@ -53,8 +52,8 @@ def bot_session():
@pytest.fixture
def schedule():
yield scheduler.Scheduler()
def buildgrid():
yield buildgrid_instance.BuildGridInstance()
@pytest.fixture
@@ -64,19 +63,17 @@ def bots(schedule):
# Instance to test
@pytest.fixture
def instance(bots):
yield bots_service.BotsService(bots)
def instance(buildgrid):
instances = {"": buildgrid}
yield bots_service.BotsService(instances)
def test_create_bot_session(bot_session, context, instance):
parent = 'rach'
request = bots_pb2.CreateBotSessionRequest(parent=parent,
bot_session=bot_session)
request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
response = instance.CreateBotSession(request, context)
assert isinstance(response, bots_pb2.BotSession)
assert uuid.UUID(response.name, version=4)
assert bot_session.bot_id == response.bot_id
@@ -92,8 +89,7 @@ def test_create_bot_session_bot_id_fail(context, instance):
def test_update_bot_session(bot_session, context, instance):
request = bots_pb2.CreateBotSessionRequest(parent='',
bot_session=bot_session)
request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
bot = instance.CreateBotSession(request, context)
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
@@ -106,8 +102,7 @@ def test_update_bot_session(bot_session, context, instance):
def test_update_bot_session_zombie(bot_session, context, instance):
request = bots_pb2.CreateBotSessionRequest(parent='',
bot_session=bot_session)
request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
bot = instance.CreateBotSession(request, context)
# Update server with incorrect UUID by rotating it
bot.name = bot.name[len(bot.name): 0]
@@ -121,8 +116,7 @@ def test_update_bot_session_zombie(bot_session, context, instance):
def test_update_bot_session_bot_id_fail(bot_session, context, instance):
request = bots_pb2.UpdateBotSessionRequest(name='ana',
bot_session=bot_session)
request = bots_pb2.UpdateBotSessionRequest(bot_session=bot_session)
instance.UpdateBotSession(request, context)
@@ -131,17 +125,15 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance):
@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
def test_number_of_leases(number_of_jobs, bot_session, context, instance):
request = bots_pb2.CreateBotSessionRequest(parent='',
bot_session=bot_session)
request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
# Inject work
for _ in range(0, number_of_jobs):
action_digest = remote_execution_pb2.Digest()
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
response = instance.CreateBotSession(request, context)
assert len(response.leases) == number_of_jobs
assert isinstance(response, bots_pb2.BotSession)
def test_update_leases_with_work(bot_session, context, instance):
@@ -149,7 +141,7 @@ def test_update_leases_with_work(bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
response = instance.CreateBotSession(request, context)
@@ -159,7 +151,6 @@ def test_update_leases_with_work(bot_session, context, instance):
assert isinstance(response, bots_pb2.BotSession)
assert response.leases[0].state == LeaseState.PENDING.value
assert uuid.UUID(response.leases[0].id, version=4)
assert response_action == action_digest
@@ -172,7 +163,7 @@ def test_update_leases_work_complete(bot_session, context, instance):
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
request = bots_pb2.UpdateBotSessionRequest(name=response.name,
bot_session=response)
@@ -200,7 +191,7 @@ def test_work_rejected_by_bot(bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
# Simulate the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
@@ -222,7 +213,8 @@ def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
# Simulate the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
@@ -242,7 +234,8 @@ def test_work_out_of_sync_from_active(state, bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
# Simulate the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
@@ -268,7 +261,8 @@ def test_work_active_to_active(bot_session, context, instance):
bot_session=bot_session)
# Inject work
action_digest = remote_execution_pb2.Digest(hash='gaff')
instance._instance._scheduler.append_job(job.Job(action_digest))
instance._instances[""].execute(action_digest, True)
# Simulate the severed binding between client and server
response = copy.deepcopy(instance.CreateBotSession(request, context))
......
@@ -26,9 +26,9 @@ import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server import scheduler, job
from buildgrid.server import job, buildgrid_instance
from buildgrid.server.cas.storage import lru_memory_cache
from buildgrid.server.execution import action_cache, execution_instance, execution_service
from buildgrid.server.execution import action_cache, execution_service
@pytest.fixture
@@ -38,19 +38,21 @@ def context():
@pytest.fixture(params=["action-cache", "no-action-cache"])
def execution(request):
def buildgrid(request):
if request.param == "action-cache":
storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
cache = action_cache.ActionCache(storage, 50)
schedule = scheduler.Scheduler(cache)
return execution_instance.ExecutionInstance(schedule, storage)
return execution_instance.ExecutionInstance(scheduler.Scheduler())
return buildgrid_instance.BuildGridInstance(action_cache=cache,
cas_storage=storage)
return buildgrid_instance.BuildGridInstance()
# Instance to test
@pytest.fixture
def instance(execution):
yield execution_service.ExecutionService(execution)
def instance(buildgrid):
instances = {"": buildgrid}
yield execution_service.ExecutionService(instances)
@pytest.mark.parametrize("skip_cache_lookup", [True, False])
......
@@ -28,10 +28,10 @@ from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server import scheduler
from buildgrid.server import buildgrid_instance
from buildgrid.server._exceptions import InvalidArgumentError
from buildgrid.server.execution import execution_instance, operations_service
from buildgrid.server.execution import operations_service
# Can mock this
@@ -52,29 +52,25 @@ def execute_request():
@pytest.fixture
def schedule():
yield scheduler.Scheduler()
@pytest.fixture
def execution(schedule):
yield execution_instance.ExecutionInstance(schedule)
def buildgrid():
yield buildgrid_instance.BuildGridInstance()
# Instance to test
@pytest.fixture
def instance(execution):
yield operations_service.OperationsService(execution)
def instance(buildgrid):
instances = {"blade": buildgrid}
yield operations_service.OperationsService(instances)
# Queue an execution, get operation corresponding to that request
def test_get_operation(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
response_execute = instance._instances["blade"].execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.GetOperationRequest()
request.name = response_execute.name
request.name = "blade/" + response_execute.name
response = instance.GetOperation(request, context)
assert response is response_execute
@@ -88,29 +84,30 @@ def test_get_operation_fail(instance, context):
def test_list_operations(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
response_execute = instance._instances["blade"].execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.ListOperationsRequest()
request = operations_pb2.ListOperationsRequest(name="blade")
response = instance.ListOperations(request, context)
assert response.operations[0].name == response_execute.name
assert response.operations[0].name.split('/')[-1] == response_execute.name
def test_list_operations_with_result(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
response_execute = instance._instances["blade"].execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
action_result = remote_execution_pb2.ActionResult()
output_file = remote_execution_pb2.OutputFile(path='unicorn')
action_result.output_files.extend([output_file])
instance._instance._scheduler.job_complete(response_execute.name, _pack_any(action_result))
instance._instances["blade"]._scheduler.job_complete(response_execute.name,
_pack_any(action_result))
request = operations_pb2.ListOperationsRequest()
request = operations_pb2.ListOperationsRequest(name="blade")
response = instance.ListOperations(request, context)
assert response.operations[0].name == response_execute.name
assert response.operations[0].name.split('/')[-1] == response_execute.name
execute_response = remote_execution_pb2.ExecuteResponse()
response.operations[0].response.Unpack(execute_response)
@@ -118,7 +115,7 @@ def test_list_operations_with_result(instance, execute_request, context):
def test_list_operations_empty(instance, context):
request = operations_pb2.ListOperationsRequest()
request = operations_pb2.ListOperationsRequest(name="blade")
response = instance.ListOperations(request, context)
@@ -127,32 +124,27 @@ def test_list_operations_empty(instance, context):
# Send execution off, delete, try to find operation should fail
def test_delete_operation(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
response_execute = instance._instances["blade"].execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.DeleteOperationRequest()
request.name = response_execute.name
request.name = "blade/" + response_execute.name
instance.DeleteOperation(request, context)
request = operations_pb2.GetOperationRequest()
request.name = response_execute.name
request.name = "blade/" + response_execute.name
with pytest.raises(InvalidArgumentError):
instance._instance.get_operation(response_execute.name)
instance._instances["blade"].get_operation(response_execute.name)
def test_delete_operation_fail(instance, execute_request, context):
request = operations_pb2.DeleteOperationRequest()
request.name = "blade/run"
instance.DeleteOperation(request, context)
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
def test_cancel_operation(instance, context):
request = operations_pb2.CancelOperationRequest()
instance.CancelOperation(request, context)
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
def _pack_any(pack):
some_any = any_pb2.Any()
some_any.Pack(pack)
......