Skip to content
Snippets Groups Projects
Commit a0ada9f4 authored by Raoul Hidalgo Charman's avatar Raoul Hidalgo Charman
Browse files

bot: add reconnects and waiting on job status

Bot will periodically attempt to reconnect if a controller goes down.
If a job is running it will similar to the server side, wait on the job
completing else respond periodically.

This logic may need changing if/when bots can have multiple leases.

This is part of #118 and #126
parent 0d241b27
No related branches found
No related tags found
1 merge request!113Smarter bot calls
...@@ -35,7 +35,7 @@ class Bot: ...@@ -35,7 +35,7 @@ class Bot:
self.__bot_session.create_bot_session() self.__bot_session.create_bot_session()
try: try:
task = asyncio.ensure_future(self.__update_bot_session()) task = asyncio.ensure_future(self.__bot_session.run())
self.__loop.run_until_complete(task) self.__loop.run_until_complete(task)
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -44,15 +44,6 @@ class Bot: ...@@ -44,15 +44,6 @@ class Bot:
self.__kill_everyone() self.__kill_everyone()
self.__logger.info("Bot shutdown.") self.__logger.info("Bot shutdown.")
async def __update_bot_session(self):
"""Calls the server periodically to inform the server the client has not died."""
try:
while True:
self.__bot_session.update_bot_session()
except asyncio.CancelledError:
pass
def __kill_everyone(self): def __kill_everyone(self):
"""Cancels and waits for them to stop.""" """Cancels and waits for them to stop."""
self.__logger.info("Cancelling remaining tasks...") self.__logger.info("Cancelling remaining tasks...")
......
...@@ -43,22 +43,21 @@ class BotInterface: ...@@ -43,22 +43,21 @@ class BotInterface:
return self.__interval return self.__interval
def create_bot_session(self, parent, bot_session): def create_bot_session(self, parent, bot_session):
""" Creates a bot session returning a grpc StatusCode if it failed """
request = bots_pb2.CreateBotSessionRequest(parent=parent, request = bots_pb2.CreateBotSessionRequest(parent=parent,
bot_session=bot_session) bot_session=bot_session)
try: return self._bot_call(self._stub.CreateBotSession, request)
return self._stub.CreateBotSession(request)
except grpc.RpcError as e:
self.__logger.error(e)
raise
def update_bot_session(self, bot_session, update_mask=None): def update_bot_session(self, bot_session, update_mask=None):
""" Updates a bot session returning a grpc StatusCode if it failed """
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name, request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
bot_session=bot_session, bot_session=bot_session,
update_mask=update_mask) update_mask=update_mask)
try: return self._bot_call(self._stub.UpdateBotSession, request)
return self._stub.UpdateBotSession(request, timeout=self.interval + NETWORK_TIMEOUT)
def _bot_call(self, call, request):
try:
return call(request, timeout=self.interval + NETWORK_TIMEOUT)
except grpc.RpcError as e: except grpc.RpcError as e:
self.__logger.error(e) self.__logger.error(e.code())
raise return e.code()
...@@ -19,9 +19,12 @@ Bot Session ...@@ -19,9 +19,12 @@ Bot Session
Allows connections Allows connections
""" """
import asyncio
import logging import logging
import platform import platform
from grpc import StatusCode
from buildgrid._enums import BotStatus, LeaseState from buildgrid._enums import BotStatus, LeaseState
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from buildgrid._protos.google.rpc import code_pb2 from buildgrid._protos.google.rpc import code_pb2
...@@ -47,6 +50,8 @@ class BotSession: ...@@ -47,6 +50,8 @@ class BotSession:
self._status = BotStatus.OK.value self._status = BotStatus.OK.value
self._tenant_manager = TenantManager() self._tenant_manager = TenantManager()
self.connected = False
self.__parent = parent self.__parent = parent
self.__bot_id = '{}.{}'.format(parent, platform.node()) self.__bot_id = '{}.{}'.format(parent, platform.node())
self.__name = None self.__name = None
...@@ -58,10 +63,33 @@ class BotSession: ...@@ -58,10 +63,33 @@ class BotSession:
def bot_id(self): def bot_id(self):
return self.__bot_id return self.__bot_id
async def run(self):
""" Run a bot session
This connects and reconnects via create bot session and waits on update
bot session calls.
"""
self.__logger.debug("Starting bot session")
interval = self._bots_interface.interval
while True:
if not self.connected:
self.create_bot_session()
else:
self.update_bot_session()
if not self.connected:
await asyncio.sleep(interval)
else:
await self._tenant_manager.wait_on_tenants(interval)
def create_bot_session(self): def create_bot_session(self):
self.__logger.debug("Creating bot session") self.__logger.debug("Creating bot session")
session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2()) session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
if session in list(StatusCode):
self.connected = False
return
self.connected = True
self.__name = session.name self.__name = session.name
self.__logger.info("Created bot session with name: [%s]", self.__name) self.__logger.info("Created bot session with name: [%s]", self.__name)
...@@ -73,6 +101,13 @@ class BotSession: ...@@ -73,6 +101,13 @@ class BotSession:
self.__logger.debug("Updating bot session: [%s]", self.__bot_id) self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
session = self._bots_interface.update_bot_session(self.get_pb2()) session = self._bots_interface.update_bot_session(self.get_pb2())
if session == StatusCode.DEADLINE_EXCEEDED:
# try to continue to do update session if it passed the timeout
return
elif session in StatusCode:
self.connected = False
return
self.connected = True
server_ids = [] server_ids = []
for lease in session.leases: for lease in session.leases:
......
...@@ -150,6 +150,13 @@ class TenantManager: ...@@ -150,6 +150,13 @@ class TenantManager:
""" """
return self._tenants[lease_id].tenant_completed return self._tenants[lease_id].tenant_completed
async def wait_on_tenants(self, timeout):
if self._tasks:
tasks = self._tasks.values()
await asyncio.wait(tasks,
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED)
def _update_lease_result(self, lease_id, result): def _update_lease_result(self, lease_id, result):
"""Updates the lease with the result.""" """Updates the lease with the result."""
self._tenants[lease_id].update_lease_result(result) self._tenants[lease_id].update_lease_result(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment