Skip to content
Snippets Groups Projects
Commit bf5f6bef authored by Raoul Hidalgo Charman's avatar Raoul Hidalgo Charman
Browse files

_sandboxremote.py: Add sigterm handler that sends CancelOperation

parent 494d7018
No related branches found
No related tags found
Loading
Pipeline #34421580 passed
......@@ -19,6 +19,7 @@
# Jim MacArthur <jim.macarthur@codethink.co.uk>
import os
import signal
from urllib.parse import urlparse
import grpc
......@@ -26,8 +27,10 @@ import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
class SandboxError(Exception):
......@@ -54,6 +57,7 @@ class SandboxRemote(Sandbox):
"Only plain HTTP is currenlty supported (no HTTPS).")
self.server_url = '{}:{}'.format(url.hostname, url.port)
self.operation_name = None
def run_remote_command(self, command, input_root_digest, working_directory, environment):
# Sends an execution request to the remote execution server.
......@@ -104,11 +108,15 @@ class SandboxRemote(Sandbox):
request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
operation_iterator = stub.WaitExecution(request)
for operation in operation_iterator:
if operation.done:
return operation
else:
last_operation = operation
with _signals.terminator(self.cancel_operation):
with _signals.blocked([signal.SIGTERM], ignore=False):
operation = next(operation_iterator)
self.operation_name = operation.name
for operation in operation_iterator:
if operation.done:
return operation
else:
last_operation = operation
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
......@@ -135,12 +143,23 @@ class SandboxRemote(Sandbox):
return None
elif operation.done:
return operation
while operation is not None and not operation.done:
operation = __run_remote_command(stub, running_operation=operation)
return operation
def cancel_operation(self):
channel = grpc.insecure_channel(self.server_url)
stub = operations_pb2_grpc.OperationsStub(channel)
request = operations_pb2.CancelOperationRequest(
name="/{}".format(self.operation_name))
try:
stub.CancelOperation(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
raise SandboxError("{} ({})".format(e.details(), e.code().name))
def process_job_output(self, output_directories, output_files):
# Reads the remote execution server response to an execution request.
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment