Skip to content
Snippets Groups Projects
Commit c21c4824 authored by finn's avatar finn
Browse files

Refactored bots, simplified asyncio

Number of leases is now dictated by server. New helper classes
which create devices and workers.
parent 532529d3
No related branches found
No related tags found
Loading
......@@ -23,163 +23,46 @@ Creates a bot session.
"""
import asyncio
import inspect
import collections
import logging
import platform
import queue
import time
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
from . import bot_interface
from . import bot_interface, bot_session
from .bot_session import BotStatus, LeaseState
from .._exceptions import BotError
class Bot(object):
class Bot:
"""
Creates a local BotSession.
"""
def __init__(self, work, context, channel, parent, number_of_leases):
if not inspect.iscoroutinefunction(work):
raise BotError("work function must be async")
UPDATE_PERIOD = 1
self.interface = bot_interface.BotInterface(channel)
def __init__(self, bot_session):
self.logger = logging.getLogger(__name__)
self._create_session(parent, number_of_leases)
self._work_queue = queue.Queue(maxsize = number_of_leases)
try:
while True:
## TODO: Leases independently finish
## Allow leases to queue finished work independently instead
## of waiting for all to finish
futures = [self._do_work(work, context, lease) for lease in self._get_work()]
if futures:
loop = asyncio.new_event_loop()
leases_complete, _ = loop.run_until_complete(asyncio.wait(futures))
work_complete = [(lease.result().id, lease.result(),) for lease in leases_complete]
self._work_complete(work_complete)
loop.close()
self._update_bot_session()
time.sleep(2)
except Exception as e:
self.logger.error(e)
raise BotError(e)
@property
def bot_session(self):
## Read only, shouldn't have to set any of the variables in here
return self._bot_session
def close_session(self):
self.logger.warning("Session closing not yet implemented")
async def _do_work(self, work, context, lease):
""" Work is done here, work function should be asynchronous
"""
self.logger.info("Work found: {}".format(lease.id))
lease = await work(context=context, lease=lease)
lease.state = bots_pb2.LeaseState.Value('COMPLETED')
self.logger.info("Work complete: {}".format(lease.id))
return lease
def _update_bot_session(self):
""" Should call the server periodically to inform the server the client
has not died.
"""
self.logger.debug("Updating bot session")
self._bot_session = self.interface.update_bot_session(self._bot_session)
leases_update = ([self._update_lease(lease) for lease in self._bot_session.leases])
del self._bot_session.leases[:]
self._bot_session.leases.extend(leases_update)
def _get_work(self):
while not self._work_queue.empty():
yield self._work_queue.get()
self._work_queue.task_done()
def _work_complete(self, leases_complete):
""" Bot updates itself with any completed work.
"""
# Should really improve this...
# Maybe add some call back function sentoff work...
leases_active = list(filter(self._lease_active, self._bot_session.leases))
leases_not_active = [lease for lease in self._bot_session.leases if not self._lease_active(lease)]
del self._bot_session.leases[:]
for lease in leases_active:
for lease_tuple in leases_complete:
if lease.id == lease_tuple[0]:
leases_not_active.extend([lease_tuple[1]])
self._bot_session.leases.extend(leases_not_active)
def _update_lease(self, lease):
"""
State machine for any recieved updates to the leases.
"""
if self._lease_pending(lease):
lease.state = bots_pb2.LeaseState.Value('ACTIVE')
self._work_queue.put(lease)
return lease
else:
return lease
def _create_session(self, parent, number_of_leases):
self.logger.debug("Creating bot session")
worker = self._create_worker()
self._bot_session = bot_session
""" Unique bot ID within the farm used to identify this bot
Needs to be human readable.
All prior sessions with bot_id of same ID are invalidated.
If a bot attempts to update an invalid session, it must be rejected and
may be put in quarantine.
"""
bot_id = '{}.{}'.format(parent, platform.node())
def session(self, work, context):
loop = asyncio.get_event_loop()
leases = [bots_pb2.Lease() for x in range(number_of_leases)]
self._bot_session.create_bot_session(work, context)
bot_session = bots_pb2.BotSession(worker = worker,
status = bots_pb2.BotStatus.Value('OK'),
leases = leases,
bot_id = bot_id)
self._bot_session = self.interface.create_bot_session(parent, bot_session)
self.logger.info("Name: {}, Id: {}".format(self._bot_session.name,
self._bot_session.bot_id))
def _create_worker(self):
devices = self._create_devices()
# Contains a list of devices and the connections between them.
worker = worker_pb2.Worker(devices = devices)
""" Keys supported:
*pool
"""
worker.Property.key = "pool"
worker.Property.value = "all"
return worker
def _create_devices(self):
""" Creates devices available to the worker
The first device is know as the Primary Device - the revice which
is running a bit and responsible to actually executing commands.
All other devices are known as Attatched Devices and must be controlled
by the Primary Device.
"""
devices = []
for i in range(0, 1): # Append one device for now
dev = worker_pb2.Device()
devices.append(dev)
return devices
def _lease_pending(self, lease):
return lease.state == bots_pb2.LeaseState.Value('PENDING')
def _lease_active(self, lease):
return lease.state == bots_pb2.LeaseState.Value('ACTIVE')
try:
task = asyncio.ensure_future(self._update_bot_session())
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
task.cancel()
loop.close()
async def _update_bot_session(self):
while True:
""" Calls the server periodically to inform the server the client
has not died.
"""
self._bot_session.update_bot_session()
await asyncio.sleep(self.UPDATE_PERIOD)
......@@ -29,7 +29,7 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bo
from .._exceptions import BotError
class BotInterface(object):
class BotInterface:
""" Interface handles calls to the server.
"""
......@@ -39,22 +39,12 @@ class BotInterface(object):
self._stub = bots_pb2_grpc.BotsStub(channel)
def create_bot_session(self, parent, bot_session):
try:
request = bots_pb2.CreateBotSessionRequest(parent = parent,
bot_session = bot_session)
return self._stub.CreateBotSession(request)
except Exception as e:
self.logger.error(e)
raise BotError(e)
request = bots_pb2.CreateBotSessionRequest(parent = parent,
bot_session = bot_session)
return self._stub.CreateBotSession(request)
def update_bot_session(self, bot_session, update_mask = None):
try:
request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
bot_session = bot_session,
update_mask = update_mask)
return self._stub.UpdateBotSession(request)
except Exception as e:
self.logger.error(e)
raise BotError(e)
request = bots_pb2.UpdateBotSessionRequest(name = bot_session.name,
bot_session = bot_session,
update_mask = update_mask)
return self._stub.UpdateBotSession(request)
# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Bot Session
====
Allows connections
"""
import asyncio
import logging
import platform
import uuid
from enum import Enum
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
class BotStatus(Enum):
BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
OK = bots_pb2.BotStatus.Value('OK')
UNHEALTHY = bots_pb2.BotStatus.Value('UNHEALTHY');
HOST_REBOOTING = bots_pb2.BotStatus.Value('HOST_REBOOTING')
BOT_TERMINATING = bots_pb2.BotStatus.Value('BOT_TERMINATING')
class LeaseState(Enum):
LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
PENDING = bots_pb2.LeaseState.Value('PENDING')
ACTIVE = bots_pb2.LeaseState.Value('ACTIVE')
COMPLETED = bots_pb2.LeaseState.Value('COMPLETED')
CANCELLED = bots_pb2.LeaseState.Value('CANCELLED')
class BotSession:
def __init__(self, parent, interface):
""" Unique bot ID within the farm used to identify this bot
Needs to be human readable.
All prior sessions with bot_id of same ID are invalidated.
If a bot attempts to update an invalid session, it must be rejected and
may be put in quarantine.
"""
self.logger = logging.getLogger(__name__)
self._bot_id = '{}.{}'.format(parent, platform.node())
self._interface = interface
self._leases = {}
self._name = None
self._parent = parent
self._status = BotStatus.OK.value
self._work = None
self._worker = None
@property
def bot_id(self):
return self._bot_id
def add_worker(self, worker):
self._worker = worker
def create_bot_session(self, work, context=None):
self.logger.debug("Creating bot session")
self._work = work
self._context = context
session = self._interface.create_bot_session(self._parent, self.get_pb2())
self._name = session.name
self.logger.info("Created bot session with name: {}".format(self._name))
def update_bot_session(self):
session = self._interface.update_bot_session(self.get_pb2())
for lease in session.leases:
self._update_lease_from_server(lease)
def get_pb2(self):
leases = list(self._leases.values())
if not leases:
leases = None
return bots_pb2.BotSession(worker=self._worker.get_pb2(),
status=self._status,
leases=leases,
bot_id=self._bot_id,
name = self._name)
def lease_completed(self, lease):
lease.state = LeaseState.COMPLETED.value
self._leases[lease.id] = lease
def _update_lease_from_server(self, lease):
"""
State machine for any recieved updates to the leases.
"""
## TODO: Compare with previous state of lease
lease_bot = self._leases.get(lease.id)
if lease.state == LeaseState.PENDING.value:
lease.state = LeaseState.ACTIVE.value
asyncio.ensure_future(self.create_work(lease))
self._leases[lease.id] = lease
elif lease.state == LeaseState.COMPLETED.value and \
lease_bot.state == LeaseState.COMPLETED.value:
del self._leases[lease.id]
async def create_work(self, lease):
self.logger.debug("Work created: {}".format(lease.id))
lease = await self._work(self._context, lease)
self.logger.debug("Work complete: {}".format(lease.id))
self.lease_completed(lease)
class Worker:
def __init__(self, properties=None, configs=None):
self.properties = {}
self._configs = {}
self._devices = []
if properties:
for k, v in properties.items():
if k == 'pool':
self.properties[k] = v
else:
raise KeyError('Key not supported: {}'.format(k))
if configs:
for k, v in configs.items():
if k == 'DockerImage':
self.configs[k] = v
else:
raise KeyError('Key not supported: {}'.format(k))
@property
def configs(self):
return self._configs
def add_device(self, device):
self._devices.append(device)
def get_pb2(self):
devices = [device.get_pb2() for device in self._devices]
worker = worker_pb2.Worker(devices=devices)
property_message = worker_pb2.Worker.Property()
for k, v in self.properties.items():
property_message.key = k
property_message.value = v
worker.properties.extend([property_message])
config_message = worker_pb2.Worker.Config()
for k, v in self.properties.items():
property_message.key = k
property_message.value = v
worker.configs.extend([config_message])
return worker
class Device:
def __init__(self, properties=None):
""" Creates devices available to the worker
The first device is know as the Primary Device - the revice which
is running a bit and responsible to actually executing commands.
All other devices are known as Attatched Devices and must be controlled
by the Primary Device.
"""
self._name = str(uuid.uuid4())
self._properties = {}
if properties:
for k, v in properties.items():
if k == 'os':
self._properties[k] = v
elif k == 'docker':
if v not in ('True', 'False'):
raise ValueError('Value not supported: {}'.format(v))
self._properties[k] = v
else:
raise KeyError('Key not supported: {}'.format(k))
@property
def name(self):
return self._name
@property
def properties(self):
return self._properties
def get_pb2(self):
device = worker_pb2.Device(handle=self._name)
property_message = worker_pb2.Device.Property()
for k, v in self._properties.items():
property_message.key = k
property_message.value = v
device.properties.extend([property_message])
return device
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment