Skip to content
Snippets Groups Projects
Commit c6306b88 authored by Jürg Billeter's avatar Jürg Billeter
Browse files

Merge branch '725-job-cancellation-on-remote-builds' into 'master'

_sandboxremote.py: Add sigterm handler that sends CancelOperation

Closes #725

See merge request !900
parents 8071c00c fd41b0b5
No related branches found
No related tags found
Loading
Pipeline #37291850 passed
......@@ -20,15 +20,18 @@
import os
from urllib.parse import urlparse
from functools import partial
import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import SandboxError
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
# SandboxRemote()
......@@ -51,6 +54,7 @@ class SandboxRemote(Sandbox):
"Only plain HTTP is currenlty supported (no HTTPS).")
self.server_url = '{}:{}'.format(url.hostname, url.port)
self.operation_name = None
def run_remote_command(self, command, input_root_digest, working_directory, environment):
# Sends an execution request to the remote execution server.
......@@ -102,10 +106,13 @@ class SandboxRemote(Sandbox):
operation_iterator = stub.WaitExecution(request)
for operation in operation_iterator:
if not self.operation_name:
self.operation_name = operation.name
if operation.done:
return operation
else:
last_operation = operation
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
......@@ -125,19 +132,39 @@ class SandboxRemote(Sandbox):
return last_operation
# Set up signal handler to trigger cancel_operation on SIGTERM
operation = None
with self._get_context().timed_activity("Waiting for the remote build to complete"):
with self._get_context().timed_activity("Waiting for the remote build to complete"), \
_signals.terminator(partial(self.cancel_operation, channel)):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
elif operation.done:
return operation
while operation is not None and not operation.done:
operation = __run_remote_command(stub, running_operation=operation)
return operation
def cancel_operation(self, channel):
# If we don't have the name can't send request.
if self.operation_name is None:
return
stub = operations_pb2_grpc.OperationsStub(channel)
request = operations_pb2.CancelOperationRequest(
name=str(self.operation_name))
try:
stub.CancelOperation(request)
except grpc.RpcError as e:
if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
e.code() == grpc.StatusCode.INVALID_ARGUMENT):
pass
else:
raise SandboxError("Failed trying to send CancelOperation request: "
"{} ({})".format(e.details(), e.code().name))
def process_job_output(self, output_directories, output_files):
# Reads the remote execution server response to an execution request.
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment