Compare revisions

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (11)
@@ -460,6 +460,7 @@ known-third-party=boto3,
enchant,
google,
grpc,
janus,
moto,
yaml
@@ -523,4 +524,4 @@ valid-metaclass-classmethod-first-arg=mcs
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=Exception
overgeneral-exceptions=Exception
\ No newline at end of file
@@ -20,7 +20,6 @@ Server command
Create a BuildGrid server.
"""
import asyncio
import logging
import sys
@@ -52,18 +51,14 @@ def start(context, config):
        click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
        sys.exit(-1)

    loop = asyncio.get_event_loop()

    try:
        server.start()
        loop.run_forever()

    except KeyboardInterrupt:
        pass

    finally:
        context.logger.info("Stopping server")
        server.stop()
        loop.close()


def _create_server_from_config(config):
@@ -171,7 +171,7 @@ class Downloader:
        return messages

    def download_file(self, digest, file_path, queue=True):
    def download_file(self, digest, file_path, is_executable=False, queue=True):
        """Retrieves a file from the remote CAS server.

        If queuing is allowed (`queue=True`), the download request **may** be
@@ -181,6 +181,7 @@
        Args:
            digest (:obj:`Digest`): the file's digest to fetch.
            file_path (str): absolute or relative path to the local file to write.
            is_executable (bool): whether the file is executable or not.
            queue (bool, optional): whether or not the download request may be
                queued and submitted as part of a batch upload request. Defaults
                to True.
@@ -193,9 +194,9 @@
        file_path = os.path.abspath(file_path)

        if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
            self._fetch_file(digest, file_path)
            self._fetch_file(digest, file_path, is_executable=is_executable)
        else:
            self._queue_file(digest, file_path)
            self._queue_file(digest, file_path, is_executable=is_executable)

    def download_directory(self, digest, directory_path):
        """Retrieves a :obj:`Directory` from the remote CAS server.
@@ -311,7 +312,7 @@ class Downloader:
        return read_blobs

    def _fetch_file(self, digest, file_path):
    def _fetch_file(self, digest, file_path, is_executable=False):
        """Fetches a file using ByteStream.Read()"""
        if self.instance_name:
            resource_name = '/'.join([self.instance_name, 'blobs',
@@ -332,7 +333,10 @@
            assert byte_file.tell() == digest.size_bytes

    def _queue_file(self, digest, file_path):
        if is_executable:
            os.chmod(file_path, 0o755)  # rwxr-xr-x / 755

    def _queue_file(self, digest, file_path, is_executable=False):
        """Queues a file for later batch download"""
        if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
            self.flush()
@@ -341,22 +345,25 @@
        elif self.__file_request_count >= MAX_REQUEST_COUNT:
            self.flush()

        self.__file_requests[digest.hash] = (digest, file_path)
        self.__file_requests[digest.hash] = (digest, file_path, is_executable)
        self.__file_request_count += 1
        self.__file_request_size += digest.ByteSize()
        self.__file_response_size += digest.size_bytes

    def _fetch_file_batch(self, batch):
        """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
        batch_digests = [digest for digest, _ in batch.values()]
        batch_digests = [digest for digest, _, _ in batch.values()]
        batch_blobs = self._fetch_blob_batch(batch_digests)

        for (_, file_path), file_blob in zip(batch.values(), batch_blobs):
        for (_, file_path, is_executable), file_blob in zip(batch.values(), batch_blobs):
            os.makedirs(os.path.dirname(file_path), exist_ok=True)

            with open(file_path, 'wb') as byte_file:
                byte_file.write(file_blob)

            if is_executable:
                os.chmod(file_path, 0o755)  # rwxr-xr-x / 755

    def _fetch_directory(self, digest, directory_path):
"""Fetches a file using ByteStream.GetTree()"""
# Better fail early if the local root path cannot be created:
@@ -414,7 +421,7 @@ class Downloader:
        for file_node in root_directory.files:
            file_path = os.path.join(root_path, file_node.name)

            self._queue_file(file_node.digest, file_path)
            self._queue_file(file_node.digest, file_path, is_executable=file_node.is_executable)

        for directory_node in root_directory.directories:
            directory_path = os.path.join(root_path, directory_node.name)
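The hunks above thread a new is_executable flag through every download path, so executable permission bits survive a CAS fetch. Below is a minimal client-side sketch of how the extended API would be used; the import path, constructor arguments and digest values are assumptions for illustration — only the download_file() and flush() signatures come from the diff itself:

import grpc

from buildgrid.client.cas import Downloader  # assumed import path
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

channel = grpc.insecure_channel('localhost:50051')
downloader = Downloader(channel, instance='main')  # constructor signature assumed

# Small blobs may be queued and fetched later in one BatchReadBlobs() call;
# with is_executable=True the local copy gets chmod'ed to 0o755 on write:
digest = Digest(hash='0' * 64, size_bytes=142)  # placeholder digest
downloader.download_file(digest, '/tmp/tool', is_executable=True)
downloader.flush()  # forces out any still-queued requests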
@@ -37,6 +37,10 @@ class BotsInterface:
        self._bot_sessions = {}
        self._scheduler = scheduler

    @property
    def scheduler(self):
        return self._scheduler

    def register_instance_with_server(self, instance_name, server):
        server.add_bots_interface(self, instance_name)
@@ -23,8 +23,9 @@ import logging
import grpc
from google.protobuf.empty_pb2 import Empty
from google.protobuf import empty_pb2, timestamp_pb2
from buildgrid._enums import BotStatus
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
@@ -32,24 +33,82 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
class BotsService(bots_pb2_grpc.BotsServicer):

    def __init__(self, server):
    def __init__(self, server, monitor=True):
        self.__logger = logging.getLogger(__name__)

        self.__bots_by_status = {}
        self.__bots_by_instance = {}
        self.__bots = {}

        self._instances = {}
        self._is_monitored = True

        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

    def add_instance(self, name, instance):
        self._instances[name] = instance

        if self._is_monitored:
            self.__bots_by_status[BotStatus.OK] = set()
            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (BotsInterface): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_monitored:
            self.__bots_by_instance[instance_name] = 0

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        instance = self._get_instance(instance_name)

        return instance.scheduler

    # --- Public API: Servicer ---

    def CreateBotSession(self, request, context):
        """Handles CreateBotSessionRequest messages.

        Args:
            request (CreateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.debug("CreateBotSession request from [%s]", context.peer())

        instance_name = request.parent
        bot_status = BotStatus(request.bot_session.status)
        bot_id = request.bot_session.bot_id

        try:
            parent = request.parent
            instance = self._get_instance(request.parent)
            return instance.create_bot_session(parent,
                                               request.bot_session)
            instance = self._get_instance(instance_name)

            bot_session = instance.create_bot_session(instance_name,
                                                      request.bot_session)

            now = timestamp_pb2.Timestamp()
            now.GetCurrentTime()

            if self._is_monitored:
                self.__bots[bot_id] = now
                self.__bots_by_instance[instance_name] += 1
                self.__bots_by_status[bot_status].add(bot_id)

            return bot_session

        except InvalidArgumentError as e:
            self.__logger.error(e)
@@ -59,17 +118,36 @@ class BotsService(bots_pb2_grpc.BotsServicer):
            return bots_pb2.BotSession()

    def UpdateBotSession(self, request, context):
        """Handles UpdateBotSessionRequest messages.

        Args:
            request (UpdateBotSessionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.debug("UpdateBotSession request from [%s]", context.peer())

        names = request.name.split("/")
        bot_status = BotStatus(request.bot_session.status)
        bot_id = request.bot_session.bot_id

        try:
            names = request.name.split("/")
            # Operation name should be in format:
            # {instance/name}/{uuid}
            instance_name = ''.join(names[0:-1])
            instance_name = '/'.join(names[:-1])

            instance = self._get_instance(instance_name)
            return instance.update_bot_session(request.name,
                                               request.bot_session)
            bot_session = instance.update_bot_session(request.name,
                                                      request.bot_session)

            if self._is_monitored:
                self.__bots[bot_id].GetCurrentTime()
                if bot_id not in self.__bots_by_status[bot_status]:
                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)

                    self.__bots_by_status[bot_status].add(bot_id)

            return bot_session

        except InvalidArgumentError as e:
            self.__logger.error(e)
@@ -89,10 +167,40 @@ class BotsService(bots_pb2_grpc.BotsServicer):
            return bots_pb2.BotSession()

    def PostBotEventTemp(self, request, context):
        """Handles PostBotEventTempRequest messages.

        Args:
            request (PostBotEventTempRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)

        return Empty()
        return empty_pb2.Empty()

    # --- Public API: Monitoring ---

    @property
    def is_monitored(self):
        return self._is_monitored

    def query_n_bots(self):
        return len(self.__bots)

    def query_n_bots_for_instance(self, instance_name):
        try:
            return self.__bots_by_instance[instance_name]
        except KeyError:
            return 0

    def query_n_bots_for_status(self, bot_status):
        try:
            return len(self.__bots_by_status[bot_status])
        except KeyError:
            return 0

    # --- Private API ---

    def _get_instance(self, name):
        try:
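A self-contained sketch of the status bookkeeping introduced above: bots are bucketed into one set per BotStatus value, and an update discards the bot from every bucket before re-adding it, so each bot lives in exactly one set at a time. The reduced enum below is a stand-in for buildgrid._enums.BotStatus:

from enum import Enum

class BotStatus(Enum):  # reduced stand-in for buildgrid._enums.BotStatus
    OK = 1
    UNHEALTHY = 2

bots_by_status = {status: set() for status in BotStatus}

def track_bot(bot_id, status):
    for bucket in bots_by_status.values():
        bucket.discard(bot_id)  # discard() is a no-op when absent
    bots_by_status[status].add(bot_id)

track_bot('bot-1', BotStatus.OK)
track_bot('bot-1', BotStatus.UNHEALTHY)
assert len(bots_by_status[BotStatus.OK]) == 0
assert len(bots_by_status[BotStatus.UNHEALTHY]) == 1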
@@ -35,6 +35,10 @@ class ExecutionInstance:
        self._storage = storage
        self._scheduler = scheduler

    @property
    def scheduler(self):
        return self._scheduler

    def register_instance_with_server(self, instance_name, server):
        server.add_execution_instance(self, instance_name)
@@ -33,30 +33,79 @@ from buildgrid._protos.google.longrunning import operations_pb2
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

    def __init__(self, server):
    def __init__(self, server, monitor=True):
        self.__logger = logging.getLogger(__name__)

        self.__peers_by_instance = {}
        self.__peers = {}

        self._instances = {}
        self._is_monitored = True

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

    def add_instance(self, name, instance):
        self._instances[name] = instance

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        if self._is_monitored:
            self.__peers_by_instance[instance_name] = set()

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        instance = self._get_instance(instance_name)

        return instance.scheduler

    # --- Public API: Servicer ---

    def Execute(self, request, context):
        """Handles ExecuteRequest messages.

        Args:
            request (ExecuteRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.debug("Execute request from [%s]", context.peer())

        instance_name = request.instance_name
        message_queue = queue.Queue()
        peer = context.peer()

        try:
            message_queue = queue.Queue()
            instance = self._get_instance(request.instance_name)
            instance = self._get_instance(instance_name)

            operation = instance.execute(request.action_digest,
                                         request.skip_cache_lookup,
                                         message_queue)

            context.add_callback(partial(instance.unregister_message_client,
                                         operation.name, message_queue))
            context.add_callback(partial(self._rpc_termination_callback,
                                         peer, instance_name, operation.name, message_queue))

            instanced_op_name = "{}/{}".format(request.instance_name,
                                               operation.name)
            if self._is_monitored:
                if peer not in self.__peers:
                    self.__peers_by_instance[instance_name].add(peer)
                    self.__peers[peer] = 1
                else:
                    self.__peers[peer] += 1

            instanced_op_name = "{}/{}".format(instance_name, operation.name)

            self.__logger.info("Operation name: [%s]", instanced_op_name)
@@ -80,23 +129,37 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
            yield operations_pb2.Operation()

    def WaitExecution(self, request, context):
        """Handles WaitExecutionRequest messages.

        Args:
            request (WaitExecutionRequest): The incoming RPC request.
            context (grpc.ServicerContext): Context for the RPC call.
        """
        self.__logger.debug("WaitExecution request from [%s]", context.peer())

        try:
            names = request.name.split("/")
        names = request.name.split('/')
        instance_name = '/'.join(names[:-1])
        operation_name = names[-1]
        message_queue = queue.Queue()
        peer = context.peer()

            # Operation name should be in format:
            # {instance/name}/{operation_id}
            instance_name = ''.join(names[0:-1])

        try:
            if instance_name != request.instance_name:
                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
                                           .format(request.name, instance_name))

            message_queue = queue.Queue()
            operation_name = names[-1]

            instance = self._get_instance(instance_name)
            instance.register_message_client(operation_name, message_queue)

            context.add_callback(partial(self._rpc_termination_callback,
                                         peer, instance_name, operation_name, message_queue))
            context.add_callback(partial(instance.unregister_message_client,
                                         operation_name, message_queue))

            if self._is_monitored:
                if peer not in self.__peers:
                    self.__peers_by_instance[instance_name].add(peer)
                    self.__peers[peer] = 1
                else:
                    self.__peers[peer] += 1

            for operation in instance.stream_operation_updates(message_queue,
                                                               operation_name):
@@ -111,6 +174,35 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

    # --- Public API: Monitoring ---

    @property
    def is_monitored(self):
        return self._is_monitored

    def query_n_clients(self):
        return len(self.__peers)

    def query_n_clients_for_instance(self, instance_name):
        try:
            return len(self.__peers_by_instance[instance_name])
        except KeyError:
            return 0

    # --- Private API ---

    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
        instance = self._get_instance(instance_name)

        instance.unregister_message_client(job_name, message_queue)

        if self._is_monitored:
            if self.__peers[peer] > 1:
                self.__peers[peer] -= 1
            else:
                self.__peers_by_instance[instance_name].remove(peer)
                del self.__peers[peer]

    def _get_instance(self, name):
        try:
            return self._instances[name]
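Both services above fix the same bug: instance names may themselves contain '/', so gluing the leading segments back together with ''.join() silently corrupts them. A quick illustration of why '/'.join(names[:-1]) is the correct reconstruction:

operation_name = 'remote-execution/dev/1234-abcd'  # hypothetical {instance/name}/{operation_id}

names = operation_name.split('/')

broken = ''.join(names[0:-1])  # 'remote-executiondev' -- separator lost
fixed = '/'.join(names[:-1])   # 'remote-execution/dev'
operation_id = names[-1]       # '1234-abcd'

assert fixed == 'remote-execution/dev'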
@@ -13,18 +13,21 @@
# limitations under the License.
import asyncio
from concurrent import futures
import logging
import os
import time
import grpc
from .cas.service import ByteStreamService, ContentAddressableStorageService
from .actioncache.service import ActionCacheService
from .execution.service import ExecutionService
from .operations.service import OperationsService
from .bots.service import BotsService
from .referencestorage.service import ReferenceStorageService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.execution.service import ExecutionService
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
from buildgrid.settings import MONITORING_PERIOD
class BuildGridServer:
@@ -46,9 +49,11 @@ class BuildGridServer:
        # Use max_workers default from Python 3.5+
        max_workers = (os.cpu_count() or 1) * 5

        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
        self.__grpc_server = grpc.server(self.__grpc_executor)

        self._server = server
        self.__main_loop = asyncio.get_event_loop()
        self.__monitoring_task = None

        self._execution_service = None
        self._bots_service = None
@@ -58,15 +63,32 @@ class BuildGridServer:
        self._cas_service = None
        self._bytestream_service = None

        self._instances = set()

    # --- Public API ---

    def start(self):
        """Starts the server.
        """Starts the BuildGrid server.
        """
        self._server.start()
        self.__grpc_server.start()

        self.__monitoring_task = asyncio.ensure_future(
            self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)

        self.__main_loop.run_forever()

    def stop(self, grace=0):
        """Stops the server.
        """Stops the BuildGrid server.

        Args:
            grace (int, optional): A duration of time in seconds. Defaults to 0.
        """
        self._server.stop(grace)
        if self.__monitoring_task is not None:
            self.__monitoring_task.cancel()

        self.__grpc_server.stop(grace)

        if grace > 0:
            time.sleep(grace)

    def add_port(self, address, credentials):
        """Adds a port to the server.
@@ -80,11 +102,11 @@
        """
        if credentials is not None:
            self.__logger.info("Adding secure connection on: [%s]", address)
            self._server.add_secure_port(address, credentials)
            self.__grpc_server.add_secure_port(address, credentials)

        else:
            self.__logger.info("Adding insecure connection on [%s]", address)
            self._server.add_insecure_port(address)
            self.__grpc_server.add_insecure_port(address)

    def add_execution_instance(self, instance, instance_name):
        """Adds an :obj:`ExecutionInstance` to the service.
@@ -96,10 +118,11 @@
            instance_name (str): Instance name.
        """
        if self._execution_service is None:
            self._execution_service = ExecutionService(self._server)
            self._execution_service = ExecutionService(self.__grpc_server)

        self._execution_service.add_instance(instance_name, instance)
        self._instances.add(instance_name)

    def add_bots_interface(self, instance, instance_name):
        """Adds a :obj:`BotsInterface` to the service.
@@ -110,10 +133,11 @@
            instance_name (str): Instance name.
        """
        if self._bots_service is None:
            self._bots_service = BotsService(self._server)
            self._bots_service = BotsService(self.__grpc_server)

        self._bots_service.add_instance(instance_name, instance)
        self._instances.add(instance_name)

    def add_operations_instance(self, instance, instance_name):
        """Adds an :obj:`OperationsInstance` to the service.
@@ -124,8 +148,7 @@
            instance_name (str): Instance name.
        """
        if self._operations_service is None:
            self._operations_service = OperationsService(self._server)
            self._operations_service = OperationsService(self.__grpc_server)

        self._operations_service.add_instance(instance_name, instance)

    def add_reference_storage_instance(self, instance, instance_name):
@@ -138,8 +161,7 @@
            instance_name (str): Instance name.
        """
        if self._reference_storage_service is None:
            self._reference_storage_service = ReferenceStorageService(self._server)
            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)

        self._reference_storage_service.add_instance(instance_name, instance)

    def add_action_cache_instance(self, instance, instance_name):
@@ -152,8 +174,7 @@
            instance_name (str): Instance name.
        """
        if self._action_cache_service is None:
            self._action_cache_service = ActionCacheService(self._server)
            self._action_cache_service = ActionCacheService(self.__grpc_server)

        self._action_cache_service.add_instance(instance_name, instance)

    def add_cas_instance(self, instance, instance_name):
@@ -166,8 +187,7 @@
            instance_name (str): Instance name.
        """
        if self._cas_service is None:
            self._cas_service = ContentAddressableStorageService(self._server)
            self._cas_service = ContentAddressableStorageService(self.__grpc_server)

        self._cas_service.add_instance(instance_name, instance)

    def add_bytestream_instance(self, instance, instance_name):
@@ -180,6 +200,33 @@
            instance_name (str): Instance name.
        """
        if self._bytestream_service is None:
            self._bytestream_service = ByteStreamService(self._server)
            self._bytestream_service = ByteStreamService(self.__grpc_server)

        self._bytestream_service.add_instance(instance_name, instance)

    # --- Private API ---

    async def _monitoring_worker(self, period=1):
        while True:
            try:
                n_clients = self._execution_service.query_n_clients()
                n_bots = self._bots_service.query_n_bots()

                print('---')
                print('Totals: n_clients={}, n_bots={}'.format(n_clients, n_bots))
                print('Per instances:')

                for instance_name in self._instances:
                    n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
                    n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
                    am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()

                    instance_name = instance_name or 'empty'
                    print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
                          .format(instance_name, n_clients, n_bots, am_queue_time))

                await asyncio.sleep(period)

            except asyncio.CancelledError:
                break

        self.__main_loop.stop()
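The _monitoring_worker() coroutine above follows the usual asyncio periodic-task pattern: loop on asyncio.sleep() and exit when stop() cancels the task. A standalone sketch of that lifecycle using the Python 3.5 idioms of the surrounding code (the print is a stand-in for the query_* calls):

import asyncio

async def monitoring_worker(period=0.5):
    while True:
        try:
            print('collecting metrics...')  # stand-in for the metric queries
            await asyncio.sleep(period)
        except asyncio.CancelledError:
            break

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(monitoring_worker(), loop=loop)
loop.call_later(2.0, task.cancel)  # plays the role of BuildGridServer.stop()
loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
loop.close()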
@@ -13,10 +13,11 @@
# limitations under the License.
from datetime import datetime
import logging
import uuid
from google.protobuf import timestamp_pb2
from google.protobuf import duration_pb2, timestamp_pb2
from buildgrid._enums import LeaseState, OperationStage
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
@@ -37,6 +38,7 @@ class Job:
        self.__execute_response = None
        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()

        self.__queued_timestamp = timestamp_pb2.Timestamp()
        self.__queued_time_duration = duration_pb2.Duration()
        self.__worker_start_timestamp = timestamp_pb2.Timestamp()
        self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
@@ -50,6 +52,8 @@
        self._operation.done = False
        self._n_tries = 0

    # --- Public API ---

    @property
    def name(self):
        return self._name
@@ -179,7 +183,7 @@
            result.Unpack(action_result)

        action_metadata = action_result.execution_metadata
        action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
        action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
        action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
        action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
@@ -204,6 +208,10 @@
            self.__queued_timestamp.GetCurrentTime()
            self._n_tries += 1

        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.utcnow()
            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)

        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
            if self.__execute_response is not None:
                self._operation.response.Pack(self.__execute_response)
@@ -213,3 +221,11 @@
        for queue in self._operation_update_queues:
            queue.put(self._operation)

    # --- Public API: Monitoring ---

    def query_queue_time(self):
        return self.__queued_time_duration.ToTimedelta()

    def query_n_retries(self):
        return self._n_tries - 1 if self._n_tries > 0 else 0
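The queue-time measurement above leans on two protobuf well-known types: the queued Timestamp is diffed against the current wall clock and the result stored as a Duration. A minimal round-trip sketch; note that Timestamp.ToDatetime() yields a naive UTC datetime, which is why utcnow() is the matching counterpart:

from datetime import datetime
import time

from google.protobuf import duration_pb2, timestamp_pb2

queued_timestamp = timestamp_pb2.Timestamp()
queued_timestamp.GetCurrentTime()  # stamped when the job enters the queue

time.sleep(0.1)                    # stand-in for time actually spent queued

queued_time_duration = duration_pb2.Duration()
queued_time_duration.FromTimedelta(datetime.utcnow() - queued_timestamp.ToDatetime())

print(queued_time_duration.ToTimedelta().total_seconds())  # ~0.1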
@@ -32,6 +32,10 @@ class OperationsInstance:
        self._scheduler = scheduler

    @property
    def scheduler(self):
        return self._scheduler

    def register_instance_with_server(self, instance_name, server):
        server.add_operations_instance(self, instance_name)
@@ -38,8 +38,34 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)

    def add_instance(self, name, instance):
        self._instances[name] = instance

    # --- Public API ---

    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (OperationsInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

    def get_scheduler(self, instance_name):
        """Retrieves a reference to the scheduler for an instance.

        Args:
            instance_name (str): The name of the instance to query.

        Returns:
            Scheduler: A reference to the scheduler for `instance_name`.

        Raises:
            InvalidArgumentError: If no instance named `instance_name` exists.
        """
        instance = self._get_instance(instance_name)

        return instance.scheduler

    # --- Public API: Servicer ---

    def GetOperation(self, request, context):
        self.__logger.debug("GetOperation request from [%s]", context.peer())
@@ -132,6 +158,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
        return Empty()

    # --- Private API ---

    def _parse_instance_name(self, name):
        """ If the instance name is not blank, 'name' will have the form
        {instance_name}/{operation_uuid}. Otherwise, it will just be
@@ -22,22 +22,29 @@ Schedules jobs.
from collections import deque
import logging

from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError

from .job import OperationStage, LeaseState


class Scheduler:

    MAX_N_TRIES = 5

    def __init__(self, action_cache=None):
    def __init__(self, action_cache=None, monitor=True):
        self.__logger = logging.getLogger(__name__)

        self.__queue_times_by_priority = {}
        self.__queue_time_average = 0, 0.0
        self.__retries_by_error = {}
        self.__retries_count = 0

        self._action_cache = action_cache
        self._is_monitored = True

        self.jobs = {}
        self.queue = deque()

    # --- Public API ---

    def register_client(self, job_name, queue):
        self.jobs[job_name].register_client(queue)
@@ -66,18 +73,22 @@ class Scheduler:
            operation_stage = OperationStage.QUEUED
            self.queue.append(job)

        job.update_operation_stage(operation_stage)
        self._update_job_operation_stage(job, operation_stage)

    def retry_job(self, job_name):
        if job_name in self.jobs:
            job = self.jobs[job_name]
            if job.n_tries >= self.MAX_N_TRIES:
                # TODO: Decide what to do with these jobs
                job.update_operation_stage(OperationStage.COMPLETED)
                # TODO: Mark these jobs as done
            else:
                job.update_operation_stage(OperationStage.QUEUED)
                self.queue.appendleft(job)

            job = self.jobs[job_name]

            operation_stage = None
            if job.n_tries >= self.MAX_N_TRIES:
                # TODO: Decide what to do with these jobs
                operation_stage = OperationStage.COMPLETED
                # TODO: Mark these jobs as done
            else:
                operation_stage = OperationStage.QUEUED
                self.queue.appendleft(job)

            self._update_job_operation_stage(job, operation_stage)

    def list_jobs(self):
        return self.jobs.values()
@@ -112,13 +123,14 @@
        """
        job = self.jobs[job_name]

        operation_stage = None
        if lease_state == LeaseState.PENDING:
            job.update_lease_state(LeaseState.PENDING)
            job.update_operation_stage(OperationStage.QUEUED)
            operation_stage = OperationStage.QUEUED

        elif lease_state == LeaseState.ACTIVE:
            job.update_lease_state(LeaseState.ACTIVE)
            job.update_operation_stage(OperationStage.EXECUTING)
            operation_stage = OperationStage.EXECUTING

        elif lease_state == LeaseState.COMPLETED:
            job.update_lease_state(LeaseState.COMPLETED,
@@ -127,7 +139,9 @@
            if self._action_cache is not None and not job.do_not_cache:
                self._action_cache.update_action_result(job.action_digest, job.action_result)

            job.update_operation_stage(OperationStage.COMPLETED)
            operation_stage = OperationStage.COMPLETED

        self._update_job_operation_stage(job, operation_stage)

    def get_job_lease(self, job_name):
        """Returns the lease associated to job, if any have been emitted yet."""
@@ -136,3 +150,59 @@
    def get_job_operation(self, job_name):
        """Returns the operation associated to job."""
        return self.jobs[job_name].operation

    # --- Public API: Monitoring ---

    @property
    def is_monitored(self):
        return self._is_monitored

    def query_n_jobs(self):
        return len(self.jobs)

    def query_n_operations(self):
        return len(self.jobs)

    def query_n_operations_by_stage(self):
        return len(self.jobs)

    def query_n_leases(self):
        return len(self.jobs)

    def query_n_leases_by_state(self):
        return len(self.jobs)

    def query_n_retries(self):
        return self.__retries_count

    def query_n_retries_for_error(self, error_type):
        try:
            return self.__retries_by_error[error_type]
        except KeyError:
            return 0

    def query_am_queue_time(self):
        return self.__queue_time_average[1]

    def query_am_queue_time_for_priority(self, priority_level):
        try:
            return self.__queue_times_by_priority[priority_level]
        except KeyError:
            return 0

    # --- Private API ---

    def _update_job_operation_stage(self, job, stage):
        job.update_operation_stage(stage)

        if self._is_monitored and stage == OperationStage.COMPLETED:
            average_order, average_time = self.__queue_time_average

            queue_time = job.query_queue_time().total_seconds()

            average_order += 1
            if average_order > 1:
                average_time = average_time + ((queue_time - average_time) / average_order)
            else:
                average_time = queue_time
            self.__queue_time_average = average_order, average_time
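_update_job_operation_stage() keeps the average queue time as a running mean, so no per-job history needs storing: with n samples, A_n = A_(n-1) + (x_n - A_(n-1)) / n. A standalone sketch of that update with a worked example:

def update_average(order, average, sample):
    order += 1
    if order > 1:
        average = average + ((sample - average) / order)
    else:
        average = sample
    return order, average

order, average = 0, 0.0
for queue_time in (1.0, 3.0, 8.0):
    order, average = update_average(order, average, queue_time)

assert (order, average) == (3, 4.0)  # (1 + 3 + 8) / 3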
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
# The hash function that CAS uses
# Hash function used for computing digests:
HASH = hashlib.sha256
# Length, in characters, of a hexadecimal hash string returned by HASH:
HASH_LENGTH = HASH().digest_size * 2
# Period, in seconds, for the monitoring cycle:
MONITORING_PERIOD = 5.0
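A quick sanity check of the new constants: a SHA-256 digest is 32 bytes, and hex encoding doubles that, so HASH_LENGTH is the character length of digest strings such as those CAS embeds in resource names:

import hashlib

HASH = hashlib.sha256
HASH_LENGTH = HASH().digest_size * 2

assert HASH().digest_size == 32
assert HASH_LENGTH == 64
assert len(HASH(b'example blob').hexdigest()) == HASH_LENGTH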
@@ -112,13 +112,15 @@ setup(
    license="Apache License, Version 2.0",
    description="A remote execution service",
    packages=find_packages(),
    python_requires='>= 3.5.3',  # janus requirement
    install_requires=[
        'protobuf',
        'grpcio',
        'Click',
        'PyYAML',
        'boto3 < 1.8.0',
        'botocore < 1.11.0',
        'click',
        'grpcio',
        'janus',
        'protobuf',
        'pyyaml',
    ],
    entry_points={
        'console_scripts': [
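The new janus dependency (and the python_requires bump it forces) provides a queue with both a synchronous and an asynchronous endpoint, the usual way to hand messages from gRPC's worker threads to an asyncio loop. A minimal sketch, assuming the janus 0.x API of this era (Queue still accepted a loop argument):

import asyncio

import janus

async def consume(async_q):
    print(await async_q.get())  # 'hello from a worker thread'

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)                   # loop kwarg: janus 0.x API
queue.sync_q.put('hello from a worker thread')   # thread-safe, blocking side
loop.run_until_complete(consume(queue.async_q))  # asyncio side
loop.close()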
#!/bin/bash
echo "Hello, World!"