Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • coverity
  • master
  • sminskyprimu/blake3
  • sotk/consolidate-leases-and-jobs/p2-write-only-leases
  • sotk/logstream-testing
  • zchen723/skip-scheduler-metrics
  • 0.0.10
  • 0.0.11
  • 0.0.12
  • 0.0.13
  • 0.0.14
  • 0.0.16
  • 0.0.17
  • 0.0.19
  • 0.0.2
  • 0.0.20
  • 0.0.21
  • 0.0.23
  • 0.0.25
  • 0.0.26
  • 0.0.27
  • 0.0.28
  • 0.0.29
  • 0.0.3
  • 0.0.30
  • 0.0.31
  • 0.0.32
  • 0.0.33
  • 0.0.34
  • 0.0.35
  • 0.0.36
  • 0.0.37
  • 0.0.38
  • 0.0.39
  • 0.0.4
  • 0.0.40
  • 0.0.41
  • 0.0.42
  • 0.0.43
  • 0.0.44
  • 0.0.45
  • 0.0.46
  • 0.0.47
  • 0.0.48
  • 0.0.49
  • 0.0.5
  • 0.0.50
  • 0.0.51
  • 0.0.52
  • 0.0.53
  • 0.0.54
  • 0.0.55
  • 0.0.56
  • 0.0.57
  • 0.0.58
  • 0.0.59
  • 0.0.6
  • 0.0.60
  • 0.0.61
  • 0.0.62
  • 0.0.63
  • 0.0.64
  • 0.0.65
  • 0.0.66
  • 0.0.67
  • 0.0.68
  • 0.0.69
  • 0.0.7
  • 0.0.70
  • 0.0.71
  • 0.0.72
  • 0.0.73
  • 0.0.74
  • 0.0.75
  • 0.0.76
  • 0.0.78
  • 0.0.79
  • 0.0.8
  • 0.0.80
  • 0.0.81
  • 0.0.82
  • 0.0.83
  • 0.0.84
  • 0.0.85
  • 0.0.86
  • 0.0.87
  • 0.0.88
  • 0.0.89
  • 0.0.9
  • 0.0.90
  • 0.0.91
  • 0.0.92
  • 0.0.93
  • 0.0.94
  • 0.0.95
  • 0.0.96
  • 0.0.97
  • 0.0.98
  • 0.1.0
  • 0.1.1
  • 0.1.10
  • 0.1.11
  • 0.1.12
  • 0.1.13
  • 0.1.14
  • 0.1.15
106 results

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Select Git revision
  • coverity
  • master
  • sminskyprimu/blake3
  • sotk/consolidate-leases-and-jobs/p2-write-only-leases
  • sotk/logstream-testing
  • zchen723/skip-scheduler-metrics
  • 0.0.10
  • 0.0.11
  • 0.0.12
  • 0.0.13
  • 0.0.14
  • 0.0.16
  • 0.0.17
  • 0.0.19
  • 0.0.2
  • 0.0.20
  • 0.0.21
  • 0.0.23
  • 0.0.25
  • 0.0.26
  • 0.0.27
  • 0.0.28
  • 0.0.29
  • 0.0.3
  • 0.0.30
  • 0.0.31
  • 0.0.32
  • 0.0.33
  • 0.0.34
  • 0.0.35
  • 0.0.36
  • 0.0.37
  • 0.0.38
  • 0.0.39
  • 0.0.4
  • 0.0.40
  • 0.0.41
  • 0.0.42
  • 0.0.43
  • 0.0.44
  • 0.0.45
  • 0.0.46
  • 0.0.47
  • 0.0.48
  • 0.0.49
  • 0.0.5
  • 0.0.50
  • 0.0.51
  • 0.0.52
  • 0.0.53
  • 0.0.54
  • 0.0.55
  • 0.0.56
  • 0.0.57
  • 0.0.58
  • 0.0.59
  • 0.0.6
  • 0.0.60
  • 0.0.61
  • 0.0.62
  • 0.0.63
  • 0.0.64
  • 0.0.65
  • 0.0.66
  • 0.0.67
  • 0.0.68
  • 0.0.69
  • 0.0.7
  • 0.0.70
  • 0.0.71
  • 0.0.72
  • 0.0.73
  • 0.0.74
  • 0.0.75
  • 0.0.76
  • 0.0.78
  • 0.0.79
  • 0.0.8
  • 0.0.80
  • 0.0.81
  • 0.0.82
  • 0.0.83
  • 0.0.84
  • 0.0.85
  • 0.0.86
  • 0.0.87
  • 0.0.88
  • 0.0.89
  • 0.0.9
  • 0.0.90
  • 0.0.91
  • 0.0.92
  • 0.0.93
  • 0.0.94
  • 0.0.95
  • 0.0.96
  • 0.0.97
  • 0.0.98
  • 0.1.0
  • 0.1.1
  • 0.1.10
  • 0.1.11
  • 0.1.12
  • 0.1.13
  • 0.1.14
  • 0.1.15
106 results
Show changes
Commits on Source (4)
Showing
with 182 additions and 3363 deletions
......@@ -39,7 +39,7 @@ from buildgrid._exceptions import BotError
from ..cli import pass_context
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from google.protobuf import any_pb2
@click.group(short_help = 'Create a bot client')
......
......@@ -30,8 +30,8 @@ import time
from ..cli import pass_context
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test.remote_execution_pb2 import ExecuteOperationMetadata
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from google.protobuf import any_pb2
......@@ -52,39 +52,24 @@ def cli(context, host, port):
@click.option('--wait-for-completion', is_flag=True)
@pass_context
def request(context, number, instance_name, wait_for_completion):
action_digest = remote_execution_pb2.Digest()
context.logger.info("Sending execution request...\n")
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
action = remote_execution_pb2.Action(command_digest = None,
input_root_digest = None,
output_files = [],
output_directories = None,
platform = None,
timeout = None,
do_not_cache = True)
action.command_digest.hash = 'foo'
request = remote_execution_pb2.ExecuteRequest(instance_name = instance_name,
action = action,
action_digest = action_digest,
skip_cache_lookup = True)
responses = []
for i in range(0, number):
response = stub.Execute(request)
context.logger.info("Response name: {}".format(response.name))
try:
while wait_for_completion:
request = operations_pb2.ListOperationsRequest()
context.logger.debug('Querying to see if jobs are complete.')
stub = operations_pb2_grpc.OperationsStub(context.channel)
response = stub.ListOperations(request)
if all(operation.done for operation in response.operations):
context.logger.info('Jobs complete')
break
time.sleep(1)
except KeyboardInterrupt:
pass
responses.append(stub.Execute(request))
for response in responses:
if wait_for_completion:
for stream in response:
context.logger.info(stream)
else:
context.logger.info(next(response))
@cli.command('status', short_help='Get the status of an operation')
@click.argument('operation-name')
......@@ -96,7 +81,7 @@ def operation_status(context, operation_name):
request = operations_pb2.GetOperationRequest(name=operation_name)
response = stub.GetOperation(request)
_log_operation(context, response)
context.logger.info(response)
@cli.command('list', short_help='List operations')
@pass_context
......@@ -113,13 +98,16 @@ def list_operations(context):
return
for op in response.operations:
_log_operation(context, op)
context.logger.info(op)
@cli.command('wait', short_help='Streams an operation until it is complete')
@click.argument('operation-name')
@pass_context
def wait_execution(context, operation_name):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
def _log_operation(context, operation):
op_meta = ExecuteOperationMetadata()
operation.metadata.Unpack(op_meta)
response = stub.WaitExecution(request)
context.logger.info("Name : {}".format(operation.name))
context.logger.info("Done : {}".format(operation.done))
context.logger.info("Stage : {}".format(ExecuteOperationMetadata.Stage.Name(op_meta.stage)))
context.logger.info("Key : {}".format(operation.response))
for stream in response:
context.logger.info(stream)
......@@ -27,7 +27,7 @@ import grpc
from concurrent import futures
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc
......
......@@ -25,7 +25,7 @@ CAS blobs.
import grpc
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2 as re_pb2, remote_execution_pb2_grpc as re_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2, remote_execution_pb2_grpc as re_pb2_grpc
from ...settings import HASH
......
......@@ -24,7 +24,7 @@ to check for missing CAS blobs and update them in bulk.
"""
import grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2 as re_pb2, remote_execution_pb2_grpc as re_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2, remote_execution_pb2_grpc as re_pb2_grpc
class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
......@@ -42,10 +42,10 @@ class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServ
storage = self._storage
requests = []
for request_proto in request.requests:
requests.append((request_proto.content_digest, request_proto.data))
requests.append((request_proto.digest, request_proto.data))
response = re_pb2.BatchUpdateBlobsResponse()
for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
response_proto = response.responses.add()
response_proto.blob_digest.CopyFrom(digest)
response_proto.digest.CopyFrom(digest)
response_proto.status.CopyFrom(status)
return response
......@@ -25,7 +25,7 @@ Action Cache currently not implemented.
import logging
import grpc
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
......
......@@ -34,12 +34,12 @@ class ExecutionInstance():
self.logger = logging.getLogger(__name__)
self._scheduler = scheduler
def execute(self, action, skip_cache_lookup):
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
"""
job = Job(action)
job = Job(action_digest, message_queue)
self.logger.info("Operation name: {}".format(job.name))
if not skip_cache_lookup:
......@@ -50,7 +50,6 @@ class ExecutionInstance():
return job.get_operation()
def get_operation(self, name):
self.logger.debug("Getting operation: {}".format(name))
operation = self._scheduler.jobs.get(name)
if operation is None:
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
......@@ -60,11 +59,9 @@ class ExecutionInstance():
def list_operations(self, name, list_filter, page_size, page_token):
# TODO: Pages
# Spec says number of pages and length of a page are optional
self.logger.debug("Listing operations")
return self._scheduler.get_operations()
def delete_operation(self, name):
self.logger.debug("Deleting operation {}".format(name))
try:
self._scheduler.jobs.pop(name)
except KeyError:
......@@ -73,3 +70,15 @@ class ExecutionInstance():
def cancel_operation(self, name):
# TODO: Cancel leases
raise NotImplementedError("Cancelled operations not supported")
def register_message_client(self, name, queue):
try:
self._scheduler.register_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
def unregister_message_client(self, name, queue):
try:
self._scheduler.unregister_client(name, queue)
except KeyError:
raise InvalidArgumentError("Operation name does not exist: {}".format(name))
......@@ -24,8 +24,9 @@ Serves remote execution requests.
import grpc
import logging
import queue
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
from ._exceptions import InvalidArgumentError
......@@ -33,24 +34,58 @@ from ._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def __init__(self, instance):
self._instance = instance
self.logger = logging.getLogger(__name__)
self._instance = instance
def Execute(self, request, context):
# Ignore request.instance_name for now
# Have only one instance
try:
return self._instance.execute(request.action,
request.skip_cache_lookup)
message_queue = queue.Queue()
operation = self._instance.execute(request.action_digest,
request.skip_cache_lookup,
message_queue)
remove_client = lambda : self._remove_client(operation.name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation.name)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_details(sxtr(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return operations_pb2.Operation()
except NotImplementedError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return operations_pb2.Operation()
def WaitExecution(self, request, context):
try:
message_queue = queue.Queue()
operation_name = request.name
self._instance.register_message_client(operation_name, message_queue)
remove_client = lambda : self._remove_client(operation_name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation_name)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
def _remove_client(self, operation_name, message_queue):
self._instance.unregister_message_client(operation_name, message_queue)
def _stream_operation_updates(self, message_queue, operation_name):
operation = message_queue.get()
while not operation.done:
yield operation
operation = message_queue.get()
yield operation
......@@ -18,11 +18,11 @@
import logging
import uuid
import buildgrid._protos.google.devtools.remoteexecution.v1test.remote_execution_pb2
import buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2
from enum import Enum
from buildgrid._protos.google.devtools.remoteexecution.v1test.remote_execution_pb2 import ExecuteOperationMetadata, ExecuteResponse
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata, ExecuteResponse
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from google.protobuf import any_pb2
......@@ -51,9 +51,8 @@ class LeaseState(Enum):
class Job():
def __init__(self, action):
self.action = action
self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
def __init__(self, action_digest, message_queue=None):
self.action_digest = action_digest
self.execute_stage = ExecuteStage.UNKNOWN
self.lease = None
self.logger = logging.getLogger(__name__)
......@@ -62,10 +61,25 @@ class Job():
self._n_tries = 0
self._operation = operations_pb2.Operation(name = self.name)
self._operation_update_queues = []
if message_queue is not None:
self.register_client(message_queue)
def check_job_finished(self):
if not self._operation_update_queues:
if self.execute_stage == ExecuteStage.COMPLETED:
return True
return False
def register_client(self, queue):
self._operation_update_queues.append(queue)
def unregister_client(self, queue):
self._operation_update_queues.remove(queue)
def get_operation(self):
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
if self.result is not None:
self._operation.done = True
response = ExecuteResponse()
......@@ -81,10 +95,10 @@ class Job():
return meta
def create_lease(self):
action = self._pack_any(self.action)
action_digest = self._pack_any(self.action_digest)
lease = bots_pb2.Lease(id = self.name,
payload = action,
payload = action_digest,
state = LeaseState.PENDING.value)
self.lease = lease
return lease
......@@ -92,6 +106,11 @@ class Job():
def get_operations(self):
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
def update_execute_stage(self, stage):
self.execute_stage = stage
for queue in self._operation_update_queues:
queue.put(self.get_operation())
def _pack_any(self, pack):
any = any_pb2.Any()
any.Pack(pack)
......
......@@ -35,8 +35,17 @@ class Scheduler():
self.jobs = {}
self.queue = deque()
def register_client(self, name, queue):
self.jobs[name].register_client(queue)
def unregister_client(self, name, queue):
job = self.jobs[name]
job.unregister_client(queue)
if job.check_job_finished():
del self.jobs[name]
def append_job(self, job):
job.execute_stage = ExecuteStage.QUEUED
job.update_execute_stage(ExecuteStage.QUEUED)
self.jobs[job.name] = job
self.queue.append(job)
......@@ -45,9 +54,9 @@ class Scheduler():
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
job.execute_stage = ExecuteStage.COMPLETED
job.update_execute_stage(ExecuteStage.COMPLETED)
else:
job.execute_stage = ExecuteStage.QUEUED
job.update_execute_stage(ExecuteStage.QUEUED)
job.n_tries += 1
self.queue.appendleft(job)
......@@ -56,15 +65,14 @@ class Scheduler():
def create_job(self):
if len(self.queue) > 0:
job = self.queue.popleft()
job.execute_stage = ExecuteStage.EXECUTING
job.update_execute_stage(ExecuteStage.EXECUTING)
self.jobs[job.name] = job
return job
return None
def job_complete(self, name, result):
job = self.jobs[name]
job.execute_stage = ExecuteStage.COMPLETED
job.result = result
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job
def get_operations(self):
......@@ -122,3 +130,7 @@ class Scheduler():
if state == LeaseState.PENDING.value or \
state == LeaseState.ACTIVE.value:
self.retry_job(name)
def _update_execute_stage(self, job, stage):
job.update_execute_stage(stage)
return job
......@@ -18,7 +18,7 @@
import io
from buildgrid._protos.google.bytestream import bytestream_pb2
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2 as re_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
import pytest
from buildgrid.server.cas.storage.storage_abc import StorageABC
......@@ -170,22 +170,22 @@ def test_cas_batch_update_blobs(instance):
storage = SimpleStorage()
servicer = ContentAddressableStorageService(storage)
update_requests = [
re_pb2.UpdateBlobRequest(
content_digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
re_pb2.UpdateBlobRequest(
content_digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
re_pb2.BatchUpdateBlobsRequest.Request(
digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
re_pb2.BatchUpdateBlobsRequest.Request(
digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
data=b'wrong data')
]
request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
response = servicer.BatchUpdateBlobs(request, None)
assert len(response.responses) == 2
for blob_response in response.responses:
if blob_response.blob_digest == update_requests[0].content_digest:
if blob_response.digest == update_requests[0].digest:
assert blob_response.status.code == 0
elif blob_response.blob_digest == update_requests[1].content_digest:
elif blob_response.digest == update_requests[1].digest:
assert blob_response.status.code != 0
else:
raise Exception("Unexpected blob response")
assert len(storage.data) == 1
assert (update_requests[0].content_digest.hash, 3) in storage.data
assert storage.data[(update_requests[0].content_digest.hash, 3)] == b'abc'
assert (update_requests[0].digest.hash, 3) in storage.data
assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'
......@@ -18,7 +18,7 @@
import tempfile
import boto3
from buildgrid._protos.google.devtools.remoteexecution.v1test.remote_execution_pb2 import Digest
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from moto import mock_s3
import pytest
......
......@@ -21,7 +21,7 @@ import pytest
from unittest import mock
from grpc._server import _Context
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server import scheduler
from buildgrid.server.execution import execution_instance, action_cache_service
......
......@@ -23,7 +23,7 @@ import uuid
from unittest import mock
from grpc._server import _Context
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
from google.protobuf import any_pb2
......
......@@ -22,7 +22,7 @@ import uuid
from unittest import mock
from grpc._server import _Context
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from google.protobuf import any_pb2
......@@ -49,22 +49,40 @@ def instance(execution):
@pytest.mark.parametrize("skip_cache_lookup", [True, False])
def test_execute(skip_cache_lookup, instance, context):
action = remote_execution_pb2.Action()
action.command_digest.hash = 'zhora'
action_digest = remote_execution_pb2.Digest()
action_digest.hash = 'zhora'
request = remote_execution_pb2.ExecuteRequest(instance_name = '',
action = action,
action_digest = action_digest,
skip_cache_lookup = skip_cache_lookup)
result = instance.Execute(request, context)
assert isinstance(result, operations_pb2.Operation)
response = instance.Execute(request, context)
if skip_cache_lookup is False:
[r for r in response]
context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
else:
result = next(response)
assert isinstance(result, operations_pb2.Operation)
metadata = remote_execution_pb2.ExecuteOperationMetadata()
result.metadata.Unpack(metadata)
assert metadata.stage == job.ExecuteStage.QUEUED.value
assert uuid.UUID(result.name, version=4)
assert result.done is False
def test_wait_execution(instance, context):
action_digest = remote_execution_pb2.Digest()
action_digest.hash = 'zhora'
j = job.Job(action_digest, None)
j._operation.done = True
request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
instance._instance._scheduler.jobs[j.name] = j
action_result_any = any_pb2.Any()
action_result = remote_execution_pb2.ActionResult()
action_result_any.Pack(action_result)
instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
response = instance.WaitExecution(request, context)
......@@ -21,7 +21,7 @@ import pytest
from unittest import mock
from grpc._server import _Context
from buildgrid._protos.google.devtools.remoteexecution.v1test import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server import scheduler, job
......@@ -37,11 +37,11 @@ def context():
# Requests to make
@pytest.fixture
def execute_request():
action = remote_execution_pb2.Action()
action.command_digest.hash = 'zhora'
action_digest = remote_execution_pb2.Digest()
action_digest.hash = 'zhora'
yield remote_execution_pb2.ExecuteRequest(instance_name = '',
action = action,
action_digest = action_digest,
skip_cache_lookup = True)
@pytest.fixture
......@@ -59,10 +59,11 @@ def instance(execution):
# Queue an execution, get operation corresponding to that request
def test_get_operation(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action,
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.GetOperationRequest()
request.name = response_execute.name
response = instance.GetOperation(request, context)
......@@ -75,7 +76,7 @@ def test_get_operation_fail(instance, context):
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
def test_list_operations(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action,
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.ListOperationsRequest()
......@@ -84,7 +85,7 @@ def test_list_operations(instance, execute_request, context):
assert response.operations[0].name == response_execute.name
def test_list_operations_with_result(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action,
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
action_result = remote_execution_pb2.ActionResult()
......@@ -109,7 +110,7 @@ def test_list_operations_empty(instance, context):
# Send execution off, delete, try to find operation should fail
def test_delete_operation(instance, execute_request, context):
response_execute = instance._instance.execute(execute_request.action,
response_execute = instance._instance.execute(execute_request.action_digest,
execute_request.skip_cache_lookup)
request = operations_pb2.DeleteOperationRequest()
request.name = response_execute.name
......