Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Show changes
Commits on Source (4)
Showing
with 335 additions and 91 deletions
......@@ -28,11 +28,11 @@ import logging
import click
from buildgrid.server import build_grid_server
from buildgrid.server.action_cache import ActionCache
from buildgrid.server.cas.storage.disk import DiskStorage
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.s3 import S3Storage
from buildgrid.server.cas.storage.with_cache import WithCacheStorage
from buildgrid.server.execution.action_cache import ActionCache
from ..cli import pass_context
......@@ -72,36 +72,31 @@ def cli(context):
def start(context, port, max_cached_actions, allow_uar, cas, **cas_args):
context.logger.info("Starting on port {}".format(port))
loop = asyncio.get_event_loop()
cas_storage = _make_cas_storage(context, cas, cas_args)
if cas_storage is None:
context.logger.info("Running without CAS - action cache will be unavailable")
action_cache = None
else:
action_cache = ActionCache(cas_storage, max_cached_actions)
action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
server = build_grid_server.BuildGridServer(port,
cas_storage=cas_storage,
action_cache=action_cache,
allow_update_action_result=allow_uar)
action_cache=action_cache)
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(server.start())
server.start()
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(server.stop())
server.stop()
loop.close()
@cli.command('stop', short_help="Request a server to teardown.")
@pass_context
def stop(context):
context.logger.error("Not implemented yet")
def _make_cas_storage(context, cas_type, cas_args):
"""Returns the storage provider corresponding to the given `cas_type`,
or None if the provider cannot be created.
......
from ..._exceptions import BgdError, ErrorDomain
from .._exceptions import BgdError, ErrorDomain
class InvalidArgumentError(BgdError):
"""
A bad argument was passed, such as a name which doesn't exist.
A bad argument was passed, such as a name which doesn't exist
"""
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
class NotFoundError(BgdError):
"""
Requested resource not found
"""
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
......
......@@ -44,8 +44,7 @@ from .worker.bots_interface import BotsInterface
class BuildGridServer:
def __init__(self, port='50051', max_workers=10, cas_storage=None,
action_cache=None, allow_update_action_result=True):
def __init__(self, port='50051', max_workers=10, cas_storage=None, action_cache=None):
port = '[::]:{0}'.format(port)
scheduler = Scheduler(action_cache)
bots_interface = BotsInterface(scheduler)
......@@ -68,13 +67,12 @@ class BuildGridServer:
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
self._server)
if action_cache is not None:
action_cache_service = ActionCacheService(action_cache,
allow_update_action_result)
action_cache_service = ActionCacheService(action_cache)
remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
self._server)
async def start(self):
def start(self):
self._server.start()
async def stop(self):
def stop(self):
self._server.stop(0)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Reference Cache
==================
Implements an in-memory reference cache.
For a given key, it
"""
import collections
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._exceptions import NotFoundError
class ReferenceCache:
def __init__(self, storage, max_cached_refs, allow_updates=True):
""" Initialises a new ReferenceCache instance.
Args:
storage (StorageABC): storage backend instance to be used.
max_cached_refs (int): maximum number of entries to be stored.
allow_updates (bool): allow the client to write to storage
"""
self._allow_updates = allow_updates
self._storage = storage
self._max_cached_refs = max_cached_refs
self._digest_map = collections.OrderedDict()
@property
def allow_updates(self):
return self._allow_updates
def get_digest_reference(self, key):
"""Retrieves the cached Digest for the given key.
Args:
key: key for Digest to query.
Returns:
The cached Digest matching the given key or raises
NotFoundError.
"""
if key in self._digest_map:
reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.Digest)
if reference_result is not None:
return reference_result
del self._digest_map[key]
raise NotFoundError("Key not found: {}".format(key))
def get_action_reference(self, key):
"""Retrieves the cached ActionResult for the given Action digest.
Args:
key: key for ActionResult to query.
Returns:
The cached ActionResult matching the given key or raises
NotFoundError.
"""
if key in self._digest_map:
reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.ActionResult)
if reference_result is not None:
if self._action_result_blobs_still_exist(reference_result):
self._digest_map.move_to_end(key)
return reference_result
del self._digest_map[key]
raise NotFoundError("Key not found: {}".format(key))
def update_reference(self, key, result):
"""Stores the result in cache for the given key.
If the cache size limit has been reached, the oldest cache entries will
be dropped before insertion so that the cache size never exceeds the
maximum numbers of entries allowed.
Args:
key: key to store result.
result (Digest): result digest to store.
"""
if not self._allow_updates:
raise NotImplementedError("Updating cache not allowed")
if self._max_cached_refs == 0:
return
while len(self._digest_map) >= self._max_cached_refs:
self._digest_map.popitem(last=False)
result_digest = self._storage.put_message(result)
self._digest_map[key] = result_digest
def _action_result_blobs_still_exist(self, action_result):
"""Checks CAS for ActionResult output blobs existance.
Args:
action_result (ActionResult): ActionResult to search referenced
output blobs for.
Returns:
True if all referenced blobs are present in CAS, False otherwise.
"""
blobs_needed = []
for output_file in action_result.output_files:
blobs_needed.append(output_file.digest)
for output_directory in action_result.output_directories:
blobs_needed.append(output_directory.tree_digest)
tree = self._storage.get_message(output_directory.tree_digest,
remote_execution_pb2.Tree)
if tree is None:
return False
for file_node in tree.root.files:
blobs_needed.append(file_node.digest)
for child in tree.children:
for file_node in child.files:
blobs_needed.append(file_node.digest)
if action_result.stdout_digest.hash and not action_result.stdout_raw:
blobs_needed.append(action_result.stdout_digest)
if action_result.stderr_digest.hash and not action_result.stderr_raw:
blobs_needed.append(action_result.stderr_digest)
missing = self._storage.missing_blobs(blobs_needed)
return len(missing) == 0
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import grpc
from buildgrid._protos.buildstream.v2 import buildstream_pb2
from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
from .._exceptions import NotFoundError
class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
def __init__(self, reference_cache):
self._reference_cache = reference_cache
self.logger = logging.getLogger(__name__)
def GetReference(self, request, context):
try:
response = buildstream_pb2.GetReferenceResponse()
response.digest.CopyFrom(self._reference_cache.get_digest_reference(request.key))
return response
except NotFoundError:
context.set_code(grpc.StatusCode.NOT_FOUND)
def UpdateReference(self, request, context):
try:
for key in request.keys:
self._reference_cache.update_reference(key, request.digest)
return buildstream_pb2.UpdateReferenceResponse()
except NotImplementedError:
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
def Status(self, request, context):
return buildstream_pb2.StatusResponse(self._reference_cache.allow_updates)
from ..._exceptions import BgdError, ErrorDomain
class InvalidArgumentError(BgdError):
"""
A bad argument was passed, such as a name which doesn't exist
"""
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Action Cache
============
Implements an in-memory action Cache
"""
from ..cas.reference_cache import ReferenceCache
class ActionCache(ReferenceCache):
def get_action_result(self, action_digest):
key = self._get_key(action_digest)
return self.get_action_reference(key)
def update_action_result(self, action_digest, action_result):
key = self._get_key(action_digest)
self.update_reference(key, action_result)
def _get_key(self, action_digest):
return (action_digest.hash, action_digest.size_bytes)
......@@ -26,27 +26,28 @@ import logging
import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from .._exceptions import NotFoundError
class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
def __init__(self, action_cache, allow_updates=True):
def __init__(self, action_cache):
self._action_cache = action_cache
self._allow_updates = allow_updates
self.logger = logging.getLogger(__name__)
def GetActionResult(self, request, context):
result = self._action_cache.get_action_result(request.action_digest)
if result is None:
try:
return self._action_cache.get_action_result(request.action_digest)
except NotFoundError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return remote_execution_pb2.ActionResult()
return result
def UpdateActionResult(self, request, context):
if not self._allow_updates:
try:
self._action_cache.update_action_result(request.action_digest, request.action_result)
return request.action_result
except NotImplementedError:
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
return remote_execution_pb2.ActionResult()
self._action_cache.put_action_result(request.action_digest, request.action_result)
return request.action_result
......@@ -25,7 +25,7 @@ import logging
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
from ._exceptions import InvalidArgumentError
from .._exceptions import InvalidArgumentError
from ..job import Job
......
......@@ -30,7 +30,7 @@ import grpc
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from ._exceptions import InvalidArgumentError
from .._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
......
......@@ -27,7 +27,7 @@ import grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
from ._exceptions import InvalidArgumentError
from .._exceptions import InvalidArgumentError
class OperationsService(operations_pb2_grpc.OperationsServicer):
......
......@@ -25,6 +25,8 @@ from collections import deque
from google.protobuf import any_pb2
from buildgrid.server._exceptions import NotFoundError
from buildgrid._protos.google.longrunning import operations_pb2
from .job import ExecuteStage, LeaseState
......@@ -35,7 +37,7 @@ class Scheduler:
MAX_N_TRIES = 5
def __init__(self, action_cache=None):
self.action_cache = action_cache
self._action_cache = action_cache
self.jobs = {}
self.queue = deque()
......@@ -50,17 +52,23 @@ class Scheduler:
def append_job(self, job, skip_cache_lookup=False):
self.jobs[job.name] = job
if self.action_cache is not None and not skip_cache_lookup:
cached_result = self.action_cache.get_action_result(job.action_digest)
if cached_result is not None:
if self._action_cache is not None and not skip_cache_lookup:
try:
cached_result = self._action_cache.get_action_result(job.action_digest)
except NotFoundError:
self.queue.append(job)
job.update_execute_stage(ExecuteStage.QUEUED)
else:
cached_result_any = any_pb2.Any()
cached_result_any.Pack(cached_result)
job.result = cached_result_any
job.result_cached = True
job.update_execute_stage(ExecuteStage.COMPLETED)
return
self.queue.append(job)
job.update_execute_stage(ExecuteStage.QUEUED)
else:
self.queue.append(job)
job.update_execute_stage(ExecuteStage.QUEUED)
def retry_job(self, name):
if name in self.jobs:
......@@ -81,8 +89,8 @@ class Scheduler:
job.result = result
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job
if not job.do_not_cache and self.action_cache is not None:
self.action_cache.put_action_result(job.action_digest, result)
if not job.do_not_cache and self._action_cache is not None:
self._action_cache.put_action_result(job.action_digest, result)
def get_operations(self):
response = operations_pb2.ListOperationsResponse()
......
......@@ -25,7 +25,7 @@ Instance of the Remote Workers interface.
import logging
import uuid
from ._exceptions import InvalidArgumentError, OutofSyncError
from .._exceptions import InvalidArgumentError, OutofSyncError
from ..job import LeaseState
......
......@@ -27,7 +27,7 @@ import grpc
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
from ._exceptions import InvalidArgumentError, OutofSyncError
from .._exceptions import InvalidArgumentError, OutofSyncError
class BotsService(bots_pb2_grpc.BotsServicer):
......
......@@ -117,10 +117,3 @@ BuildGrid's Command Line Interface (CLI) reference documentation.
.. click:: app.commands.cmd_server:start
:prog: bgd server start
----
.. _invoking_bgd_server_stop:
.. click:: app.commands.cmd_server:stop
:prog: bgd server stop
......@@ -11,17 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# Carter Sande <csande@bloomberg.net>
# pylint: disable=redefined-outer-name
import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server import action_cache
from buildgrid.server.cas.storage import lru_memory_cache
from buildgrid.server.execution import action_cache
from buildgrid.server._exceptions import NotFoundError
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@pytest.fixture
......@@ -35,8 +34,9 @@ def test_null_action_cache(cas):
action_digest1 = remote_execution_pb2.Digest(hash='alpha', size_bytes=4)
dummy_result = remote_execution_pb2.ActionResult()
cache.put_action_result(action_digest1, dummy_result)
assert cache.get_action_result(action_digest1) is None
cache.update_action_result(action_digest1, dummy_result)
with pytest.raises(NotFoundError):
cache.get_action_result(action_digest1)
def test_action_cache_expiry(cas):
......@@ -47,16 +47,18 @@ def test_action_cache_expiry(cas):
action_digest3 = remote_execution_pb2.Digest(hash='charlie', size_bytes=4)
dummy_result = remote_execution_pb2.ActionResult()
cache.put_action_result(action_digest1, dummy_result)
cache.put_action_result(action_digest2, dummy_result)
cache.update_action_result(action_digest1, dummy_result)
cache.update_action_result(action_digest2, dummy_result)
# Get digest 1 (making 2 the least recently used)
assert cache.get_action_result(action_digest1) is not None
# Add digest 3 (so 2 gets removed from the cache)
cache.put_action_result(action_digest3, dummy_result)
cache.update_action_result(action_digest3, dummy_result)
assert cache.get_action_result(action_digest1) is not None
assert cache.get_action_result(action_digest2) is None
with pytest.raises(NotFoundError):
cache.get_action_result(action_digest2)
assert cache.get_action_result(action_digest3) is not None
......@@ -67,34 +69,35 @@ def test_action_cache_checks_cas(cas):
action_digest2 = remote_execution_pb2.Digest(hash='bravo', size_bytes=4)
action_digest3 = remote_execution_pb2.Digest(hash='charlie', size_bytes=4)
# Create a tree that references digests in CAS
# Create a tree that actions digests in CAS
sample_digest = cas.put_message(remote_execution_pb2.Command(arguments=["sample"]))
tree = remote_execution_pb2.Tree()
tree.root.files.add().digest.CopyFrom(sample_digest)
tree.children.add().files.add().digest.CopyFrom(sample_digest)
tree_digest = cas.put_message(tree)
# Add an ActionResult that references real digests to the cache
# Add an ActionResult that actions real digests to the cache
action_result1 = remote_execution_pb2.ActionResult()
action_result1.output_directories.add().tree_digest.CopyFrom(tree_digest)
action_result1.output_files.add().digest.CopyFrom(sample_digest)
action_result1.stdout_digest.CopyFrom(sample_digest)
action_result1.stderr_digest.CopyFrom(sample_digest)
cache.put_action_result(action_digest1, action_result1)
cache.update_action_result(action_digest1, action_result1)
# Add ActionResults that reference fake digests to the cache
# Add ActionResults that action fake digests to the cache
action_result2 = remote_execution_pb2.ActionResult()
action_result2.output_directories.add().tree_digest.hash = "nonexistent"
action_result2.output_directories[0].tree_digest.size_bytes = 8
cache.put_action_result(action_digest2, action_result2)
cache.update_action_result(action_digest2, action_result2)
action_result3 = remote_execution_pb2.ActionResult()
action_result3.stdout_digest.hash = "nonexistent"
action_result3.stdout_digest.size_bytes = 8
cache.put_action_result(action_digest3, action_result3)
cache.update_action_result(action_digest3, action_result3)
# Verify we can get the first ActionResult but not the others
fetched_result1 = cache.get_action_result(action_digest1)
assert fetched_result1.output_directories[0].tree_digest.hash == tree_digest.hash
assert cache.get_action_result(action_digest2) is None
assert cache.get_action_result(action_digest3) is None
with pytest.raises(NotFoundError):
cache.get_action_result(action_digest2)
cache.get_action_result(action_digest3)
......@@ -20,9 +20,10 @@
import tempfile
import boto3
from moto import mock_s3
import pytest
from moto import mock_s3
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
from buildgrid.server.cas.storage.disk import DiskStorage
......
......@@ -23,10 +23,10 @@ import grpc
from grpc._server import _Context
import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid.server import action_cache
from buildgrid.server.cas.storage import lru_memory_cache
from buildgrid.server.execution import action_cache_service
from buildgrid.server.execution import action_cache, action_cache_service
# Can mock this
......@@ -67,7 +67,8 @@ def test_simple_action_result(cache, context):
def test_disabled_update_action_result(cache, context):
service = action_cache_service.ActionCacheService(cache, False)
disabled_push = action_cache.ActionCache(cas, 50, False)
service = action_cache_service.ActionCacheService(disabled_push)
request = remote_execution_pb2.UpdateActionResultRequest()
service.UpdateActionResult(request, context)
......
......@@ -25,9 +25,10 @@ import pytest
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server import action_cache, scheduler, job
from buildgrid.server import scheduler, job
from buildgrid.server.cas.storage import lru_memory_cache
from buildgrid.server.execution import execution_instance, execution_service
from buildgrid.server.execution import action_cache, execution_instance, execution_service
@pytest.fixture
......
......@@ -27,8 +27,10 @@ from google.protobuf import any_pb2
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.google.longrunning import operations_pb2
from buildgrid.server import scheduler
from buildgrid.server.execution._exceptions import InvalidArgumentError
from buildgrid.server._exceptions import InvalidArgumentError
from buildgrid.server.execution import execution_instance, operations_service
......