Skip to content
Snippets Groups Projects
Commit 08565f1e authored by Jim MacArthur's avatar Jim MacArthur
Browse files

_sandboxremote.py: New file.

parent 47b81e9c
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
import os
import time
import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._artifactcache.cascache import CASCache
class SandboxError(Exception):
    """Raised when remote sandbox execution cannot be carried out or its results are invalid."""
# SandboxRemote()
#
# This isn't really a sandbox, it's a stub which sends all the source
# to a remote server and retrieves the results from it.
#
class SandboxRemote(Sandbox):
    """A 'sandbox' which delegates execution to a remote build server.

    This is not an isolation sandbox: it serializes the sandbox's virtual
    directory into CAS, pushes it together with a Command/Action pair to a
    remote execution service (Bazel Remote Execution API v2), waits for the
    operation to complete, and pulls the resulting output tree back.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Accepted for interface parity with local sandbox implementations;
        # remote execution does not actually make use of these flags.
        self.user_ns_available = kwargs['user_ns_available']
        self.die_with_parent_available = kwargs['die_with_parent_available']
        # Lazily-created CAS cache handle; see _get_cascache().
        self.cascache = None

    def _get_cascache(self):
        # Create the CAS cache on first use. Remote endpoints come from the
        # user configuration (use_config=True).
        if self.cascache is None:
            self.cascache = CASCache(self._get_context())
            self.cascache.setup_remotes(use_config=True)
        return self.cascache

    def __run_remote_command(self, cascache, command, input_root_digest, environment):
        # Upload a Command and an Action referencing it, then execute the
        # Action remotely and block until the operation completes.
        #
        # Returns the completed Operation, or None if an upload failed or
        # the server produced no operation updates.
        environment_variables = [
            remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v)
            for (k, v) in environment.items()
        ]

        # Create and send the Command object.
        remote_command = remote_execution_pb2.Command(
            arguments=command,
            environment_variables=environment_variables,
            output_files=[],
            output_directories=[self._output_directory],
            platform=None)
        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
        command_ref = 'worker-command/{}'.format(command_digest.hash)
        cascache.set_ref(command_ref, command_digest)

        command_push_successful = cascache.push_refs(
            [command_ref], self._get_project(), may_have_dependencies=False)
        if not command_push_successful and not cascache.verify_key_pushed(command_ref, self._get_project()):
            # Command push failed
            return None

        # Create and send the Action referencing the Command.
        action = remote_execution_pb2.Action(command_digest=command_digest,
                                             input_root_digest=input_root_digest,
                                             timeout=None,
                                             do_not_cache=True)
        action_digest = cascache.add_object(buffer=action.SerializeToString())
        # NOTE(review): this ref is derived from command_digest rather than
        # action_digest — presumably intentional while the two are always
        # pushed together, but worth confirming.
        action_ref = 'worker-action/{}'.format(command_digest.hash)
        cascache.set_ref(action_ref, action_digest)

        action_push_successful = cascache.push_refs(
            [action_ref], self._get_project(), may_have_dependencies=False)
        if not action_push_successful and not cascache.verify_key_pushed(action_ref, self._get_project()):
            # Action push failed
            return None

        # Next, try to create a communication channel to the BuildGrid server.
        # HACK: the execution endpoint is hard-coded; it should come from
        # user or project configuration.
        port = 50051
        channel = grpc.insecure_channel('dekatron.office.codethink.co.uk:{}'.format(port))
        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
        request = remote_execution_pb2.ExecuteRequest(instance_name='default',
                                                      action_digest=action_digest,
                                                      skip_cache_lookup=True)

        # Stream operation updates until the server reports completion.
        # 'operation' is pre-set to None so an empty stream yields a clean
        # failure (the original code raised NameError in that case).
        operation = None
        for operation in stub.Execute(request):
            if operation.done:
                break
        return operation

    def process_job_output(self, output_directories, output_files):
        """Integrate the remote build's output tree back into this sandbox.

        output_directories -- list of OutputDirectory messages
        output_files -- list of OutputFile messages

        We only request a single output directory, so it is an error for the
        server to return any output files, or anything other than exactly
        one directory.

        Raises SandboxError on any protocol violation.
        """
        if output_files:
            raise SandboxError("Output files were returned when we didn't request any.")
        if len(output_directories) > 1:
            error_text = "More than one output directory was returned from the build server: {}"
            raise SandboxError(error_text.format(output_directories))
        if len(output_directories) < 1:
            # Fixed: this previously raised with the misspelled name
            # 'error_test', producing a NameError instead of a SandboxError.
            raise SandboxError("No output directory was returned from the build server.")

        digest = output_directories[0].tree_digest
        if digest is None or digest.hash is None or digest.hash == "":
            raise SandboxError("Output directory structure had no digest attached.")

        # Now do a pull to ensure we have the necessary parts.
        cascache = self._get_cascache()
        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())

        # Once we return, the caller will descend into our virtual directory
        # expecting the build results at self._output_directory, so we must
        # replace the virtual directory with the tree we just pulled.
        path_components = os.path.split(self._output_directory)
        # NOTE(review): os.path.split always returns a 2-tuple, so this
        # branch appears unreachable; the intent (guarding against an empty
        # output directory) should be re-checked.
        if not path_components:
            # The artifact wants the whole directory; we could just return
            # the returned hash in its place, but we don't have a means to
            # do that yet.
            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")

        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
        self.set_virtual_directory(new_dir)

    def run(self, command, flags, *, cwd=None, env=None):
        """Run 'command' on the remote server; return 0 on success, 1 on failure."""
        # Upload sources: the sandbox contents become the Action's input root.
        upload_vdir = self.get_virtual_directory()
        if isinstance(upload_vdir, FileBasedDirectory):
            # Stage plain files into a fresh CAS-backed directory first.
            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())

        # Now, push that key (without necessarily needing a ref) to the remote.
        cascache = self._get_cascache()
        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
        upload_vdir._save(ref)
        source_push_successful = cascache.push_refs([ref], self._get_project())

        # Set up environment and PWD
        if env is None:
            env = self._get_environment()
        if 'PWD' not in env:
            env['PWD'] = self._get_work_directory()

        # We want command args as a list of strings
        if isinstance(command, str):
            command = [command]

        # Guard clause: without the source on the remote there is nothing
        # the server could build.
        if not source_push_successful and not cascache.verify_key_pushed(ref, self._get_project()):
            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")

        # Now transmit the command to execute
        response = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
        if response is None or response.HasField("error"):
            # Build failed, so return a failure code
            return 1

        # At the moment, response can either be an ExecutionResponse
        # containing an ActionResult, or an ActionResult directly.
        execute_response = remote_execution_pb2.ExecuteResponse()
        if response.response.Is(execute_response.DESCRIPTOR):
            # Unpack ExecuteResponse and set response to its response
            response.response.Unpack(execute_response)
            response = execute_response

        action_result = remote_execution_pb2.ActionResult()
        # NOTE(review): after the reassignment above, 'response' is an
        # ExecuteResponse, which carries its ActionResult in 'result', not
        # 'response' — this path looks like it would raise AttributeError
        # and should be confirmed against the proto definitions.
        if not response.response.Is(action_result.DESCRIPTOR):
            raise SandboxError("Received unknown message from server (expected ExecutionResponse).")
        response.response.Unpack(action_result)
        self.process_job_output(action_result.output_directories, action_result.output_files)
        return 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment