Compare revisions

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (11)
Showing with 843 additions and 149 deletions
......@@ -31,7 +31,7 @@ before_script:
- ${BGD} server start &
- sleep 1 # Allow server to boot
- ${BGD} bot --host=0.0.0.0 dummy &
- ${BGD} execute --host=0.0.0.0 request --wait-for-completion
- ${BGD} execute --host=0.0.0.0 request-dummy --wait-for-completion
tests-debian-stretch:
<<: *linux-tests
......@@ -40,6 +40,17 @@ run-dummy-job-debian:
image: buildstream/buildstream-debian
<<: *dummy-job
build-docs:
stage: test
script:
- pip3 install --editable ".[docs]"
- make -C docs html
- mkdir -p documentation/
- cp -a docs/build/html/. documentation/
artifacts:
paths:
- documentation/
coverage:
stage: post
......@@ -61,8 +72,10 @@ pages:
stage: post
dependencies:
- tests-debian-stretch
- build-docs
script:
- mv coverage/ public/
- cp -a coverage/. public/
- cp -a documentation/* public/
artifacts:
paths:
- public/
......
......@@ -7,7 +7,7 @@ extension-pkg-whitelist=
# Add files or directories to the blacklist. They should be base names, not
# paths.
#ignore=CVS,tests,doc
#ignore=tests,docs
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
......
......@@ -38,7 +38,7 @@ In one terminal, start a server::
In another terminal, send a request for work::
bgd execute request
bgd execute request-dummy
The stage should show as `QUEUED` while the job awaits a bot to pick up the work::
......@@ -51,3 +51,35 @@ Create a bot session::
Show the work as completed::
bgd execute list
Instructions for a Simple Build
-------------------------------
This example covers a simple build. The user uploads a directory containing a C file and a command to the CAS. The bot then fetches the uploaded directory and command, runs the command inside a temporary directory, and uploads the result back to the CAS, from which the user downloads it. This is an early demo and still lacks a few features, such as symlink support and checking whether files already exist in the CAS before executing a command.
Create a new directory called `test-buildgrid/` and place the following C file in it called `hello.c`::
    #include <stdio.h>

    int main()
    {
        printf("Hello, World!\n");
        return 0;
    }
Now start a BuildGrid server, passing it a directory it can write a CAS to::
bgd server start --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
Start the following bot session::
bgd bot temp-directory
Upload the directory containing the C file::
bgd cas upload-dir /path/to/test-buildgrid
Now we send an execution request to the bot with the name of the expected `output-file`, a boolean describing whether it is executable, the path to the directory we uploaded (needed to calculate the digest) and finally the command to run on the bot::
bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
The resulting executable should be downloaded into a new directory called `testing/`.
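Since the output file was flagged as executable, the fetched binary can be run directly; assuming the request succeeded, it prints the greeting from `hello.c`::

    ./testing/hello
    Hello, World!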
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import tempfile
import grpc
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid.utils import read_file
def work_buildbox(context, lease):
logger = context.logger
action_any = lease.payload
action = remote_execution_pb2.Action()
action_any.Unpack(action)
cert_server = read_file(context.server_cert)
cert_client = read_file(context.client_cert)
key_client = read_file(context.client_key)
# create TLS channel credentials from the server certificate and client key pair
credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
private_key=key_client,
certificate_chain=cert_client)
channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
remote_command = _buildstream_fetch_command(context.local_cas, stub, action.command_digest)
environment = dict((x.name, x.value) for x in remote_command.environment_variables)
logger.debug("command hash: {}".format(action.command_digest.hash))
logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
logger.debug("\n{}".format(' '.join(remote_command.arguments)))
command = ['buildbox',
'--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
'--server-cert={}'.format(context.server_cert),
'--client-key={}'.format(context.client_key),
'--client-cert={}'.format(context.client_cert),
'--local={}'.format(context.local_cas),
'--chdir={}'.format(environment['PWD']),
context.fuse_dir]
command.extend(remote_command.arguments)
logger.debug(' '.join(command))
logger.debug("Input root digest:\n{}".format(action.input_root_digest))
logger.info("Launching process")
proc = subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
std_send = action.input_root_digest.SerializeToString()
std_out, _ = proc.communicate(std_send)
output_root_digest = remote_execution_pb2.Digest()
output_root_digest.ParseFromString(std_out)
logger.debug("Output root digest: {}".format(output_root_digest))
output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
action_result = remote_execution_pb2.ActionResult()
action_result.output_directories.extend([output_file])
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
lease.result.CopyFrom(action_result_any)
return lease
def _buildstream_fetch_blob(remote, digest, out):
resource_name = os.path.join(digest.hash, str(digest.size_bytes))
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.Read(request):
out.write(response.data)
out.flush()
assert digest.size_bytes == os.fstat(out.fileno()).st_size
def _buildstream_fetch_command(casdir, remote, digest):
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
_buildstream_fetch_blob(remote, digest, out)
remote_command = remote_execution_pb2.Command()
with open(out.name, 'rb') as f:
remote_command.ParseFromString(f.read())
return remote_command
def _buildstream_fetch_action(casdir, remote, digest):
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
_buildstream_fetch_blob(remote, digest, out)
remote_action = remote_execution_pb2.Action()
with open(out.name, 'rb') as f:
remote_action.ParseFromString(f.read())
return remote_action
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time
def work_dummy(context, lease):
""" Just returns lease after some random time
"""
time.sleep(random.randint(1, 5))
return lease
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import tempfile
from google.protobuf import any_pb2
from buildgrid.utils import read_file, create_digest, write_fetch_directory, parse_to_pb2_from_fetch
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
def work_temp_directory(context, lease):
""" Bot downloads directories and files into a temp directory,
then uploads results back to CAS
"""
instance_name = context.instance_name
stub_bytestream = bytestream_pb2_grpc.ByteStreamStub(context.channel)
action_digest = remote_execution_pb2.Digest()
lease.payload.Unpack(action_digest)
action = remote_execution_pb2.Action()
action = parse_to_pb2_from_fetch(action, stub_bytestream, action_digest, instance_name)
with tempfile.TemporaryDirectory() as temp_dir:
command = remote_execution_pb2.Command()
command = parse_to_pb2_from_fetch(command, stub_bytestream, action.command_digest, instance_name)
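# build a single shell command line that changes into the temporary
# directory before running the fetched arguments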
arguments = "cd {} &&".format(temp_dir)
for argument in command.arguments:
arguments += " {}".format(argument)
context.logger.info(arguments)
write_fetch_directory(temp_dir, stub_bytestream, action.input_root_digest, instance_name)
proc = subprocess.Popen(arguments,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
# TODO: Should return the std_out to the user
proc.communicate()
result = remote_execution_pb2.ActionResult()
requests = []
for output_file in command.output_files:
path = os.path.join(temp_dir, output_file)
chunk = read_file(path)
digest = create_digest(chunk)
result.output_files.extend([remote_execution_pb2.OutputFile(path=output_file,
digest=digest)])
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
requests=requests)
stub_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
stub_cas.BatchUpdateBlobs(request)
result_any = any_pb2.Any()
result_any.Pack(result)
lease.result.CopyFrom(result_any)
return lease
......@@ -22,23 +22,17 @@ Bot command
Create a bot interface and request work
"""
import asyncio
import logging
import os
import random
import subprocess
import tempfile
from pathlib import Path, PurePath
import click
import grpc
from google.protobuf import any_pb2
from buildgrid.bot import bot, bot_interface
from buildgrid.bot.bot_session import BotSession, Device, Worker
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from ..bots import buildbox, dummy, temp_directory
from ..cli import pass_context
......@@ -53,6 +47,7 @@ def cli(context, host, port, parent):
context.logger = logging.getLogger(__name__)
context.logger.info("Starting on port {}".format(port))
context.channel = channel
worker = Worker()
worker.add_device(Device())
......@@ -63,16 +58,33 @@ def cli(context, host, port, parent):
context.bot_session = bot_session
@cli.command('dummy', short_help="Create a dummy bot session")
@cli.command('dummy', short_help='Create a dummy bot session which just returns the lease')
@pass_context
def dummy(context):
def run_dummy(context):
"""
Simple dummy client. Creates a session, accepts leases, does fake work and
updates the server.
"""
try:
b = bot.Bot(context.bot_session)
b.session(_work_dummy,
b.session(dummy.work_dummy,
context)
except KeyboardInterrupt:
pass
@cli.command('temp-directory', short_help='Runs commands in temp directory and uploads results')
@click.option('--instance-name', default='testing')
@pass_context
def run_temp_directory(context, instance_name):
""" Downloads files and command from CAS and runs
in a temp directory, uploading result back to CAS
"""
context.instance_name = instance_name
try:
b = bot.Bot(context.bot_session)
b.session(temp_directory.work_temp_directory,
context)
except KeyboardInterrupt:
......@@ -88,7 +100,7 @@ def dummy(context):
@click.option('--port', show_default=True, default=11001)
@click.option('--remote', show_default=True, default='localhost')
@pass_context
def work_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
def run_buildbox(context, remote, port, server_cert, client_key, client_cert, local_cas, fuse_dir):
"""
Uses BuildBox to run commands.
"""
......@@ -104,104 +116,9 @@ def work_buildbox(context, remote, port, server_cert, client_key, client_cert, l
context.fuse_dir = fuse_dir
try:
b = bot.Bot(bot_session=context.bot_session)
b.session(work=_work_buildbox,
b = bot.Bot(context.bot_session)
b.session(work=buildbox.work_buildbox,
context=context)
except KeyboardInterrupt:
pass
async def _work_dummy(context, lease):
await asyncio.sleep(random.randint(1, 5))
return lease
async def _work_buildbox(context, lease):
logger = context.logger
action_any = lease.payload
action = remote_execution_pb2.Action()
action_any.Unpack(action)
cert_server = _file_read(context.server_cert)
cert_client = _file_read(context.client_cert)
key_client = _file_read(context.client_key)
# create server credentials
credentials = grpc.ssl_channel_credentials(root_certificates=cert_server,
private_key=key_client,
certificate_chain=cert_client)
channel = grpc.secure_channel('{}:{}'.format(context.remote, context.port), credentials)
stub = bytestream_pb2_grpc.ByteStreamStub(channel)
remote_command = _fetch_command(context.local_cas, stub, action.command_digest)
environment = dict((x.name, x.value) for x in remote_command.environment_variables)
logger.debug("command hash: {}".format(action.command_digest.hash))
logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
logger.debug("\n{}".format(' '.join(remote_command.arguments)))
command = ['buildbox',
'--remote={}'.format('https://{}:{}'.format(context.remote, context.port)),
'--server-cert={}'.format(context.server_cert),
'--client-key={}'.format(context.client_key),
'--client-cert={}'.format(context.client_cert),
'--local={}'.format(context.local_cas),
'--chdir={}'.format(environment['PWD']),
context.fuse_dir]
command.extend(remote_command.arguments)
logger.debug(' '.join(command))
logger.debug("Input root digest:\n{}".format(action.input_root_digest))
logger.info("Launching process")
proc = subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
std_send = action.input_root_digest.SerializeToString()
std_out, _ = proc.communicate(std_send)
output_root_digest = remote_execution_pb2.Digest()
output_root_digest.ParseFromString(std_out)
logger.debug("Output root digest: {}".format(output_root_digest))
output_file = remote_execution_pb2.OutputDirectory(tree_digest=output_root_digest)
action_result = remote_execution_pb2.ActionResult()
action_result.output_directories.extend([output_file])
action_result_any = any_pb2.Any()
action_result_any.Pack(action_result)
lease.result.CopyFrom(action_result_any)
return lease
def _fetch_blob(remote, digest, out):
resource_name = os.path.join(digest.hash, str(digest.size_bytes))
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.Read(request):
out.write(response.data)
out.flush()
assert digest.size_bytes == os.fstat(out.fileno()).st_size
def _fetch_command(casdir, remote, digest):
with tempfile.NamedTemporaryFile(dir=os.path.join(casdir, 'tmp')) as out:
_fetch_blob(remote, digest, out)
remote_command = remote_execution_pb2.Command()
with open(out.name, 'rb') as f:
remote_command.ParseFromString(f.read())
return remote_command
def _file_read(file_path):
with open(file_path, 'rb') as f:
return f.read()
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Execute command
=================
Request work to be executed and monitor status of jobs.
"""
import logging
import click
import grpc
from buildgrid.utils import merkle_maker, create_digest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from ..cli import pass_context
@click.group(short_help='Interact with the CAS')
@click.option('--port', default='50051')
@click.option('--host', default='localhost')
@pass_context
def cli(context, host, port):
context.logger = logging.getLogger(__name__)
context.logger.info("Starting on port {}".format(port))
context.channel = grpc.insecure_channel('{}:{}'.format(host, port))
context.port = port
@cli.command('upload-files', short_help='Upload files')
@click.argument('files', nargs=-1, type=click.File('rb'))
@click.option('--instance-name', default='testing')
@pass_context
def upload_files(context, files, instance_name):
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
requests = []
for file in files:
chunk = file.read()
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=create_digest(chunk), data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
requests=requests)
context.logger.info("Sending: {}".format(request))
response = stub.BatchUpdateBlobs(request)
context.logger.info("Response: {}".format(response))
@cli.command('upload-dir', short_help='Upload a directory')
@click.argument('directory')
@click.option('--instance-name', default='testing')
@pass_context
def upload_dir(context, directory, instance_name):
context.logger.info("Uploading directory to cas")
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel)
requests = []
for chunk, file_digest in merkle_maker(directory):
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=file_digest, data=chunk))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
requests=requests)
context.logger.info("Request:\n{}".format(request))
response = stub.BatchUpdateBlobs(request)
context.logger.info("Response:\n{}".format(response))
......@@ -22,18 +22,22 @@ Execute command
Request work to be executed and monitor status of jobs.
"""
import errno
import logging
import stat
import os
import click
import grpc
from buildgrid.utils import merkle_maker, create_digest, write_fetch_blob
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from ..cli import pass_context
@click.group(short_help="Simple execute client")
@click.group(short_help='Simple execute client')
@click.option('--port', default='50051')
@click.option('--host', default='localhost')
@pass_context
......@@ -45,15 +49,15 @@ def cli(context, host, port):
context.port = port
@cli.command('request', short_help="Send a dummy action")
@cli.command('request-dummy', short_help='Send a dummy action')
@click.option('--number', default=1)
@click.option('--instance-name', default='testing')
@click.option('--wait-for-completion', is_flag=True)
@click.option('--wait-for-completion', is_flag=True, help='Stream updates until jobs are completed')
@pass_context
def request(context, number, instance_name, wait_for_completion):
def request_dummy(context, number, instance_name, wait_for_completion):
action_digest = remote_execution_pb2.Digest()
context.logger.info("Sending execution request...\n")
context.logger.info("Sending execution request...")
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
......@@ -72,11 +76,11 @@ def request(context, number, instance_name, wait_for_completion):
context.logger.info(next(response))
@cli.command('status', short_help="Get the status of an operation")
@cli.command('status', short_help='Get the status of an operation')
@click.argument('operation-name')
@pass_context
def operation_status(context, operation_name):
context.logger.info("Getting operation status...\n")
context.logger.info("Getting operation status...")
stub = operations_pb2_grpc.OperationsStub(context.channel)
request = operations_pb2.GetOperationRequest(name=operation_name)
......@@ -85,7 +89,7 @@ def operation_status(context, operation_name):
context.logger.info(response)
@cli.command('list', short_help="List operations")
@cli.command('list', short_help='List operations')
@pass_context
def list_operations(context):
context.logger.info("Getting list of operations")
......@@ -103,7 +107,7 @@ def list_operations(context):
context.logger.info(op)
@cli.command('wait', short_help="Streams an operation until it is complete")
@cli.command('wait', short_help='Streams an operation until it is complete')
@click.argument('operation-name')
@pass_context
def wait_execution(context, operation_name):
......@@ -114,3 +118,86 @@ def wait_execution(context, operation_name):
for stream in response:
context.logger.info(stream)
@cli.command('command', short_help='Send a command to be executed')
@click.argument('input-root')
@click.argument('commands', nargs=-1)
@click.option('--output-file', nargs=2, type=(str, bool), multiple=True,
help='{Expected output file, is_executable flag}')
@click.option('--output-directory', default='testing', help='Output directory for output files')
@click.option('--instance-name', default='testing')
@pass_context
def command(context, input_root, commands, output_file, output_directory, instance_name):
stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
execute_command = remote_execution_pb2.Command()
for arg in commands:
execute_command.arguments.extend([arg])
output_executables = []
for file, is_executable in output_file:
execute_command.output_files.extend([file])
if is_executable:
output_executables.append(file)
command_digest = create_digest(execute_command.SerializeToString())
context.logger.info(command_digest)
# TODO: Check for missing blobs
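# One possible approach (a sketch, not wired in): before uploading, ask the
# CAS which digests it lacks via ContentAddressableStorage.FindMissingBlobs:
#   missing = cas_stub.FindMissingBlobs(
#       remote_execution_pb2.FindMissingBlobsRequest(
#           instance_name=instance_name,
#           blob_digests=[command_digest])).missing_blob_digests
# (cas_stub being a hypothetical ContentAddressableStorageStub instance)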
digest = None
for _, digest in merkle_maker(input_root):
pass
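# merkle_maker yields a digest for every blob and, last of all, for the root
# directory itself, so the loop above leaves the root digest in `digest`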
action = remote_execution_pb2.Action(command_digest=command_digest,
input_root_digest=digest,
do_not_cache=True)
action_digest = create_digest(action.SerializeToString())
context.logger.info("Sending execution request...")
requests = []
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=command_digest, data=execute_command.SerializeToString()))
requests.append(remote_execution_pb2.BatchUpdateBlobsRequest.Request(
digest=action_digest, data=action.SerializeToString()))
request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=instance_name,
requests=requests)
remote_execution_pb2_grpc.ContentAddressableStorageStub(context.channel).BatchUpdateBlobs(request)
request = remote_execution_pb2.ExecuteRequest(instance_name=instance_name,
action_digest=action_digest,
skip_cache_lookup=True)
response = stub.Execute(request)
stub = bytestream_pb2_grpc.ByteStreamStub(context.channel)
stream = None
for stream in response:
context.logger.info(stream)
execute_response = remote_execution_pb2.ExecuteResponse()
stream.response.Unpack(execute_response)
for output_file_response in execute_response.result.output_files:
path = os.path.join(output_directory, output_file_response.path)
if not os.path.exists(os.path.dirname(path)):
try:
os.makedirs(os.path.dirname(path))
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
with open(path, 'wb+') as f:
write_fetch_blob(f, stub, output_file_response.digest, instance_name)
if output_file_response.path in output_executables:
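# set the user-executable bit on files that were flagged executable in the request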
st = os.stat(path)
os.chmod(path, st.st_mode | stat.S_IXUSR)
......@@ -104,14 +104,15 @@ class BotSession:
self._update_lease_from_server(lease)
def update_bot_session(self):
self.logger.debug("Updating bot session: {}".format(self._bot_id))
session = self._interface.update_bot_session(self.get_pb2())
for lease in session.leases:
self._update_lease_from_server(lease)
for k, v in self._leases.items():
for k, v in list(self._leases.items()):
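# list() materializes the items so completed leases can be deleted while iterating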
if v.state == LeaseState.COMPLETED.value:
del self._leases[k]
for lease in session.leases:
self._update_lease_from_server(lease)
def get_pb2(self):
leases = list(self._leases.values())
if not leases:
......@@ -134,12 +135,16 @@ class BotSession:
# TODO: Compare with previous state of lease
if lease.state == LeaseState.PENDING.value:
lease.state = LeaseState.ACTIVE.value
asyncio.ensure_future(self.create_work(lease))
self._leases[lease.id] = lease
self.update_bot_session()
asyncio.ensure_future(self.create_work(lease))
async def create_work(self, lease):
self.logger.debug("Work created: {}".format(lease.id))
lease = await self._work(self._context, lease)
loop = asyncio.get_event_loop()
lease = await loop.run_in_executor(None, self._work, self._context, lease)
self.logger.debug("Work complete: {}".format(lease.id))
self.lease_completed(lease)
......
......@@ -21,26 +21,25 @@ from enum import Enum
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.longrunning import operations_pb2
class ExecuteStage(Enum):
UNKNOWN = ExecuteOperationMetadata.Stage.Value('UNKNOWN')
UNKNOWN = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('UNKNOWN')
# Checking the result against the cache.
CACHE_CHECK = ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
CACHE_CHECK = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('CACHE_CHECK')
# Currently idle, awaiting a free machine to execute.
QUEUED = ExecuteOperationMetadata.Stage.Value('QUEUED')
QUEUED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('QUEUED')
# Currently being executed by a worker.
EXECUTING = ExecuteOperationMetadata.Stage.Value('EXECUTING')
EXECUTING = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('EXECUTING')
# Finished execution.
COMPLETED = ExecuteOperationMetadata.Stage.Value('COMPLETED')
COMPLETED = remote_execution_pb2.ExecuteOperationMetadata.Stage.Value('COMPLETED')
class BotStatus(Enum):
......@@ -80,13 +79,13 @@ class Job:
def __init__(self, action_digest, do_not_cache=False, message_queue=None):
self.lease = None
self.logger = logging.getLogger(__name__)
self.n_tries = 0
self.result = None
self.result_cached = False
self._action_digest = action_digest
self._do_not_cache = do_not_cache
self._execute_stage = ExecuteStage.UNKNOWN
self._n_tries = 0
self._name = str(uuid.uuid4())
self._operation = operations_pb2.Operation(name=self._name)
self._operation_update_queues = []
......@@ -122,15 +121,16 @@ class Job:
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
if self.result is not None:
self._operation.done = True
response = ExecuteResponse()
self.result.Unpack(response.result)
response.cached_result = self.result_cached
action_result = remote_execution_pb2.ActionResult()
self.result.Unpack(action_result)
response = remote_execution_pb2.ExecuteResponse(result=action_result,
cached_result=self.result_cached)
self._operation.response.CopyFrom(self._pack_any(response))
return self._operation
def get_operation_meta(self):
meta = ExecuteOperationMetadata()
meta = remote_execution_pb2.ExecuteOperationMetadata()
meta.stage = self._execute_stage.value
meta.action_digest.CopyFrom(self._action_digest)
......
......@@ -25,7 +25,6 @@ from collections import deque
from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ActionResult
from buildgrid._protos.google.longrunning import operations_pb2
from .job import ExecuteStage, LeaseState
......@@ -83,9 +82,7 @@ class Scheduler:
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job
if not job.do_not_cache and self.action_cache is not None:
action_result = ActionResult()
result.Unpack(action_result)
self.action_cache.put_action_result(job.action_digest, action_result)
self.action_cache.put_action_result(job.action_digest, result)
def get_operations(self):
response = operations_pb2.ListOperationsResponse()
......@@ -94,7 +91,7 @@ class Scheduler:
return response
def update_job_lease_state(self, name, state):
job = self.jobs.get(name)
job = self.jobs[name]
job.lease.state = state
self.jobs[name] = job
......
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from buildgrid.settings import HASH
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.bytestream import bytestream_pb2
def gen_fetch_blob(stub, digest, instance_name=""):
""" Generates byte stream from a fetch blob request
"""
resource_name = os.path.join(instance_name, 'blobs', digest.hash, str(digest.size_bytes))
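# ByteStream read resource names follow '{instance_name}/blobs/{hash}/{size_bytes}'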
request = bytestream_pb2.ReadRequest(resource_name=resource_name,
read_offset=0)
for response in stub.Read(request):
yield response.data
def write_fetch_directory(directory, stub, digest, instance_name=""):
""" Given a directory digest, fetches files and writes them to a directory
"""
# TODO: Extend to symlinks and inner directories
# pathlib.Path('/my/directory').mkdir(parents=True, exist_ok=True)
directory_pb2 = remote_execution_pb2.Directory()
directory_pb2 = parse_to_pb2_from_fetch(directory_pb2, stub, digest, instance_name)
for file_node in directory_pb2.files:
path = os.path.join(directory, file_node.name)
with open(path, 'wb') as f:
write_fetch_blob(f, stub, file_node.digest, instance_name)
def write_fetch_blob(out, stub, digest, instance_name=""):
""" Given an output buffer, fetches blob and writes to buffer
"""
for stream in gen_fetch_blob(stub, digest, instance_name):
out.write(stream)
out.flush()
assert digest.size_bytes == os.fstat(out.fileno()).st_size
def parse_to_pb2_from_fetch(pb2, stub, digest, instance_name=""):
""" Fetches stream and parses it into given pb2
"""
stream_bytes = b''
for stream in gen_fetch_blob(stub, digest, instance_name):
stream_bytes += stream
pb2.ParseFromString(stream_bytes)
return pb2
def create_digest(bytes_to_digest):
""" Creates a hash based on the hex digest and returns the digest
"""
return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(),
size_bytes=len(bytes_to_digest))
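# For example, create_digest(b"Hello, World!\n") returns a Digest whose hash
# is the hex digest of those 14 bytes under the configured HASH function and
# whose size_bytes is 14.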
def merkle_maker(directory):
""" Walks thorugh given directory, yielding the binary and digest
"""
directory_pb2 = remote_execution_pb2.Directory()
for (dir_path, dir_names, file_names) in os.walk(directory):
for file_name in file_names:
file_path = os.path.join(dir_path, file_name)
chunk = read_file(file_path)
file_digest = create_digest(chunk)
directory_pb2.files.extend([file_maker(file_path, file_digest)])
yield chunk, file_digest
for inner_dir in dir_names:
inner_dir_path = os.path.join(dir_path, inner_dir)
yield from merkle_maker(inner_dir_path)
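# TODO: os.walk already recurses into subdirectories, so this extra recursion
# re-yields their blobs; inner directories are also not yet linked into the
# parent Directory message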
directory_string = directory_pb2.SerializeToString()
yield directory_string, create_digest(directory_string)
def file_maker(file_path, file_digest):
""" Creates a File Node
"""
_, file_name = os.path.split(file_path)
return remote_execution_pb2.FileNode(name=file_name,
digest=file_digest,
is_executable=os.access(file_path, os.X_OK))
def read_file(file_path):
with open(file_path, 'rb') as f:
return f.read()
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = BuildGrid
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# pylint: skip-file
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
from _version import __version__
# -- Project information -----------------------------------------------------
project = 'BuildGrid'
copyright = '2018, The BuildGrid Contributors'
author = 'The BuildGrid Contributors'
# The short X.Y version
version = __version__
# The full version, including alpha/beta/rc tags
release = __version__
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.napoleon',
'sphinx_click.ext'
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path .
exclude_patterns = []
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
# html_static_path = ['static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'BuildGriddoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'BuildGrid.tex', 'BuildGrid Documentation',
'The BuildGrid Contributors', 'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'buildgrid', 'BuildGrid Documentation',
[author], 1)
]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'BuildGrid', 'BuildGrid Documentation',
author, 'BuildGrid', 'One line description of project.',
'Miscellaneous'),
]
.. BuildGrid documentation master file, created by
sphinx-quickstart on Tue Aug 14 16:51:30 2018.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
BuildGrid's documentation
=========================
.. toctree::
:maxdepth: 1
:caption: Contents:
Resources
---------
* Homepage: https://buildgrid.build
* GitLab repository: https://gitlab.com/BuildGrid/buildgrid
* Bug Tracking: https://gitlab.com/BuildGrid/buildgrid/issues
* Mailing list: https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid
* IRC Channel: irc://chat.freenode.net/#buildgrid
......@@ -11,5 +11,6 @@ pep8ignore =
*/lib/python3* ALL
*/bin/* ALL
.eggs/* ALL
docs/source/conf.py ALL
*_pb2.py ALL
*_pb2_grpc.py ALL
......@@ -89,12 +89,20 @@ tests_require = [
'coverage == 4.4.0',
'moto',
'pep8',
'pytest >= 3.1.0',
'pytest == 3.6.4',
'pytest-cov >= 2.5.0',
'pytest-pep8',
'pytest-pylint',
]
docs_require = [
'sphinx',
'sphinx-click',
'sphinx-rtd-theme',
'sphinxcontrib-apidoc',
'sphinxcontrib-napoleon',
]
setup(
name="BuildGrid",
version=__version__,
......@@ -117,6 +125,7 @@ setup(
setup_requires=['pytest-runner'],
tests_require=tests_require,
extras_require={
'devel': tests_require,
'docs': docs_require,
'tests': tests_require,
},
)
......@@ -15,9 +15,11 @@
# Authors:
# Carter Sande <csande@bloomberg.net>
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
# pylint: disable=redefined-outer-name
import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server import action_cache
from buildgrid.server.cas.storage import lru_memory_cache
......