Skip to content
Snippets Groups Projects
Commit f324bbdd authored by Raoul Hidalgo Charman
Browse files

Bot: Reconnects and timeouts on the bot side

parent e4fb8842
No related branches found
No related tags found
Loading
Pipeline #35857820 passed
......@@ -33,7 +33,6 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker
from ..bots import buildbox, dummy, host
from ..cli import pass_context
from ...settings import INTERVAL_BUFFER
@click.group(name='bot', short_help="Create and register bot clients.")
......@@ -53,6 +52,7 @@ from ...settings import INTERVAL_BUFFER
help="Public CAS client certificate for TLS (PEM-encoded)")
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
help="Public CAS server certificate for TLS (PEM-encoded)")
# TODO change default to 30
@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
help="Time period for bot updates to the server in seconds.")
@click.option('--parent', type=click.STRING, default='main', show_default=True,
......
......@@ -37,20 +37,11 @@ class Bot:
def session(self, work, context):
    """
    Run the bot session on the event loop until interrupted.

    Schedules the bot session runner and blocks in ``run_forever``. A
    KeyboardInterrupt ends the session quietly; the runner task is
    cancelled and the loop is closed on the way out.
    """
    loop = asyncio.get_event_loop()
    # The runner creates the session itself when it is not connected and
    # sends the periodic updates, so only this one task is scheduled.
    # Created before the try so `task` is always bound in `finally`.
    task = asyncio.ensure_future(self._bot_session.run(work, context))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        task.cancel()
        loop.close()
async def _update_bot_session(self):
"""
Calls the server periodically to inform the server the client has not died.
"""
while True:
self._bot_session.update_bot_session()
......@@ -21,8 +21,10 @@ Interface to grpc
"""
import logging
import grpc
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
from ..settings import INTERVAL_BUFFER
class BotInterface:
......@@ -34,15 +36,23 @@ class BotInterface:
self.logger = logging.getLogger(__name__)
self.logger.info(channel)
self._stub = bots_pb2_grpc.BotsStub(channel)
self._interval = interval
self.interval = interval
def create_bot_session(self, parent, bot_session):
    """
    Ask the server to create a bot session under ``parent``.

    Returns the server's response, or None when the call fails with a
    gRPC error (handled in ``_bot_call``).
    """
    request = bots_pb2.CreateBotSessionRequest(parent=parent,
                                               bot_session=bot_session)
    # Go through _bot_call so the request carries a timeout and RPC
    # errors are turned into a None result instead of propagating.
    return self._bot_call(self._stub.CreateBotSession, request)
def update_bot_session(self, bot_session, update_mask=None):
    """
    Send the current bot session state to the server.

    Returns the server's response, or None when the call fails with a
    gRPC error (handled in ``_bot_call``).
    """
    request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
                                               bot_session=bot_session,
                                               update_mask=update_mask)
    # Go through _bot_call so the deadline includes INTERVAL_BUFFER and
    # RPC errors are turned into a None result instead of propagating.
    return self._bot_call(self._stub.UpdateBotSession, request)
def _bot_call(self, call, request):
    """
    Invoke a stub method with the bot timeout, tolerating gRPC failures.

    The deadline passed to ``call`` is ``self.interval`` plus
    ``INTERVAL_BUFFER``.

    Returns the call's response, or None if it raised ``grpc.RpcError``.
    """
    try:
        return call(request, timeout=self.interval + INTERVAL_BUFFER)
    except grpc.RpcError as e:
        # Every RPC failure is reported; testing the status code for
        # membership in grpc.StatusCode filtered nothing out.
        self.logger.warning("Server responded with error: {}".format(e.code()))
        return None
......@@ -49,7 +49,9 @@ class BotSession:
self._bot_id = '{}.{}'.format(parent, platform.node())
self._context = None
self._interface = interface
self.connected = False
self._leases = {}
self._futures = {}
self._name = None
self._parent = parent
self._status = BotStatus.OK.value
......@@ -63,12 +65,35 @@ class BotSession:
def add_worker(self, worker):
    """Store ``worker`` as this bot session's worker."""
    self._worker = worker
async def run(self, work, context=None):
    """
    Drive the bot session forever, reconnecting when the server is silent.

    Each pass either creates the session (when not connected) or updates
    it, then waits: on the outstanding work futures if there are any, or
    for one interval if the session is still disconnected.
    """
    self.logger.info("Starting bot session runner")
    while True:
        # One server round-trip per pass.
        if self.connected is not False:
            self.update_bot_session()
        else:
            self.create_bot_session(work, context)

        if not self._futures:
            # Nothing running: back off only while disconnected.
            if self.connected is False:
                await asyncio.sleep(self._interface.interval)
            continue

        # Wake up as soon as any piece of work finishes, or after one
        # interval at the latest.
        await asyncio.wait(self._futures.values(),
                           timeout=self._interface.interval,
                           return_when=asyncio.FIRST_COMPLETED)
def create_bot_session(self, work, context=None):
self.logger.debug("Creating bot session")
self._work = work
self._context = context
session = self._interface.create_bot_session(self._parent, self.get_pb2())
if session is None:
self.connected = False
return
self.connected = True
self._name = session.name
self.logger.info("Created bot session with name: [{}]".format(self._name))
......@@ -79,6 +104,10 @@ class BotSession:
def update_bot_session(self):
self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
session = self._interface.update_bot_session(self.get_pb2())
if session is None:
self.connected = False
return
self.connected = True
for k, v in list(self._leases.items()):
if v.state == LeaseState.COMPLETED.value:
del self._leases[k]
......@@ -100,6 +129,7 @@ class BotSession:
def lease_completed(self, lease):
    """
    Mark ``lease`` as completed and stop tracking its work future.

    The lease is stored back in ``self._leases`` with its state set to
    COMPLETED, and any future recorded for it is dropped.
    """
    lease.state = LeaseState.COMPLETED.value
    self._leases[lease.id] = lease
    # pop() rather than del: completing a lease that has no tracked
    # future (e.g. completed twice) must not raise KeyError.
    self._futures.pop(lease.id, None)
def _update_lease_from_server(self, lease):
"""
......@@ -110,7 +140,7 @@ class BotSession:
lease.state = LeaseState.ACTIVE.value
self._leases[lease.id] = lease
self.update_bot_session()
asyncio.ensure_future(self.create_work(lease))
self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
async def create_work(self, lease):
self.logger.debug("Work created: [{}]".format(lease.id))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment