Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Source

Select target project
No results found

Target

Select target project
  • edbaunton/buildgrid
  • BuildGrid/buildgrid
  • bloomberg/buildgrid
  • devcurmudgeon/buildgrid
  • mhadjimichael/buildgrid
  • jmacarthur/buildgrid
  • rkothur/buildgrid
  • valentindavid/buildgrid
  • jjardon/buildgrid
  • RichKen/buildgrid
  • jbonney/buildgrid
  • onsha_alexander/buildgrid
  • santigl/buildgrid
  • mostynb/buildgrid
  • hoffbrinkle/buildgrid
  • Malinskiy/buildgrid
  • coldtom/buildgrid
  • azeemb_a/buildgrid
  • pointswaves/buildgrid
  • BenjaminSchubert/buildgrid
  • michaellee8/buildgrid
  • anil-anil/buildgrid
  • seanborg/buildgrid
  • jdelong12/buildgrid
  • jclay/buildgrid
  • bweston92/buildgrid
  • zchen723/buildgrid
  • cpratt34/buildgrid
  • armbiant/apache-buildgrid
  • armbiant/android-buildgrid
  • itsme300/buildgrid
  • sbairoliya/buildgrid
32 results
Show changes
Commits on Source (9)
@@ -460,6 +460,7 @@ known-third-party=boto3,
                  enchant,
                  google,
                  grpc,
+                 janus,
                  moto,
                  yaml
@@ -523,4 +524,4 @@ valid-metaclass-classmethod-first-arg=mcs

# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
-overgeneral-exceptions=Exception
\ No newline at end of file
+overgeneral-exceptions=Exception
@@ -37,6 +37,10 @@ class BotsInterface:
        self._bot_sessions = {}
        self._scheduler = scheduler

+    @property
+    def scheduler(self):
+        return self._scheduler
+
    def register_instance_with_server(self, instance_name, server):
        server.add_bots_interface(self, instance_name)
...
@@ -23,8 +23,9 @@ import logging

import grpc

-from google.protobuf.empty_pb2 import Empty
+from google.protobuf import empty_pb2, timestamp_pb2

+from buildgrid._enums import BotStatus
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc

@@ -32,24 +33,65 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
class BotsService(bots_pb2_grpc.BotsServicer):

-    def __init__(self, server):
+    def __init__(self, server, monitor=True):
        self.__logger = logging.getLogger(__name__)

+        self.__bots_by_status = None
+        self.__bots_by_instance = None
+        self.__bots = None
+
        self._instances = {}

        bots_pb2_grpc.add_BotsServicer_to_server(self, server)

-    def add_instance(self, name, instance):
-        self._instances[name] = instance
+        self._is_instrumented = monitor
+
+        if self._is_instrumented:
+            self.__bots_by_status = {}
+            self.__bots_by_instance = {}
+            self.__bots = {}
+
+            self.__bots_by_status[BotStatus.OK] = set()
+            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
+            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
+            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
+
+    # --- Public API ---
+
+    def add_instance(self, instance_name, instance):
+        self._instances[instance_name] = instance
+
+        if self._is_instrumented:
+            self.__bots_by_instance[instance_name] = 0
+
+    # --- Public API: Servicer ---

    def CreateBotSession(self, request, context):
+        """Handles CreateBotSessionRequest messages.
+
+        Args:
+            request (CreateBotSessionRequest): The incoming RPC request.
+            context (grpc.ServicerContext): Context for the RPC call.
+        """
        self.__logger.debug("CreateBotSession request from [%s]", context.peer())

+        instance_name = request.parent
+        bot_status = BotStatus(request.bot_session.status)
+        bot_id = request.bot_session.bot_id
+
        try:
-            parent = request.parent
-            instance = self._get_instance(request.parent)
-            return instance.create_bot_session(parent,
-                                               request.bot_session)
+            instance = self._get_instance(instance_name)
+            bot_session = instance.create_bot_session(instance_name,
+                                                      request.bot_session)
+
+            now = timestamp_pb2.Timestamp()
+            now.GetCurrentTime()
+
+            if self._is_instrumented:
+                self.__bots[bot_id] = now
+                self.__bots_by_instance[instance_name] += 1
+                self.__bots_by_status[bot_status].add(bot_id)
+
+            return bot_session

        except InvalidArgumentError as e:
            self.__logger.error(e)

@@ -59,17 +101,36 @@ class BotsService(bots_pb2_grpc.BotsServicer):
            return bots_pb2.BotSession()

    def UpdateBotSession(self, request, context):
+        """Handles UpdateBotSessionRequest messages.
+
+        Args:
+            request (UpdateBotSessionRequest): The incoming RPC request.
+            context (grpc.ServicerContext): Context for the RPC call.
+        """
        self.__logger.debug("UpdateBotSession request from [%s]", context.peer())

+        names = request.name.split("/")
+        bot_status = BotStatus(request.bot_session.status)
+        bot_id = request.bot_session.bot_id
+
        try:
-            names = request.name.split("/")
-
-            # Operation name should be in format:
-            # {instance/name}/{uuid}
-            instance_name = ''.join(names[0:-1])
+            instance_name = '/'.join(names[:-1])

            instance = self._get_instance(instance_name)
-            return instance.update_bot_session(request.name,
-                                               request.bot_session)
+            bot_session = instance.update_bot_session(request.name,
+                                                      request.bot_session)
+
+            if self._is_instrumented:
+                self.__bots[bot_id].GetCurrentTime()
+
+                if bot_id not in self.__bots_by_status[bot_status]:
+                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
+                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
+                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
+                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
+
+                    self.__bots_by_status[bot_status].add(bot_id)
+
+            return bot_session

        except InvalidArgumentError as e:
            self.__logger.error(e)

@@ -89,10 +150,46 @@ class BotsService(bots_pb2_grpc.BotsServicer):
            return bots_pb2.BotSession()

    def PostBotEventTemp(self, request, context):
+        """Handles PostBotEventTempRequest messages.
+
+        Args:
+            request (PostBotEventTempRequest): The incoming RPC request.
+            context (grpc.ServicerContext): Context for the RPC call.
+        """
        self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        return Empty()
+
+        return empty_pb2.Empty()
+
+    # --- Public API: Monitoring ---
+
+    @property
+    def is_instrumented(self):
+        return self._is_instrumented
+
+    def query_n_bots(self):
+        if self.__bots is not None:
+            return len(self.__bots)
+        return 0
+
+    def query_n_bots_for_instance(self, instance_name):
+        try:
+            if self.__bots_by_instance is not None:
+                return self.__bots_by_instance[instance_name]
+        except KeyError:
+            pass
+        return 0
+
+    def query_n_bots_for_status(self, bot_status):
+        try:
+            if self.__bots_by_status is not None:
+                return len(self.__bots_by_status[bot_status])
+        except KeyError:
+            pass
+        return 0
+
+    # --- Private API ---

    def _get_instance(self, name):
        try:
...
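The instrumentation added to BotsService above keeps every bot id in exactly one status bucket: on each update the id is discarded from all sets and re-filed under its current status. The following standalone sketch illustrates that bookkeeping pattern; all names here are hypothetical and not BuildGrid API.

from enum import Enum


class Status(Enum):
    OK = 1
    UNHEALTHY = 2
    HOST_REBOOTING = 3
    BOT_TERMINATING = 4


class BotTracker:
    """Keeps each bot id in exactly one status bucket (illustrative only)."""

    def __init__(self):
        self._bots_by_status = {status: set() for status in Status}

    def track(self, bot_id, status):
        # Drop the bot from every bucket, then file it under its new status:
        for bucket in self._bots_by_status.values():
            bucket.discard(bot_id)
        self._bots_by_status[status].add(bot_id)

    def count(self, status):
        return len(self._bots_by_status[status])


tracker = BotTracker()
tracker.track('bot-1', Status.OK)
tracker.track('bot-1', Status.UNHEALTHY)
assert tracker.count(Status.OK) == 0
assert tracker.count(Status.UNHEALTHY) == 1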
@@ -35,6 +35,10 @@ class ExecutionInstance:
        self._storage = storage
        self._scheduler = scheduler

+    @property
+    def scheduler(self):
+        return self._scheduler
+
    def register_instance_with_server(self, instance_name, server):
        server.add_execution_instance(self, instance_name)
...
@@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

-    def __init__(self, server):
+    def __init__(self, server, monitor=True):
        self.__logger = logging.getLogger(__name__)

+        self.__peers_by_instance = None
+        self.__peers = None
+
        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

-    def add_instance(self, name, instance):
-        self._instances[name] = instance
+        self._is_instrumented = monitor
+
+        if self._is_instrumented:
+            self.__peers_by_instance = {}
+            self.__peers = {}
+
+    # --- Public API ---
+
+    def add_instance(self, instance_name, instance):
+        """Registers a new servicer instance.
+
+        Args:
+            instance_name (str): The new instance's name.
+            instance (ExecutionInstance): The new instance itself.
+        """
+        self._instances[instance_name] = instance
+
+        if self._is_instrumented:
+            self.__peers_by_instance[instance_name] = set()
+
+    def get_scheduler(self, instance_name):
+        """Retrieves a reference to the scheduler for an instance.
+
+        Args:
+            instance_name (str): The name of the instance to query.
+
+        Returns:
+            Scheduler: A reference to the scheduler for `instance_name`.
+
+        Raises:
+            InvalidArgumentError: If no instance named `instance_name` exists.
+        """
+        instance = self._get_instance(instance_name)
+
+        return instance.scheduler
+
+    # --- Public API: Servicer ---

    def Execute(self, request, context):
+        """Handles ExecuteRequest messages.
+
+        Args:
+            request (ExecuteRequest): The incoming RPC request.
+            context (grpc.ServicerContext): Context for the RPC call.
+        """
        self.__logger.debug("Execute request from [%s]", context.peer())

+        instance_name = request.instance_name
+        message_queue = queue.Queue()
+        peer = context.peer()
+
        try:
-            message_queue = queue.Queue()
-            instance = self._get_instance(request.instance_name)
+            instance = self._get_instance(instance_name)
+
            operation = instance.execute(request.action_digest,
                                         request.skip_cache_lookup,
                                         message_queue)

-            context.add_callback(partial(instance.unregister_message_client,
-                                         operation.name, message_queue))
+            context.add_callback(partial(self._rpc_termination_callback,
+                                         peer, instance_name, operation.name, message_queue))

-            instanced_op_name = "{}/{}".format(request.instance_name,
-                                               operation.name)
+            if self._is_instrumented:
+                if peer not in self.__peers:
+                    self.__peers_by_instance[instance_name].add(peer)
+                    self.__peers[peer] = 1
+                else:
+                    self.__peers[peer] += 1
+
+            instanced_op_name = "{}/{}".format(instance_name, operation.name)

            self.__logger.info("Operation name: [%s]", instanced_op_name)

@@ -80,23 +134,37 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
            yield operations_pb2.Operation()

    def WaitExecution(self, request, context):
+        """Handles WaitExecutionRequest messages.
+
+        Args:
+            request (WaitExecutionRequest): The incoming RPC request.
+            context (grpc.ServicerContext): Context for the RPC call.
+        """
        self.__logger.debug("WaitExecution request from [%s]", context.peer())

-        try:
-            names = request.name.split("/")
-
-            # Operation name should be in format:
-            # {instance/name}/{operation_id}
-            instance_name = ''.join(names[0:-1])
-
-            message_queue = queue.Queue()
-            operation_name = names[-1]
+        names = request.name.split('/')
+        instance_name = '/'.join(names[:-1])
+        operation_name = names[-1]

+        message_queue = queue.Queue()
+        peer = context.peer()
+
+        try:
+            if instance_name != request.instance_name:
+                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
+                                           .format(request.name, instance_name))
+
            instance = self._get_instance(instance_name)

            instance.register_message_client(operation_name, message_queue)
-
-            context.add_callback(partial(instance.unregister_message_client,
-                                         operation_name, message_queue))
+            context.add_callback(partial(self._rpc_termination_callback,
+                                         peer, instance_name, operation_name, message_queue))
+
+            if self._is_instrumented:
+                if peer not in self.__peers:
+                    self.__peers_by_instance[instance_name].add(peer)
+                    self.__peers[peer] = 1
+                else:
+                    self.__peers[peer] += 1

            for operation in instance.stream_operation_updates(message_queue,
                                                               operation_name):

@@ -111,9 +179,42 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            yield operations_pb2.Operation()

+    # --- Private API ---
+
+    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
+        instance = self._get_instance(instance_name)
+
+        instance.unregister_message_client(job_name, message_queue)
+
+        if self._is_instrumented:
+            if self.__peers[peer] > 1:
+                self.__peers[peer] -= 1
+            else:
+                self.__peers_by_instance[instance_name].remove(peer)
+                del self.__peers[peer]
+
    def _get_instance(self, name):
        try:
            return self._instances[name]

        except KeyError:
            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
+
+    # --- Public API: Monitoring ---
+
+    @property
+    def is_instrumented(self):
+        return self._is_instrumented
+
+    def query_n_clients(self):
+        if self.__peers is not None:
+            return len(self.__peers)
+        return 0
+
+    def query_n_clients_for_instance(self, instance_name):
+        try:
+            if self.__peers_by_instance is not None:
+                return len(self.__peers_by_instance[instance_name])
+        except KeyError:
+            pass
+        return 0
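A peer can hold several concurrent Execute/WaitExecution streams, which is why the service above reference-counts peers and only drops one from its per-instance set when its last RPC terminates. A minimal sketch of that pattern follows; the class and method names are invented for illustration, not BuildGrid API.

class PeerTracker:
    """Reference-counts open streams per peer (illustrative only)."""

    def __init__(self):
        self._peers = {}               # peer -> number of open streams
        self._peers_by_instance = {}   # instance name -> set of connected peers

    def register(self, instance_name, peer):
        if peer not in self._peers:
            self._peers_by_instance.setdefault(instance_name, set()).add(peer)
            self._peers[peer] = 1
        else:
            self._peers[peer] += 1

    def unregister(self, instance_name, peer):
        # Mirrors the termination callback: only forget the peer entirely
        # once its last stream has terminated.
        if self._peers[peer] > 1:
            self._peers[peer] -= 1
        else:
            self._peers_by_instance[instance_name].remove(peer)
            del self._peers[peer]


tracker = PeerTracker()
tracker.register('', 'ipv4:127.0.0.1:52114')
tracker.register('', 'ipv4:127.0.0.1:52114')
tracker.unregister('', 'ipv4:127.0.0.1:52114')
assert tracker._peers == {'ipv4:127.0.0.1:52114': 1}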
@@ -15,18 +15,23 @@

import asyncio
from concurrent import futures
+from datetime import timedelta
import logging
import os
import time

import grpc

+from buildgrid._enums import MetricRecordDomain, MetricRecordType
+from buildgrid._protos.buildgrid.v2 import monitoring_pb2
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
from buildgrid.server.execution.service import ExecutionService
+from buildgrid.server._monitoring import MonitoringBus
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
+from buildgrid.settings import MONITORING_PERIOD


class BuildGridServer:

@@ -36,7 +41,7 @@ class BuildGridServer:
    requisite services.
    """

-    def __init__(self, max_workers=None):
+    def __init__(self, max_workers=None, monitor=True):
        """Initializes a new :class:`BuildGridServer` instance.

        Args:

@@ -52,6 +57,9 @@
        self.__grpc_server = grpc.server(self.__grpc_executor)
        self.__main_loop = asyncio.get_event_loop()

+        self.__monitoring_bus = None
+        self.__state_monitoring_task = None
+
        self._execution_service = None
        self._bots_service = None

@@ -61,10 +69,28 @@
        self._cas_service = None
        self._bytestream_service = None

+        self._schedulers = {}
+        self._instances = set()
+
+        self._is_instrumented = monitor
+
+        if self._is_instrumented:
+            self.__monitoring_bus = MonitoringBus(self.__main_loop)
+
+    # --- Public API ---
+
    def start(self):
        """Starts the BuildGrid server.
        """
        self.__grpc_server.start()

+        if self._is_instrumented:
+            self.__monitoring_bus.start()
+
+            self.__state_monitoring_task = asyncio.ensure_future(
+                self._state_monitoring_worker(period=MONITORING_PERIOD),
+                loop=self.__main_loop)
+
        self.__main_loop.run_forever()

    def stop(self, grace=0):

@@ -73,7 +99,11 @@
        Args:
            grace (int, optional): A duration of time in seconds. Defaults to 0.
        """
-        self.__main_loop.close()
+        if self._is_instrumented:
+            if self.__state_monitoring_task is not None:
+                self.__state_monitoring_task.cancel()
+
+            self.__monitoring_bus.stop()

        self.__grpc_server.stop(grace)

@@ -109,9 +139,11 @@
        """
        if self._execution_service is None:
            self._execution_service = ExecutionService(self.__grpc_server)

        self._execution_service.add_instance(instance_name, instance)
+        self._schedulers[instance_name] = instance.scheduler
+        self._instances.add(instance_name)

    def add_bots_interface(self, instance, instance_name):
        """Adds a :obj:`BotsInterface` to the service.

@@ -123,9 +155,10 @@
        """
        if self._bots_service is None:
            self._bots_service = BotsService(self.__grpc_server)

        self._bots_service.add_instance(instance_name, instance)
+        self._instances.add(instance_name)

    def add_operations_instance(self, instance, instance_name):
        """Adds an :obj:`OperationsInstance` to the service.

@@ -137,7 +170,6 @@
        """
        if self._operations_service is None:
            self._operations_service = OperationsService(self.__grpc_server)
-
        self._operations_service.add_instance(instance_name, instance)

    def add_reference_storage_instance(self, instance, instance_name):

@@ -151,7 +183,6 @@
        """
        if self._reference_storage_service is None:
            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
-
        self._reference_storage_service.add_instance(instance_name, instance)

    def add_action_cache_instance(self, instance, instance_name):

@@ -165,7 +196,6 @@
        """
        if self._action_cache_service is None:
            self._action_cache_service = ActionCacheService(self.__grpc_server)
-
        self._action_cache_service.add_instance(instance_name, instance)

    def add_cas_instance(self, instance, instance_name):

@@ -179,7 +209,6 @@
        """
        if self._cas_service is None:
            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
-
        self._cas_service.add_instance(instance_name, instance)

    def add_bytestream_instance(self, instance, instance_name):

@@ -193,5 +222,163 @@
        """
        if self._bytestream_service is None:
            self._bytestream_service = ByteStreamService(self.__grpc_server)

        self._bytestream_service.add_instance(instance_name, instance)
+
+    # --- Public API: Monitoring ---
+
+    @property
+    def is_instrumented(self):
+        return self._is_instrumented
+
+    # --- Private API ---
+
+    async def _state_monitoring_worker(self, period=1.0):
+        """Periodically publishes state metrics to the monitoring bus."""
+        async def __state_monitoring_worker():
+            # Emit total clients count record:
+            _, record = self._query_n_clients()
+            await self.__monitoring_bus.send_record(record)
+
+            # Emit total bots count record:
+            _, record = self._query_n_bots()
+            await self.__monitoring_bus.send_record(record)
+
+            queue_times = []
+
+            # Emits records by instance:
+            for instance_name in self._instances:
+                # Emit instance clients count record:
+                _, record = self._query_n_clients_for_instance(instance_name)
+                await self.__monitoring_bus.send_record(record)
+
+                # Emit instance bots count record:
+                _, record = self._query_n_bots_for_instance(instance_name)
+                await self.__monitoring_bus.send_record(record)
+
+                # Emit instance average queue time record:
+                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
+                await self.__monitoring_bus.send_record(record)
+                if queue_time:
+                    queue_times.append(queue_time)
+
+            # Emit overall average queue time record:
+            if len(queue_times) > 0:
+                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
+            else:
+                am_queue_time = timedelta()
+            record = self._forge_timer_metric_record(
+                MetricRecordDomain.STATE,
+                'average-queue-time',
+                am_queue_time)
+            await self.__monitoring_bus.send_record(record)
+
+            print('---')
+            n_clients = self._execution_service.query_n_clients()
+            n_bots = self._bots_service.query_n_bots()
+            print('Totals: n_clients={}, n_bots={}, am_queue_time={}'
+                  .format(n_clients, n_bots, am_queue_time))
+            print('Per instances:')
+            for instance_name in self._instances:
+                n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
+                n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
+                am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
+                instance_name = instance_name or 'void'
+                print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
+                      .format(instance_name, n_clients, n_bots, am_queue_time))
+            print('---')
+
+        try:
+            while True:
+                start = time.time()
+                await __state_monitoring_worker()
+                end = time.time()
+
+                await asyncio.sleep(period - (end - start))
+
+        except asyncio.CancelledError:
+            pass
+        except BaseException as e:
+            print(f'__state_monitoring_worker: {e}')
+
+    def _forge_counter_metric_record(self, domain, name, count, extra=None):
+        counter_record = monitoring_pb2.MetricRecord()
+
+        counter_record.creation_timestamp.GetCurrentTime()
+        counter_record.domain = domain.value
+        counter_record.type = MetricRecordType.COUNTER.value
+        counter_record.name = name
+        counter_record.count = count
+        if extra is not None:
+            counter_record.extra.update(extra)
+
+        return counter_record
+
+    def _forge_timer_metric_record(self, domain, name, duration, extra=None):
+        timer_record = monitoring_pb2.MetricRecord()
+
+        timer_record.creation_timestamp.GetCurrentTime()
+        timer_record.domain = domain.value
+        timer_record.type = MetricRecordType.TIMER.value
+        timer_record.name = name
+        timer_record.duration.FromTimedelta(duration)
+        if extra is not None:
+            timer_record.extra.update(extra)
+
+        return timer_record
+
+    def _forge_gauge_metric_record(self, domain, name, value, extra=None):
+        gauge_record = monitoring_pb2.MetricRecord()
+
+        gauge_record.creation_timestamp.GetCurrentTime()
+        gauge_record.domain = domain.value
+        gauge_record.type = MetricRecordType.GAUGE.value
+        gauge_record.name = name
+        gauge_record.value = value
+        if extra is not None:
+            gauge_record.extra.update(extra)
+
+        return gauge_record
+
+    # --- Private API: Monitoring ---
+
+    def _query_n_clients(self):
+        """Queries the number of clients connected."""
+        n_clients = self._execution_service.query_n_clients()
+        gauge_record = self._forge_gauge_metric_record(
+            MetricRecordDomain.STATE, 'clients-count', n_clients)
+
+        return n_clients, gauge_record
+
+    def _query_n_clients_for_instance(self, instance_name):
+        """Queries the number of clients connected for a given instance."""
+        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
+        gauge_record = self._forge_gauge_metric_record(
+            MetricRecordDomain.STATE, 'clients-count', n_clients,
+            extra={'instance-name': instance_name or 'void'})
+
+        return n_clients, gauge_record
+
+    def _query_n_bots(self):
+        """Queries the number of bots connected."""
+        n_bots = self._bots_service.query_n_bots()
+        gauge_record = self._forge_gauge_metric_record(
+            MetricRecordDomain.STATE, 'bots-count', n_bots)
+
+        return n_bots, gauge_record
+
+    def _query_n_bots_for_instance(self, instance_name):
+        """Queries the number of bots connected for a given instance."""
+        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
+        gauge_record = self._forge_gauge_metric_record(
+            MetricRecordDomain.STATE, 'bots-count', n_bots,
+            extra={'instance-name': instance_name or 'void'})
+
+        return n_bots, gauge_record
+
+    def _query_am_queue_time_for_instance(self, instance_name):
+        """Queries the average job's queue time for a given instance."""
+        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
+        timer_record = self._forge_timer_metric_record(
+            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
+            extra={'instance-name': instance_name or 'void'})
+
+        return am_queue_time, timer_record
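The _state_monitoring_worker coroutine above follows a common periodic-task pattern: run the publishing step, then sleep for whatever remains of the period so each cycle starts at a roughly fixed interval. A self-contained sketch of that loop, with invented names and a dummy workload, assuming nothing from BuildGrid:

import asyncio
import time


async def periodic(work, period=5.0):
    """Run `work()` every `period` seconds, compensating for its duration."""
    try:
        while True:
            start = time.time()
            await work()
            elapsed = time.time() - start
            # Never sleep a negative amount if the work overran the period:
            await asyncio.sleep(max(0.0, period - elapsed))
    except asyncio.CancelledError:
        pass  # graceful exit when the task is cancelled, as in stop()


async def publish_state():
    print('publishing state metrics...')


async def main():
    task = asyncio.ensure_future(periodic(publish_state, period=0.1))
    await asyncio.sleep(0.35)   # let a few cycles run
    task.cancel()
    await task


asyncio.get_event_loop().run_until_complete(main())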
@@ -13,10 +13,11 @@
# limitations under the License.


+from datetime import datetime
import logging
import uuid

-from google.protobuf import timestamp_pb2
+from google.protobuf import duration_pb2, timestamp_pb2

from buildgrid._enums import LeaseState, OperationStage
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

@@ -37,6 +38,7 @@ class Job:
        self.__execute_response = None
        self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()

        self.__queued_timestamp = timestamp_pb2.Timestamp()
+        self.__queued_time_duration = duration_pb2.Duration()
        self.__worker_start_timestamp = timestamp_pb2.Timestamp()
        self.__worker_completed_timestamp = timestamp_pb2.Timestamp()

@@ -50,6 +52,8 @@
        self._operation.done = False
        self._n_tries = 0

+    # --- Public API ---
+
    @property
    def name(self):
        return self._name

@@ -179,7 +183,7 @@ class Job:
            result.Unpack(action_result)

            action_metadata = action_result.execution_metadata
-            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
+            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
            action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
            action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)

@@ -204,6 +208,10 @@ class Job:
            self.__queued_timestamp.GetCurrentTime()
            self._n_tries += 1

+        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
+            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
+            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
+
        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
            if self.__execute_response is not None:
                self._operation.response.Pack(self.__execute_response)

@@ -213,3 +221,11 @@ class Job:

        for queue in self._operation_update_queues:
            queue.put(self._operation)
+
+    # --- Public API: Monitoring ---
+
+    def query_queue_time(self):
+        return self.__queued_time_duration.ToTimedelta()
+
+    def query_n_retries(self):
+        return self._n_tries - 1 if self._n_tries > 0 else 0
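For reference, the queue-time bookkeeping added to Job relies on the protobuf well-known types: the Timestamp captured at queueing is compared with the wall clock once the job reaches the EXECUTING stage, and the difference is stored as a Duration. A small sketch of that arithmetic, using UTC for the comparison since Timestamp.ToDatetime() returns a naive UTC datetime; variable names mirror the fields above but the snippet stands alone:

from datetime import datetime

from google.protobuf import duration_pb2, timestamp_pb2

queued_timestamp = timestamp_pb2.Timestamp()
queued_timestamp.GetCurrentTime()         # set when the job enters the queue

# ... later, when the job transitions to the EXECUTING stage:
queued_time_duration = duration_pb2.Duration()
queue_in = queued_timestamp.ToDatetime()  # naive UTC datetime
queue_out = datetime.utcnow()             # compare in UTC as well
queued_time_duration.FromTimedelta(queue_out - queue_in)

# query_queue_time() then exposes the value as a datetime.timedelta:
print(queued_time_duration.ToTimedelta())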
@@ -32,6 +32,10 @@ class OperationsInstance:
        self._scheduler = scheduler

+    @property
+    def scheduler(self):
+        return self._scheduler
+
    def register_instance_with_server(self, instance_name, server):
        server.add_operations_instance(self, instance_name)
...
@@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)

-    def add_instance(self, name, instance):
-        self._instances[name] = instance
+    # --- Public API ---
+
+    def add_instance(self, instance_name, instance):
+        """Registers a new servicer instance.
+
+        Args:
+            instance_name (str): The new instance's name.
+            instance (OperationsInstance): The new instance itself.
+        """
+        self._instances[instance_name] = instance
+
+    # --- Public API: Servicer ---

    def GetOperation(self, request, context):
        self.__logger.debug("GetOperation request from [%s]", context.peer())

@@ -132,6 +142,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):

        return Empty()

+    # --- Private API ---
+
    def _parse_instance_name(self, name):
        """ If the instance name is not blank, 'name' will have the form
        {instance_name}/{operation_uuid}. Otherwise, it will just be
...
@@ -20,24 +20,38 @@ Schedules jobs.
"""

from collections import deque
+from datetime import timedelta
import logging

+from buildgrid._enums import LeaseState, OperationStage
from buildgrid._exceptions import NotFoundError

-from .job import OperationStage, LeaseState
-

class Scheduler:

    MAX_N_TRIES = 5

-    def __init__(self, action_cache=None):
+    def __init__(self, action_cache=None, monitor=True):
        self.__logger = logging.getLogger(__name__)

+        self.__queue_times_by_priority = None
+        self.__queue_time_average = None
+        self.__retries_by_error = None
+        self.__retries_count = 0
+
        self._action_cache = action_cache
        self.jobs = {}
        self.queue = deque()

+        self._is_instrumented = monitor
+
+        if self._is_instrumented:
+            self.__queue_time_average = 0, timedelta()
+            self.__queue_times_by_priority = {}
+            self.__retries_by_error = {}
+
+    # --- Public API ---
+
    def register_client(self, job_name, queue):
        self.jobs[job_name].register_client(queue)

@@ -66,18 +80,22 @@ class Scheduler:
            operation_stage = OperationStage.QUEUED
            self.queue.append(job)

-        job.update_operation_stage(operation_stage)
+        self._update_job_operation_stage(job.name, operation_stage)

    def retry_job(self, job_name):
-        if job_name in self.jobs:
-            job = self.jobs[job_name]
-            if job.n_tries >= self.MAX_N_TRIES:
-                # TODO: Decide what to do with these jobs
-                job.update_operation_stage(OperationStage.COMPLETED)
-                # TODO: Mark these jobs as done
-            else:
-                job.update_operation_stage(OperationStage.QUEUED)
-                self.queue.appendleft(job)
+        job = self.jobs[job_name]
+
+        operation_stage = None
+        if job.n_tries >= self.MAX_N_TRIES:
+            # TODO: Decide what to do with these jobs
+            operation_stage = OperationStage.COMPLETED
+            # TODO: Mark these jobs as done
+
+        else:
+            operation_stage = OperationStage.QUEUED
+            self.queue.appendleft(job)
+
+        self._update_job_operation_stage(job_name, operation_stage)

    def list_jobs(self):
        return self.jobs.values()

@@ -112,13 +130,14 @@ class Scheduler:
        """
        job = self.jobs[job_name]

+        operation_stage = None
        if lease_state == LeaseState.PENDING:
            job.update_lease_state(LeaseState.PENDING)
-            job.update_operation_stage(OperationStage.QUEUED)
+            operation_stage = OperationStage.QUEUED

        elif lease_state == LeaseState.ACTIVE:
            job.update_lease_state(LeaseState.ACTIVE)
-            job.update_operation_stage(OperationStage.EXECUTING)
+            operation_stage = OperationStage.EXECUTING

        elif lease_state == LeaseState.COMPLETED:
            job.update_lease_state(LeaseState.COMPLETED,

@@ -127,7 +146,9 @@ class Scheduler:
            if self._action_cache is not None and not job.do_not_cache:
                self._action_cache.update_action_result(job.action_digest, job.action_result)

-            job.update_operation_stage(OperationStage.COMPLETED)
+            operation_stage = OperationStage.COMPLETED
+
+        self._update_job_operation_stage(job_name, operation_stage)

    def get_job_lease(self, job_name):
        """Returns the lease associated to job, if any have been emitted yet."""

@@ -136,3 +157,83 @@ class Scheduler:

    def get_job_operation(self, job_name):
        """Returns the operation associated to job."""
        return self.jobs[job_name].operation
+
+    # --- Public API: Monitoring ---
+
+    @property
+    def is_instrumented(self):
+        return self._is_instrumented
+
+    def query_n_jobs(self):
+        return len(self.jobs)
+
+    def query_n_operations(self):
+        return len(self.jobs)
+
+    def query_n_operations_by_stage(self):
+        return len(self.jobs)
+
+    def query_n_leases(self):
+        return len(self.jobs)
+
+    def query_n_leases_by_state(self):
+        return len(self.jobs)
+
+    def query_n_retries(self):
+        return self.__retries_count
+
+    def query_n_retries_for_error(self, error_type):
+        try:
+            if self.__retries_by_error is not None:
+                return self.__retries_by_error[error_type]
+        except KeyError:
+            pass
+        return 0
+
+    def query_am_queue_time(self):
+        if self.__queue_time_average is not None:
+            return self.__queue_time_average[1]
+        return 0
+
+    def query_am_queue_time_for_priority(self, priority_level):
+        try:
+            if self.__queue_times_by_priority is not None:
+                return self.__queue_times_by_priority[priority_level]
+        except KeyError:
+            pass
+        return 0
+
+    # --- Private API ---
+
+    def _update_job_operation_stage(self, job_name, operation_stage):
+        """Requests a stage transition for the job's :class:Operations.
+
+        Args:
+            job_name (str): name of the job to query.
+            operation_stage (OperationStage): the stage to transition to.
+        """
+        job = self.jobs[job_name]
+
+        if operation_stage == OperationStage.CACHE_CHECK:
+            job.update_operation_stage(OperationStage.CACHE_CHECK)
+
+        elif operation_stage == OperationStage.QUEUED:
+            job.update_operation_stage(OperationStage.QUEUED)
+
+        elif operation_stage == OperationStage.EXECUTING:
+            job.update_operation_stage(OperationStage.EXECUTING)
+
+        elif operation_stage == OperationStage.COMPLETED:
+            job.update_operation_stage(OperationStage.COMPLETED)
+
+            if self._is_instrumented:
+                average_order, average_time = self.__queue_time_average
+
+                average_order += 1
+                if average_order <= 1:
+                    average_time = job.query_queue_time()
+                else:
+                    queue_time = job.query_queue_time()
+                    average_time = average_time + ((queue_time - average_time) / average_order)
+
+                self.__queue_time_average = average_order, average_time
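The queue-time average maintained in _update_job_operation_stage above is a streaming (incremental) mean: each new sample nudges the running average by (sample - average) / n, so no per-job history has to be kept. A standalone sketch of that update rule with made-up sample values:

from datetime import timedelta

samples = [timedelta(seconds=4), timedelta(seconds=8), timedelta(seconds=6)]

average_order, average_time = 0, timedelta()
for queue_time in samples:
    average_order += 1
    if average_order <= 1:
        average_time = queue_time
    else:
        average_time = average_time + ((queue_time - average_time) / average_order)

# Matches the arithmetic mean of the samples:
assert average_time == sum(samples, timedelta()) / len(samples)
print(average_order, average_time)   # 3 0:00:06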
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib


-# The hash function that CAS uses
+# Hash function used for computing digests:
HASH = hashlib.sha256

+# Length in bytes of a hash string returned by HASH:
HASH_LENGTH = HASH().digest_size * 2

+# Period, in seconds, for the monitoring cycle:
+MONITORING_PERIOD = 5.0
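As a quick check of the constants above: HASH_LENGTH is the length of a hex-encoded digest, i.e. two characters per digest byte, which is 64 for SHA-256.

import hashlib

HASH = hashlib.sha256
HASH_LENGTH = HASH().digest_size * 2

digest = HASH(b'some blob').hexdigest()
assert len(digest) == HASH_LENGTH == 64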
@@ -112,13 +112,15 @@ setup(
    license="Apache License, Version 2.0",
    description="A remote execution service",
    packages=find_packages(),
+   python_requires='>= 3.5.3',  # janus requirement
    install_requires=[
-       'protobuf',
-       'grpcio',
-       'Click',
-       'PyYAML',
        'boto3 < 1.8.0',
        'botocore < 1.11.0',
+       'click',
+       'grpcio',
+       'janus',
+       'protobuf',
+       'pyyaml',
    ],
    entry_points={
        'console_scripts': [
...