Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (6)
@@ -19,71 +19,97 @@ import tempfile

from google.protobuf import any_pb2

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
from buildgrid.utils import output_file_maker, output_directory_maker


def work_temp_directory(context, lease):
    """Executes a lease for a build action, using host tools.
    """
    instance_name = context.parent

    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)

    action_digest = remote_execution_pb2.Digest()
    lease.payload.Unpack(action_digest)

    action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
                                     stub_bytestream, action_digest, instance_name)

    with tempfile.TemporaryDirectory() as temp_directory:
        command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
                                          stub_bytestream, action.command_digest, instance_name)

        write_fetch_directory(temp_directory, stub_bytestream,
                              action.input_root_digest, instance_name)

        environment = os.environ.copy()
        for variable in command.environment_variables:
            if variable.name not in ['PATH', 'PWD']:
                environment[variable.name] = variable.value

        command_line = list()
        for argument in command.arguments:
            command_line.append(argument.strip())

        working_directory = None
        if command.working_directory:
            working_directory = os.path.join(temp_directory,
                                             command.working_directory)
            os.makedirs(working_directory, exist_ok=True)
        else:
            working_directory = temp_directory

        # Ensure that the output files structure exists:
        for output_path in command.output_files:
            directory_path = os.path.join(working_directory,
                                          os.path.dirname(output_path))
            os.makedirs(directory_path, exist_ok=True)

        process = subprocess.Popen(command_line,
                                   cwd=working_directory,
                                   universal_newlines=True,
                                   env=environment,
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE)
        # TODO: Should return the stdout and stderr to the user.
        process.communicate()

        update_requests = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name)
        action_result = remote_execution_pb2.ActionResult()

        for output_path in command.output_files:
            file_path = os.path.join(working_directory, output_path)
            # Missing outputs should simply be omitted in ActionResult:
            if not os.path.isfile(file_path):
                continue

            # OutputFile.path should be relative to the working directory:
            output_file, update_request = output_file_maker(file_path, working_directory)

            action_result.output_files.extend([output_file])
            update_requests.requests.extend([update_request])

        for output_path in command.output_directories:
            directory_path = os.path.join(working_directory, output_path)
            # Missing outputs should simply be omitted in ActionResult:
            if not os.path.isdir(directory_path):
                continue

            # OutputDirectory.path should be relative to the working directory:
            output_directory, update_request = output_directory_maker(directory_path, working_directory)

            action_result.output_directories.extend([output_directory])
            update_requests.requests.extend(update_request)

        stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.cas_channel)
        stub_cas.BatchUpdateBlobs(update_requests)

        action_result_any = any_pb2.Any()
        action_result_any.Pack(action_result)

        lease.result.CopyFrom(action_result_any)

    return lease
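
For reference, a hedged sketch of how a bot might drive this function. The `SimpleNamespace` stand-in, the `bots_pb2` import path, and the endpoint address are assumptions, not part of the diff; only the `parent`, `cas_channel` and `logger` attributes are actually read by the code above.

    import logging
    from types import SimpleNamespace

    import grpc

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2

    # Hypothetical wiring: a minimal context carrying only what
    # work_temp_directory reads (parent, cas_channel, logger).
    context = SimpleNamespace(parent='main',
                              cas_channel=grpc.insecure_channel('localhost:50051'),
                              logger=logging.getLogger('bot'))

    # A lease normally arrives from the Bots service with the Action's
    # Digest packed into its payload (placeholder values below):
    lease = bots_pb2.Lease()
    lease.payload.Pack(remote_execution_pb2.Digest(hash='...', size_bytes=0))

    completed_lease = work_temp_directory(context, lease)
    # completed_lease.result now packs the ActionResult.
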
@@ -86,6 +86,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
            yield operations_pb2.Operation()

    def _get_instance(self, name):
        # If client does not support multiple instances, it may omit the
        # instance name request parameter, so better map our default:
        if not name and len(self._instances) == 1:
            name = next(iter(self._instances))

        try:
            return self._instances[name]
......
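
A toy illustration of that default-instance mapping, with a plain dict standing in for `self._instances` (illustrative only, not from the diff):

    instances = {'main': 'scheduler-instance'}

    name = ''  # client omitted the instance name
    if not name and len(instances) == 1:
        name = next(iter(instances))

    assert name == 'main'
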
@@ -13,6 +13,7 @@
# limitations under the License.

from operator import attrgetter
import os

from buildgrid.settings import HASH

@@ -31,30 +32,59 @@ def gen_fetch_blob(stub, digest, instance_name=""):
        yield response.data


def write_fetch_directory(root_directory, stub, digest, instance_name=None):
    """Locally replicates a directory from CAS.

    Args:
        root_directory (str): local directory to populate.
        stub (): gRPC stub for CAS communication.
        digest (Digest): digest for the directory to fetch from CAS.
        instance_name (str, optional): farm instance name to query data from.
    """
    if not os.path.isabs(root_directory):
        root_directory = os.path.abspath(root_directory)
    if not os.path.exists(root_directory):
        os.makedirs(root_directory, exist_ok=True)

    directory = parse_to_pb2_from_fetch(remote_execution_pb2.Directory(),
                                        stub, digest, instance_name)

    for directory_node in directory.directories:
        child_path = os.path.join(root_directory, directory_node.name)

        write_fetch_directory(child_path, stub, directory_node.digest, instance_name)

    for file_node in directory.files:
        child_path = os.path.join(root_directory, file_node.name)

        with open(child_path, 'wb') as child_file:
            write_fetch_blob(child_file, stub, file_node.digest, instance_name)

    for symlink_node in directory.symlinks:
        child_path = os.path.join(root_directory, symlink_node.name)

        if os.path.isabs(symlink_node.target):
            continue  # No out of temp-directory links for now.
        target_path = os.path.join(root_directory, symlink_node.target)

        # os.symlink(src, dst) creates a link at dst pointing to src:
        os.symlink(target_path, child_path)


def write_fetch_blob(target_file, stub, digest, instance_name=None):
    """Extracts a blob from CAS into a local file.

    Args:
        target_file (file): open binary file object to write to.
        stub (): gRPC stub for CAS communication.
        digest (Digest): digest for the blob to fetch from CAS.
        instance_name (str, optional): farm instance name to query data from.
    """
    for stream in gen_fetch_blob(stub, digest, instance_name):
        target_file.write(stream)
    target_file.flush()

    assert digest.size_bytes == os.fstat(target_file.fileno()).st_size


def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
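
Together, `write_fetch_directory()` and `write_fetch_blob()` let a bot materialize an input root on disk. A minimal usage sketch, assuming `channel` is an open gRPC channel to a CAS endpoint and `input_root_digest` is the :obj:`Digest` of a :obj:`Directory` already stored there (both hypothetical names):

    from buildgrid._protos.google.bytestream import bytestream_pb2_grpc

    stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(channel)

    # Recreates files, sub-directories and (relative) symlinks on disk:
    write_fetch_directory('/tmp/input-root', stub_bytestream, input_root_digest, 'main')
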
@@ -70,7 +100,15 @@ def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):

def create_digest(bytes_to_digest):
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a piece of data is a function of its hash **and** its
    size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The gRPC :obj:`Digest` for the given byte data.
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
                                       size_bytes=len(bytes_to_digest))
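
A quick sketch of the hash-and-size pairing (assuming `HASH` from `buildgrid.settings` is the default sha256):

    blob = b'hello, world'
    digest = create_digest(blob)

    assert digest.size_bytes == 12  # size is part of the digest
    assert len(digest.hash) == 64   # sha256 hexdigest length, per the HASH assumption
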
@@ -107,6 +145,202 @@ def file_maker(file_path, file_digest):
                                         is_executable=os.access(file_path, os.X_OK))


def directory_maker(directory_path):
    """Creates a :obj:`Directory` from a local directory.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Returns:
        :obj:`Directory`, list of :obj:`Directory`, list of
        :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC :obj:`Directory`
        for the directory pointed to by `directory_path`, a list of new gRPC
        :obj:`Directory` objects for every child of that directory and the
        corresponding list of :obj:`BatchUpdateBlobsRequest` for CAS upload.

        The :obj:`Directory` children list may come in any order.

        The :obj:`BatchUpdateBlobsRequest` list may come in any order. However,
        its last element is guaranteed to be the root :obj:`Directory`'s
        request.
    """
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)

    child_directories = list()
    update_requests = list()

    files, directories, symlinks = list(), list(), list()
    for directory_entry in os.scandir(directory_path):
        # Create a FileNode and corresponding BatchUpdateBlobsRequest:
        if directory_entry.is_file(follow_symlinks=False):
            node_blob = read_file(directory_entry.path)
            node_digest = create_digest(node_blob)

            node = remote_execution_pb2.FileNode()
            node.name = directory_entry.name
            node.digest.CopyFrom(node_digest)
            node.is_executable = os.access(directory_entry.path, os.X_OK)

            node_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=node_digest)
            node_request.data = node_blob

            update_requests.append(node_request)
            files.append(node)

        # Create a DirectoryNode and corresponding BatchUpdateBlobsRequest:
        elif directory_entry.is_dir(follow_symlinks=False):
            node_directory, node_children, node_requests = directory_maker(directory_entry.path)

            node = remote_execution_pb2.DirectoryNode()
            node.name = directory_entry.name
            node.digest.CopyFrom(node_requests[-1].digest)

            child_directories.extend(node_children)
            child_directories.append(node_directory)
            update_requests.extend(node_requests)
            directories.append(node)

        # Create a SymlinkNode if necessary:
        elif os.path.islink(directory_entry.path):
            node_target = os.readlink(directory_entry.path)

            node = remote_execution_pb2.SymlinkNode()
            node.name = directory_entry.name
            node.target = node_target

            symlinks.append(node)

    directory = remote_execution_pb2.Directory()
    # list.sort() sorts in place and returns None, so sort before extending:
    files.sort(key=attrgetter('name'))
    directories.sort(key=attrgetter('name'))
    symlinks.sort(key=attrgetter('name'))
    directory.files.extend(files)
    directory.directories.extend(directories)
    directory.symlinks.extend(symlinks)

    directory_blob = directory.SerializeToString()
    directory_digest = create_digest(directory_blob)

    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=directory_digest)
    update_request.data = directory_blob

    update_requests.append(update_request)

    return directory, child_directories, update_requests
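
Because the last request always carries the root :obj:`Directory`, callers can upload the whole batch and then recover the root digest without re-serializing. A hedged sketch, assuming `stub_cas` is a :obj:`ContentAddressableStorageStub` on an open channel and the path is hypothetical:

    directory, children, update_requests = directory_maker('/path/to/source')

    batch_request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name='main')
    batch_request.requests.extend(update_requests)
    stub_cas.BatchUpdateBlobs(batch_request)

    root_digest = update_requests[-1].digest  # digest of `directory` itself
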
def read_file(file_path):
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file content, until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, 'rb') as byte_file:
        return byte_file.read()


def output_file_maker(file_path, input_path):
    """Creates an :obj:`OutputFile` from a local file.

    `file_path` **must** point inside or be relative to `input_path`.

    Args:
        file_path (str): absolute or relative path to a local file.
        input_path (str): absolute or relative path to the input root directory.

    Returns:
        :obj:`OutputFile`, :obj:`BatchUpdateBlobsRequest`: Tuple of a new gRPC
        :obj:`OutputFile` object for the file pointed to by `file_path` and
        the corresponding :obj:`BatchUpdateBlobsRequest` for CAS upload.
    """
    if not os.path.isabs(file_path):
        file_path = os.path.abspath(file_path)
    if not os.path.isabs(input_path):
        input_path = os.path.abspath(input_path)

    file_blob = read_file(file_path)
    file_digest = create_digest(file_blob)

    output_file = remote_execution_pb2.OutputFile(digest=file_digest)
    output_file.path = os.path.relpath(file_path, start=input_path)
    output_file.is_executable = os.access(file_path, os.X_OK)

    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=file_digest)
    update_request.data = file_blob

    return output_file, update_request
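
A short sketch of the relative-path behaviour, using a hypothetical `out/app.bin` output under the action's working directory:

    file_path = os.path.join(working_directory, 'out/app.bin')  # hypothetical output

    output_file, update_request = output_file_maker(file_path, working_directory)

    assert output_file.path == 'out/app.bin'            # relative to working_directory
    assert update_request.digest == output_file.digest  # the same blob goes to CAS
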
def output_directory_maker(directory_path, working_path):
    """Creates an :obj:`OutputDirectory` from a local directory.

    `directory_path` **must** point inside or be relative to `working_path`.

    Args:
        directory_path (str): absolute or relative path to a local directory.
        working_path (str): absolute or relative path to the working directory.

    Returns:
        :obj:`OutputDirectory`, list of :obj:`BatchUpdateBlobsRequest`: Tuple
        of a new gRPC :obj:`OutputDirectory` for the directory pointed to by
        `directory_path` and the corresponding list of
        :obj:`BatchUpdateBlobsRequest` for CAS upload.
    """
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)
    if not os.path.isabs(working_path):
        working_path = os.path.abspath(working_path)

    _, update_requests = tree_maker(directory_path)

    output_directory = remote_execution_pb2.OutputDirectory()
    output_directory.tree_digest.CopyFrom(update_requests[-1].digest)
    output_directory.path = os.path.relpath(directory_path, start=working_path)

    output_directory_blob = output_directory.SerializeToString()
    output_directory_digest = create_digest(output_directory_blob)

    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=output_directory_digest)
    update_request.data = output_directory_blob

    update_requests.append(update_request)

    return output_directory, update_requests


def tree_maker(directory_path):
    """Creates a :obj:`Tree` from a local directory.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Returns:
        :obj:`Tree`, list of :obj:`BatchUpdateBlobsRequest`: Tuple of a new
        gRPC :obj:`Tree` for the directory pointed to by `directory_path` and
        the corresponding list of :obj:`BatchUpdateBlobsRequest` for CAS
        upload.

        The :obj:`BatchUpdateBlobsRequest` list may come in any order. However,
        its last element is guaranteed to be the :obj:`Tree`'s request.
    """
    if not os.path.isabs(directory_path):
        directory_path = os.path.abspath(directory_path)

    directory, child_directories, update_requests = directory_maker(directory_path)

    tree = remote_execution_pb2.Tree()
    tree.children.extend(child_directories)
    tree.root.CopyFrom(directory)

    tree_blob = tree.SerializeToString()
    tree_digest = create_digest(tree_blob)

    update_request = remote_execution_pb2.BatchUpdateBlobsRequest.Request(digest=tree_digest)
    update_request.data = tree_blob

    update_requests.append(update_request)

    return tree, update_requests
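
And a quick check of the documented guarantee that the list's final request is the :obj:`Tree`'s own (the path is hypothetical):

    tree, update_requests = tree_maker('/path/to/outputs')

    tree_request = update_requests[-1]
    assert tree_request.data == tree.SerializeToString()
    assert tree_request.digest == create_digest(tree_request.data)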