Loading buildgrid/bot/session.py +49 −63 Original line number Diff line number Diff line Loading @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # Disable broad exception catch # pylint: disable=broad-except """ Bot Session Loading @@ -22,21 +19,20 @@ Bot Session Allows connections """ import asyncio import logging import platform import uuid import grpc from buildgrid._enums import BotStatus, LeaseState from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 from buildgrid._protos.google.rpc import code_pb2 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2 from buildgrid._exceptions import BotError from buildgrid._exceptions import FailedPreconditionError from .tenantmanager import TenantManager class BotSession: def __init__(self, parent, interface, worker): def __init__(self, parent, bots_interface, hardware_interface, work, context=None): """ Unique bot ID within the farm used to identify this bot Needs to be human readable. All prior sessions with bot_id of same ID are invalidated. Loading @@ -45,88 +41,78 @@ class BotSession: """ self.__logger = logging.getLogger(__name__) self._context = None self._bots_interface = bots_interface self._hardware_interface = hardware_interface self._worker = worker self._interface = interface self._leases = {} self._parent = parent self._status = BotStatus.OK.value self._tenant_manager = TenantManager() self.__parent = parent self.__bot_id = '{}.{}'.format(parent, platform.node()) self.__name = None self._work = work self._context = context @property def bot_id(self): return self.__bot_id def create_bot_session(self, work, context=None): def create_bot_session(self): self.__logger.debug("Creating bot session") self._work = work self._context = context session = self._interface.create_bot_session(self._parent, self.get_pb2()) session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2()) self.__name = session.name self.__logger.info("Created bot session with name: [%s]", self._name) self.__logger.info("Created bot session with name: [%s]", self.__name) for lease in session.leases: self._update_lease_from_server(lease) self._register_lease(lease) def update_bot_session(self): self.__logger.debug("Updating bot session: [%s]", self._bot_id) session = self._interface.update_bot_session(self.get_pb2()) for k, v in list(self._leases.items()): if v.state == LeaseState.COMPLETED.value: del self._leases[k] self.__logger.debug("Updating bot session: [%s]", self.__bot_id) session = self._bots_interface.update_bot_session(self.get_pb2()) server_ids = [] for lease in session.leases: self._update_lease_from_server(lease) server_ids.append(lease.id) def get_pb2(self): leases = list(self._leases.values()) if not leases: leases = None lease_state = LeaseState(lease.state) if lease_state == LeaseState.PENDING: self._register_lease(lease) return bots_pb2.BotSession(worker=self._worker.get_pb2(), status=self._status, leases=leases, bot_id=self.__bot_id, name=self.__name) elif lease_state == LeaseState.CANCELLED: self._tenant_manager.cancel_tenancy(lease.id) def lease_completed(self, lease): lease.state = LeaseState.COMPLETED.value self._leases[lease.id] = lease closed_lease_ids = [x for x in self._tenant_manager.get_lease_ids() if x not in server_ids] def _update_lease_from_server(self, lease): """ State machine for any recieved updates to the leases. """ # TODO: Compare with previous state of lease if lease.state == LeaseState.PENDING.value: lease.state = LeaseState.ACTIVE.value self._leases[lease.id] = lease self.update_bot_session() asyncio.ensure_future(self.create_work(lease)) for lease_id in closed_lease_ids: self._tenant_manager.cancel_tenancy(lease_id) self._tenant_manager.remove_tenant(lease_id) async def create_work(self, lease): self.__logger.debug("Work created: [%s]", lease.id) loop = asyncio.get_event_loop() def get_pb2(self): return bots_pb2.BotSession(worker=self._hardware_interface.get_worker_pb2(), status=self._status, leases=self._tenant_manager.get_leases(), bot_id=self.__bot_id, name=self.__name) def _register_lease(self, lease): lease_id = lease.id try: lease = await loop.run_in_executor(None, self._work, self._context, lease) self._tenant_manager.create_tenancy(lease) except grpc.RpcError as e: except KeyError as e: self.__logger.error(e) lease.status.CopyFrom(e.code()) except BotError as e: self.__logger.error(e) lease.status.code = code_pb2.INTERNAL else: try: self._hardware_interface.configure_hardware(lease.requirements) except Exception as e: except FailedPreconditionError as e: self.__logger.error(e) lease.status.code = code_pb2.INTERNAL self._tenant_manager.complete_lease(lease_id, status=code_pb2.FailedPreconditionError) self.__logger.debug("Work complete: [%s]", lease.id) self.lease_completed(lease) else: self._tenant_manager.create_work(lease_id, self._work, self._context) Loading
buildgrid/bot/session.py +49 −63 Original line number Diff line number Diff line Loading @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # Disable broad exception catch # pylint: disable=broad-except """ Bot Session Loading @@ -22,21 +19,20 @@ Bot Session Allows connections """ import asyncio import logging import platform import uuid import grpc from buildgrid._enums import BotStatus, LeaseState from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 from buildgrid._protos.google.rpc import code_pb2 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2 from buildgrid._exceptions import BotError from buildgrid._exceptions import FailedPreconditionError from .tenantmanager import TenantManager class BotSession: def __init__(self, parent, interface, worker): def __init__(self, parent, bots_interface, hardware_interface, work, context=None): """ Unique bot ID within the farm used to identify this bot Needs to be human readable. All prior sessions with bot_id of same ID are invalidated. Loading @@ -45,88 +41,78 @@ class BotSession: """ self.__logger = logging.getLogger(__name__) self._context = None self._bots_interface = bots_interface self._hardware_interface = hardware_interface self._worker = worker self._interface = interface self._leases = {} self._parent = parent self._status = BotStatus.OK.value self._tenant_manager = TenantManager() self.__parent = parent self.__bot_id = '{}.{}'.format(parent, platform.node()) self.__name = None self._work = work self._context = context @property def bot_id(self): return self.__bot_id def create_bot_session(self, work, context=None): def create_bot_session(self): self.__logger.debug("Creating bot session") self._work = work self._context = context session = self._interface.create_bot_session(self._parent, self.get_pb2()) session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2()) self.__name = session.name self.__logger.info("Created bot session with name: [%s]", self._name) self.__logger.info("Created bot session with name: [%s]", self.__name) for lease in session.leases: self._update_lease_from_server(lease) self._register_lease(lease) def update_bot_session(self): self.__logger.debug("Updating bot session: [%s]", self._bot_id) session = self._interface.update_bot_session(self.get_pb2()) for k, v in list(self._leases.items()): if v.state == LeaseState.COMPLETED.value: del self._leases[k] self.__logger.debug("Updating bot session: [%s]", self.__bot_id) session = self._bots_interface.update_bot_session(self.get_pb2()) server_ids = [] for lease in session.leases: self._update_lease_from_server(lease) server_ids.append(lease.id) def get_pb2(self): leases = list(self._leases.values()) if not leases: leases = None lease_state = LeaseState(lease.state) if lease_state == LeaseState.PENDING: self._register_lease(lease) return bots_pb2.BotSession(worker=self._worker.get_pb2(), status=self._status, leases=leases, bot_id=self.__bot_id, name=self.__name) elif lease_state == LeaseState.CANCELLED: self._tenant_manager.cancel_tenancy(lease.id) def lease_completed(self, lease): lease.state = LeaseState.COMPLETED.value self._leases[lease.id] = lease closed_lease_ids = [x for x in self._tenant_manager.get_lease_ids() if x not in server_ids] def _update_lease_from_server(self, lease): """ State machine for any recieved updates to the leases. """ # TODO: Compare with previous state of lease if lease.state == LeaseState.PENDING.value: lease.state = LeaseState.ACTIVE.value self._leases[lease.id] = lease self.update_bot_session() asyncio.ensure_future(self.create_work(lease)) for lease_id in closed_lease_ids: self._tenant_manager.cancel_tenancy(lease_id) self._tenant_manager.remove_tenant(lease_id) async def create_work(self, lease): self.__logger.debug("Work created: [%s]", lease.id) loop = asyncio.get_event_loop() def get_pb2(self): return bots_pb2.BotSession(worker=self._hardware_interface.get_worker_pb2(), status=self._status, leases=self._tenant_manager.get_leases(), bot_id=self.__bot_id, name=self.__name) def _register_lease(self, lease): lease_id = lease.id try: lease = await loop.run_in_executor(None, self._work, self._context, lease) self._tenant_manager.create_tenancy(lease) except grpc.RpcError as e: except KeyError as e: self.__logger.error(e) lease.status.CopyFrom(e.code()) except BotError as e: self.__logger.error(e) lease.status.code = code_pb2.INTERNAL else: try: self._hardware_interface.configure_hardware(lease.requirements) except Exception as e: except FailedPreconditionError as e: self.__logger.error(e) lease.status.code = code_pb2.INTERNAL self._tenant_manager.complete_lease(lease_id, status=code_pb2.FailedPreconditionError) self.__logger.debug("Work complete: [%s]", lease.id) self.lease_completed(lease) else: self._tenant_manager.create_work(lease_id, self._work, self._context)