Skip to content
Snippets Groups Projects
Commit fe578726 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

temp_directory.py: Better action execution support

#61
parent 2c0d9b59
No related branches found
No related tags found
Loading
......@@ -19,71 +19,94 @@ import tempfile
from google.protobuf import any_pb2
from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
from buildgrid.client.cas import upload
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid.utils import write_fetch_directory, parse_to_pb2_from_fetch
from buildgrid.utils import output_file_maker, output_directory_maker
def work_temp_directory(context, lease):
""" Bot downloads directories and files into a temp directory,
then uploads results back to CAS
"""Executes a lease for a build action, using host tools.
"""
parent = context.parent
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.cas_channel)
instance_name = context.parent
logger = context.logger
action_digest = remote_execution_pb2.Digest()
lease.payload.Unpack(action_digest)
action = remote_execution_pb2.Action()
action = parse_to_pb2_from_fetch(remote_execution_pb2.Action(),
stub_bytestream, action_digest, instance_name)
action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, parent)
with tempfile.TemporaryDirectory() as temp_directory:
command = parse_to_pb2_from_fetch(remote_execution_pb2.Command(),
stub_bytestream, action.command_digest, instance_name)
with tempfile.TemporaryDirectory() as temp_dir:
write_fetch_directory(temp_directory, stub_bytestream,
action.input_root_digest, instance_name)
command = remote_execution_pb2.Command()
command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, parent)
arguments = "cd {} &&".format(temp_dir)
environment = os.environ.copy()
for variable in command.environment_variables:
if variable.name not in ['PATH', 'PWD']:
environment[variable.name] = variable.value
command_line = list()
for argument in command.arguments:
arguments += " {}".format(argument)
context.logger.info(arguments)
write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, parent)
proc = subprocess.Popen(arguments,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
# TODO: Should return the std_out to the user
proc.communicate()
result = remote_execution_pb2.ActionResult()
requests = []
for output_file in command.output_files:
path = os.path.join(temp_dir, output_file)
chunk = read_file(path)
digest = create_digest(chunk)
result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
digest=digest)])
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=parent,
requests=requests)
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.cas_channel)
stub_cas.BatchUpdateBlobs(request)
result_any = any_pb2.Any()
result_any.Pack(result)
lease.result.CopyFrom(result_any)
command_line.append(argument.strip())
working_directory = None
if command.working_directory:
working_directory = os.path.join(temp_directory,
command.working_directory)
os.makedirs(working_directory, exist_ok=True)
else:
working_directory = temp_directory
# Ensure that output files structure exists:
for output_path in command.output_files:
directory_path = os.path.join(working_directory,
os.path.dirname(output_path))
os.makedirs(directory_path, exist_ok=True)
logger.debug(' '.join(command_line))
process = subprocess.Popen(command_line,
cwd=working_directory,
universal_newlines=True,
env=environment,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
# TODO: Should return the stdout and stderr in the ActionResult.
process.communicate()
action_result = remote_execution_pb2.ActionResult()
with upload(context.cas_channel, instance=instance_name) as cas:
for output_path in command.output_files:
file_path = os.path.join(working_directory, output_path)
# Missing outputs should simply be omitted in ActionResult:
if not os.path.isfile(file_path):
continue
output_file = output_file_maker(file_path, working_directory, cas=cas)
action_result.output_files.extend([output_file])
for output_path in command.output_directories:
directory_path = os.path.join(working_directory, output_path)
# Missing outputs should simply be omitted in ActionResult:
if not os.path.isdir(directory_path):
continue
# OutputDirectory.path should be relative to the working direcory:
output_directory = output_directory_maker(directory_path, working_directory, cas=cas)
action_result.output_directories.extend([output_directory])
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
lease.result.CopyFrom(action_result_any)
return lease
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment