diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index 389a8fec09ae281582e0eb614493824f53a7f522..60a053e0c028f554dc8cfb158cbbf8146a42f70e 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -20,15 +20,18 @@ import os from urllib.parse import urlparse +from functools import partial import grpc from . import Sandbox from ..storage._filebaseddirectory import FileBasedDirectory from ..storage._casbaseddirectory import CasBasedDirectory +from .. import _signals from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.google.rpc import code_pb2 from .._exceptions import SandboxError +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc # SandboxRemote() @@ -51,6 +54,7 @@ class SandboxRemote(Sandbox): "Only plain HTTP is currenlty supported (no HTTPS).") self.server_url = '{}:{}'.format(url.hostname, url.port) + self.operation_name = None def run_remote_command(self, command, input_root_digest, working_directory, environment): # Sends an execution request to the remote execution server. @@ -102,10 +106,13 @@ class SandboxRemote(Sandbox): operation_iterator = stub.WaitExecution(request) for operation in operation_iterator: + if not self.operation_name: + self.operation_name = operation.name if operation.done: return operation else: last_operation = operation + except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: @@ -125,19 +132,39 @@ class SandboxRemote(Sandbox): return last_operation + # Set up signal handler to trigger cancel_operation on SIGTERM operation = None - with self._get_context().timed_activity("Waiting for the remote build to complete"): + with self._get_context().timed_activity("Waiting for the remote build to complete"), \ + _signals.terminator(partial(self.cancel_operation, channel)): operation = __run_remote_command(stub, execute_request=request) if operation is None: return None elif operation.done: return operation - while operation is not None and not operation.done: operation = __run_remote_command(stub, running_operation=operation) return operation + def cancel_operation(self, channel): + # If we don't have the name can't send request. + if self.operation_name is None: + return + + stub = operations_pb2_grpc.OperationsStub(channel) + request = operations_pb2.CancelOperationRequest( + name=str(self.operation_name)) + + try: + stub.CancelOperation(request) + except grpc.RpcError as e: + if (e.code() == grpc.StatusCode.UNIMPLEMENTED or + e.code() == grpc.StatusCode.INVALID_ARGUMENT): + pass + else: + raise SandboxError("Failed trying to send CancelOperation request: " + "{} ({})".format(e.details(), e.code().name)) + def process_job_output(self, output_directories, output_files): # Reads the remote execution server response to an execution request. #