Skip to content
Snippets Groups Projects
Commit 24f961a3 authored by Martin Blanchard's avatar Martin Blanchard
Browse files

server/instance.py: Run a monitoring task periodically

parent 0198a0c1
No related branches found
No related tags found
Loading
Pipeline #36701207 canceled
......@@ -15,12 +15,15 @@
import asyncio
from concurrent import futures
from datetime import timedelta
import logging
import os
import time
import grpc
from buildgrid._enums import MetricRecordDomain, MetricRecordType
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
from buildgrid.server.actioncache.service import ActionCacheService
from buildgrid.server.bots.service import BotsService
from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
......@@ -28,6 +31,7 @@ from buildgrid.server.execution.service import ExecutionService
from buildgrid.server._monitoring import MonitoringBus
from buildgrid.server.operations.service import OperationsService
from buildgrid.server.referencestorage.service import ReferenceStorageService
from buildgrid.settings import MONITORING_PERIOD
class BuildGridServer:
......@@ -55,6 +59,8 @@ class BuildGridServer:
self.__main_loop = asyncio.get_event_loop()
self.__monitoring_bus = None
self.__state_monitoring_task = None
self._execution_service = None
self._bots_service = None
self._operations_service = None
......@@ -64,6 +70,7 @@ class BuildGridServer:
self._bytestream_service = None
self._is_monitored = monitor
self._instances = set()
if self._is_monitored:
self.__monitoring_bus = MonitoringBus(self.__main_loop)
......@@ -77,6 +84,11 @@ class BuildGridServer:
if self._is_monitored:
self.__monitoring_bus.start()
self.__state_monitoring_task = asyncio.ensure_future(
self._state_monitoring_worker(period=MONITORING_PERIOD),
loop=self.__main_loop)
self.__main_loop.run_forever()
def stop(self, grace=0):
......@@ -85,6 +97,9 @@ class BuildGridServer:
Args:
grace (int, optional): A duration of time in seconds. Defaults to 0.
"""
if self.__state_monitoring_task is not None:
self.__state_monitoring_task.cancel()
if self._is_monitored:
self.__monitoring_bus.stop()
......@@ -122,9 +137,10 @@ class BuildGridServer:
"""
if self._execution_service is None:
self._execution_service = ExecutionService(self.__grpc_server)
self._execution_service.add_instance(instance_name, instance)
self._instances.add(instance_name)
def add_bots_interface(self, instance, instance_name):
"""Adds a :obj:`BotsInterface` to the service.
......@@ -136,9 +152,10 @@ class BuildGridServer:
"""
if self._bots_service is None:
self._bots_service = BotsService(self.__grpc_server)
self._bots_service.add_instance(instance_name, instance)
self._instances.add(instance_name)
def add_operations_instance(self, instance, instance_name):
"""Adds an :obj:`OperationsInstance` to the service.
......@@ -150,7 +167,6 @@ class BuildGridServer:
"""
if self._operations_service is None:
self._operations_service = OperationsService(self.__grpc_server)
self._operations_service.add_instance(instance_name, instance)
def add_reference_storage_instance(self, instance, instance_name):
......@@ -164,7 +180,6 @@ class BuildGridServer:
"""
if self._reference_storage_service is None:
self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
self._reference_storage_service.add_instance(instance_name, instance)
def add_action_cache_instance(self, instance, instance_name):
......@@ -178,7 +193,6 @@ class BuildGridServer:
"""
if self._action_cache_service is None:
self._action_cache_service = ActionCacheService(self.__grpc_server)
self._action_cache_service.add_instance(instance_name, instance)
def add_cas_instance(self, instance, instance_name):
......@@ -192,7 +206,6 @@ class BuildGridServer:
"""
if self._cas_service is None:
self._cas_service = ContentAddressableStorageService(self.__grpc_server)
self._cas_service.add_instance(instance_name, instance)
def add_bytestream_instance(self, instance, instance_name):
......@@ -206,7 +219,6 @@ class BuildGridServer:
"""
if self._bytestream_service is None:
self._bytestream_service = ByteStreamService(self.__grpc_server)
self._bytestream_service.add_instance(instance_name, instance)
# --- Public API: Monitoring ---
......@@ -214,3 +226,147 @@ class BuildGridServer:
@property
def is_monitored(self):
return self._is_monitored
# --- Private API ---
async def _state_monitoring_worker(self, period=1.0):
"""Periodically publishes state metrics to the monitoring bus."""
async def __state_monitoring_worker():
# Emit total clients count record:
_, record = self._query_n_clients()
await self.__monitoring_bus.send_record(record)
# Emit total bots count record:
_, record = self._query_n_bots()
await self.__monitoring_bus.send_record(record)
queue_times = []
# Emits records by instance:
for instance_name in self._instances:
# Emit instance clients count record:
_, record = self._query_n_clients_for_instance(instance_name)
await self.__monitoring_bus.send_record(record)
# Emit instance bots count record:
_, record = self._query_n_bots_for_instance(instance_name)
await self.__monitoring_bus.send_record(record)
# Emit instance average queue time record:
queue_time, record = self._query_am_queue_time_for_instance(instance_name)
await self.__monitoring_bus.send_record(record)
if queue_time:
queue_times.append(queue_time)
# Emit overall average queue time record:
record = monitoring_pb2.MetricRecord()
if len(queue_times) > 0:
am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
else:
am_queue_time = timedelta()
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.TIMER.value
record.name = 'average-queue-time'
record.duration.FromTimedelta(am_queue_time)
await self.__monitoring_bus.send_record(record)
print('---')
n_clients = self._execution_service.query_n_clients()
n_bots = self._bots_service.query_n_bots()
print('Totals: n_clients={}, n_bots={}, am_queue_time={}'
.format(n_clients, n_bots, am_queue_time))
print('Per instances:')
for instance_name in self._instances:
n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
instance_name = instance_name or 'void'
print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
.format(instance_name, n_clients, n_bots, am_queue_time))
print('---')
try:
while True:
start = time.time()
await __state_monitoring_worker()
end = time.time()
await asyncio.sleep(period - (end - start))
except asyncio.CancelledError:
pass
except BaseException as e:
print(f'__state_monitoring_worker: {e}')
# --- Private API: Monitoring ---
def _query_n_clients(self):
"""Queries the number of clients connected."""
record = monitoring_pb2.MetricRecord()
n_clients = self._execution_service.query_n_clients()
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.COUNTER.value
record.name = 'clients-count'
record.count = n_clients
return n_clients, record
def _query_n_clients_for_instance(self, instance_name):
"""Queries the number of clients connected for a given instance"""
record = monitoring_pb2.MetricRecord()
n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.COUNTER.value
record.name = 'clients-count'
record.count = n_clients
record.extra['instance-name'] = instance_name or 'void'
return n_clients, record
def _query_n_bots(self):
"""Queries the number of bots connected."""
record = monitoring_pb2.MetricRecord()
n_bots = self._bots_service.query_n_bots()
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.COUNTER.value
record.name = 'bots-count'
record.count = n_bots
return n_bots, record
def _query_n_bots_for_instance(self, instance_name):
"""Queries the number of bots connected for a given instance."""
record = monitoring_pb2.MetricRecord()
n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.COUNTER.value
record.name = 'bots-count'
record.count = n_bots
record.extra['instance-name'] = instance_name or 'void'
return n_bots, record
def _query_am_queue_time_for_instance(self, instance_name):
"""Queries the average job's queue time for a given instance."""
record = monitoring_pb2.MetricRecord()
instance_scheduler = self._execution_service.get_scheduler(instance_name)
am_queue_time = instance_scheduler.query_am_queue_time()
record.creation_timestamp.GetCurrentTime()
record.domain = MetricRecordDomain.STATE.value
record.type = MetricRecordType.TIMER.value
record.name = 'average-queue-time'
record.duration.FromTimedelta(am_queue_time)
record.extra['instance-name'] = instance_name or 'void'
return am_queue_time, record
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment