Skip to content
Snippets Groups Projects
Commit 98c0f44e authored by Raoul Hidalgo Charman's avatar Raoul Hidalgo Charman
Browse files

Smarter bot calls

Use gRPC timeouts and the ability to wait on jobs so that the server holds an
update request open until a job comes through. This reduces the need for
constant update requests while still responding to new jobs quickly.

This is part of #118 and #126
parent 143b32d3
No related branches found
No related tags found
No related merge requests found
......@@ -31,7 +31,7 @@ from buildgrid.bot import bot, interface, session
from buildgrid.bot.hardware.interface import HardwareInterface
from buildgrid.bot.hardware.device import Device
from buildgrid.bot.hardware.worker import Worker
from buildgrid.settings import INTERVAL_BUFFER
from ..bots import buildbox, dummy, host
from ..cli import pass_context
......@@ -54,7 +54,7 @@ from ..cli import pass_context
help="Public CAS client certificate for TLS (PEM-encoded)")
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public CAS server certificate for TLS (PEM-encoded)")
@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
help="Time period for bot updates to the server in seconds.")
@click.option('--parent', type=click.STRING, default='main', show_default=True,
help="Targeted farm resource.")
......@@ -66,7 +66,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
context.remote_url = remote
context.update_period = update_period
context.parent = parent
if url.scheme == 'http':
......@@ -124,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
click.echo("Starting for remote=[{}]".format(context.remote))
bot_interface = interface.BotInterface(context.channel)
bot_interface = interface.BotInterface(context.channel, update_period + INTERVAL_BUFFER)
worker = Worker()
worker.add_device(Device())
hardware_interface = HardwareInterface(worker)
......@@ -142,7 +141,7 @@ def run_dummy(context):
try:
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
dummy.work_dummy, context)
b = bot.Bot(bot_session, context.update_period)
b = bot.Bot(bot_session)
b.session()
except KeyboardInterrupt:
pass
......@@ -158,7 +157,7 @@ def run_host_tools(context):
try:
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
host.work_host_tools, context)
b = bot.Bot(bot_session, context.update_period)
b = bot.Bot(bot_session)
b.session()
except KeyboardInterrupt:
pass
......@@ -180,7 +179,7 @@ def run_buildbox(context, local_cas, fuse_dir):
try:
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
buildbox.work_buildbox, context)
b = bot.Bot(bot_session, context.update_period)
b = bot.Bot(bot_session)
b.session()
except KeyboardInterrupt:
pass
......@@ -20,14 +20,12 @@ import logging
class Bot:
"""Creates a local BotSession."""
def __init__(self, bot_session, update_period=1):
def __init__(self, bot_session):
"""
"""
self.__logger = logging.getLogger(__name__)
self.__bot_session = bot_session
self.__update_period = update_period
self.__loop = None
def session(self):
......@@ -51,7 +49,6 @@ class Bot:
try:
while True:
self.__bot_session.update_bot_session()
await asyncio.sleep(self.__update_period)
except asyncio.CancelledError:
pass
......
......@@ -31,10 +31,16 @@ class BotInterface:
Interface handles calls to the server.
"""
def __init__(self, channel):
def __init__(self, channel, interval):
self.__logger = logging.getLogger(__name__)
self.__logger.info(channel)
self._stub = bots_pb2_grpc.BotsStub(channel)
self.__interval = interval
@property
def interval(self):
return self.__interval
def create_bot_session(self, parent, bot_session):
request = bots_pb2.CreateBotSessionRequest(parent=parent,
......@@ -51,7 +57,7 @@ class BotInterface:
bot_session=bot_session,
update_mask=update_mask)
try:
return self._stub.UpdateBotSession(request)
return self._stub.UpdateBotSession(request, timeout=self.interval)
except grpc.RpcError as e:
self.__logger.error(e)
......
......@@ -24,6 +24,7 @@ import logging
import uuid
from buildgrid._exceptions import InvalidArgumentError
from buildgrid.settings import INTERVAL_BUFFER
from ..job import LeaseState
......@@ -75,7 +76,7 @@ class BotsInterface:
self._request_leases(bot_session)
return bot_session
def update_bot_session(self, name, bot_session):
def update_bot_session(self, name, bot_session, deadline=None):
""" Client updates the server. Any changes in state to the Lease should be
registered server side. Assigns available leases with work.
"""
......@@ -93,14 +94,15 @@ class BotsInterface:
pass
lease.Clear()
self._request_leases(bot_session)
self._request_leases(bot_session, deadline)
return bot_session
def _request_leases(self, bot_session):
def _request_leases(self, bot_session, deadline=None):
# TODO: Send worker capabilities to the scheduler!
# Only send one lease at a time currently.
if not bot_session.leases:
leases = self._scheduler.request_job_leases({})
leases = self._scheduler.request_job_leases(
{}, timeout=deadline - INTERVAL_BUFFER if deadline else None)
if leases:
for lease in leases:
self._assigned_leases[bot_session.name].add(lease.id)
......
......@@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer):
instance_name = '/'.join(names[:-1])
instance = self._get_instance(instance_name)
bot_session = instance.update_bot_session(request.name,
request.bot_session)
bot_session = instance.update_bot_session(
request.name,
request.bot_session,
deadline=context.time_remaining())
if self._is_instrumented:
self.__bots[bot_id].GetCurrentTime()
......
......@@ -24,3 +24,6 @@ HASH_LENGTH = HASH().digest_size * 2
# Period, in seconds, for the monitoring cycle:
MONITORING_PERIOD = 5.0
# Time, in seconds, by which to pad timeouts:
INTERVAL_BUFFER = 5
......@@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess
from ..utils.bots_interface import serve_bots_interface
TIMEOUT = 5
INSTANCES = ['', 'instance']
......@@ -48,7 +49,7 @@ class ServerInterface:
bot_session = bots_pb2.BotSession()
bot_session.ParseFromString(string_bot_session)
interface = BotInterface(grpc.insecure_channel(remote))
interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
result = interface.create_bot_session(parent, bot_session)
queue.put(result.SerializeToString())
......@@ -67,7 +68,7 @@ class ServerInterface:
bot_session = bots_pb2.BotSession()
bot_session.ParseFromString(string_bot_session)
interface = BotInterface(grpc.insecure_channel(remote))
interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
result = interface.update_bot_session(bot_session, update_mask)
queue.put(result.SerializeToString())
......
......@@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server)
# GRPC context
@pytest.fixture
def context():
yield mock.MagicMock(spec=_Context)
context_mock = mock.MagicMock(spec=_Context)
context_mock.time_remaining.return_value = None
yield context_mock
@pytest.fixture
......
......@@ -123,7 +123,7 @@ class BotsInterface:
self.__bot_session_queue.put(bot_session.SerializeToString())
return bot_session
def update_bot_session(self, name, bot_session):
def update_bot_session(self, name, bot_session, deadline=None):
for lease in bot_session.leases:
state = LeaseState(lease.state)
if state == LeaseState.COMPLETED:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment