#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#        Jim MacArthur <jim.macarthur@codethink.co.uk>

import os
import time

import grpc

from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc

from .._artifactcache.cascache import CASCache


class SandboxError(Exception):
    """Raised when remote execution or retrieval of its results fails."""


# SandboxRemote()
#
# This isn't really a sandbox, it's a stub which sends all the source
# to a remote server and retrieves the results from it.
class SandboxRemote(Sandbox):
    """Sandbox stub that delegates command execution to a remote server.

    The sandbox's input tree is pushed to the remote CAS, the command is
    submitted via the Remote Execution API, and the resulting output
    directory tree is pulled back and spliced into the sandbox's virtual
    directory.
    """

    # TODO(review): the execution endpoint is hard-coded; it should come
    # from user/project configuration.  Hoisted to class constants so a
    # subclass or test can at least override it without editing the method.
    _REMOTE_EXEC_HOST = 'dekatron.office.codethink.co.uk'
    _REMOTE_EXEC_PORT = 50051

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Required keyword arguments supplied by the platform layer.
        self.user_ns_available = kwargs['user_ns_available']
        self.die_with_parent_available = kwargs['die_with_parent_available']
        self.cascache = None  # created lazily by _get_cascache()

    def _get_cascache(self):
        # Lazily construct and configure the CAS cache on first use so
        # that remote setup only happens when actually required.
        if self.cascache is None:
            self.cascache = CASCache(self._get_context())
            self.cascache.setup_remotes(use_config=True)
        return self.cascache

    def __run_remote_command(self, cascache, command, input_root_digest, environment):
        # Execute a single command remotely.
        #
        # Args:
        #    cascache: the CASCache used to stage the Command message
        #    command (list): argv of the command to run
        #    input_root_digest: digest of the input tree already in CAS
        #    environment (dict): environment variables for the command
        #
        # Returns the completed google.longrunning Operation, or None if
        # the Command message could not be pushed to the remote CAS.
        environment_variables = [
            remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v)
            for k, v in environment.items()
        ]
        remote_command = remote_execution_pb2.Command(
            arguments=command, environment_variables=environment_variables)

        # Serialise the Command into the local CAS and give it a ref so
        # it can be pushed to (or verified on) the remote.
        command_digest = cascache.add_object(buffer=remote_command.SerializeToString())
        command_ref = 'worker-command/{}'.format(command_digest.hash)
        cascache.set_ref(command_ref, command_digest)

        command_push_successful = cascache.push_refs(
            [command_ref], self._get_project(), may_have_dependencies=False)
        if not command_push_successful and \
                not cascache.verify_key_pushed(command_ref, self._get_project()):
            # The command never made it to the remote; nothing to execute.
            return None

        # Create the communication channel and service stubs.
        channel = grpc.insecure_channel(
            '{}:{}'.format(self._REMOTE_EXEC_HOST, self._REMOTE_EXEC_PORT))
        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
        ops_stub = operations_pb2_grpc.OperationsStub(channel)

        # Build and submit the Action.
        action = remote_execution_pb2.Action(command_digest=command_digest,
                                             input_root_digest=input_root_digest,
                                             output_files=[],
                                             output_directories=[self._output_directory],
                                             platform=None,
                                             timeout=None,
                                             do_not_cache=True)
        request = remote_execution_pb2.ExecuteRequest(instance_name='default',
                                                      action=action,
                                                      skip_cache_lookup=True)
        operation = stub.Execute(request)  # Returns a long-running Operation
        job_name = operation.name

        # Poll the Operation until it completes.  Checking `done` before
        # sleeping avoids a pointless 1s delay (and extra RPC) when the
        # initial Execute() reply is already finished.
        # TODO: add an overall timeout.
        while not operation.done:
            time.sleep(1)
            request = operations_pb2.GetOperationRequest(name=job_name)
            operation = ops_stub.GetOperation(request)
        return operation

    def process_job_output(self, output_directories, output_files):
        # Incorporate the remote build's results into the sandbox.
        #
        # Args:
        #    output_directories: list of OutputDirectory messages
        #    output_files: list of OutputFile messages
        #
        # We request exactly one output directory and no output files, so
        # anything else from the server is an error.
        if output_files:
            raise SandboxError("Output files were returned when we didn't request any.")
        if not output_directories:
            # Guard against IndexError below if the server returned nothing.
            raise SandboxError("No output directory was returned from the build server.")
        if len(output_directories) > 1:
            error_text = "More than one output directory was returned from the build server: {}"
            raise SandboxError(error_text.format(output_directories))

        digest = output_directories[0].tree_digest
        # Proto string fields are never None, so an empty hash is the
        # only "missing digest" signal; keep the None check for safety.
        if digest is None or not digest.hash:
            raise SandboxError("Output directory structure had no digest attached.")

        # Pull the tree so all of its parts are present in the local CAS.
        cascache = self._get_cascache()
        cascache.pull_key(digest.hash, digest.size_bytes, self._get_project())

        # Once we return, the calling process will descend into our
        # directory looking for the output, so we must replace the
        # sandbox's virtual directory with the returned tree.
        #
        # BUG FIX: the previous guard tested `os.path.split(...)`, which
        # always yields a truthy 2-tuple, so the "root output directory"
        # case was never caught.  Test the path itself instead.
        if self._output_directory in ("", os.sep):
            # The artifact wants the whole directory; we could just return
            # the returned hash in its place, but we don't have a means to
            # do that yet.
            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")

        new_dir = CasBasedDirectory(self._get_context(), ref=digest)
        self.set_virtual_directory(new_dir)

    def run(self, command, flags, *, cwd=None, env=None):
        # Push the sandbox contents, run `command` remotely, and splice
        # the build output back into the sandbox.
        #
        # Returns 0 on success and 1 on remote build failure; raises
        # SandboxError on push/verification or protocol errors.
        #
        # NOTE(review): `flags` and `cwd` are currently ignored by this
        # remote implementation.
        upload_vdir = self.get_virtual_directory()
        if isinstance(upload_vdir, FileBasedDirectory):
            # Materialise the sources into a CAS-backed directory so that
            # they can be pushed by digest.
            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
            upload_vdir.import_files(self.get_virtual_directory().get_underlying_directory())

        cascache = self._get_cascache()

        # Push the input tree under a ref so the remote can fetch it.
        ref = 'worker-source/{}'.format(upload_vdir.ref.hash)
        upload_vdir._save(ref)
        source_push_successful = cascache.push_refs([ref], self._get_project())

        # Fall back to the sandbox default environment.
        if env is None:
            env = self._get_environment()

        # We want command args as a list of strings.
        if isinstance(command, str):
            command = [command]

        if not source_push_successful and \
                not cascache.verify_key_pushed(ref, self._get_project()):
            raise SandboxError("Failed to verify that source has been pushed "
                               "to the remote artifact cache.")

        # Now transmit the command to execute.
        operation = self.__run_remote_command(cascache, command, upload_vdir.ref, env)
        if operation is None or operation.HasField("error"):
            # Build failed, so return a failure code.
            return 1

        # The Operation's `response` field is an Any which may hold either
        # an ExecuteResponse (wrapping an ActionResult in its `result`
        # field) or, from some servers, an ActionResult packed directly.
        execute_response = remote_execution_pb2.ExecuteResponse()
        action_result = remote_execution_pb2.ActionResult()
        if operation.response.Is(execute_response.DESCRIPTOR):
            operation.response.Unpack(execute_response)
            # BUG FIX: the previous code re-tested `.response` on the
            # unpacked ExecuteResponse (which has no such field, raising
            # AttributeError); the ActionResult is in `result`.
            action_result = execute_response.result
        elif operation.response.Is(action_result.DESCRIPTOR):
            operation.response.Unpack(action_result)
        else:
            raise SandboxError("Received unknown message from server (expected ExecutionResponse).")

        self.process_job_output(action_result.output_directories,
                                action_result.output_files)
        return 0