Commit 6167a877 authored by Thomas Phil

RM-55 scheduler optimizations

parent ffab0752
@@ -44,7 +44,7 @@ class RunResourceManager(Command):
         schedule.every(1).seconds.do(command_bus.handle, run_enqueued_jobs_command)
         schedule.every(5).seconds.do(command_bus.handle, check_scaling_command)
         schedule.every(10).seconds.do(command_bus.handle, log_stats)
-        schedule.every(5).minutes.do(command_bus.handle, cleanup_nodes)
+        schedule.every(1).minutes.do(command_bus.handle, cleanup_nodes)
         schedule.every(30).minutes.do(command_bus.handle, cleanup_jobs)
         while True:
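This hunk only changes how often cleanup_nodes is scheduled (every minute instead of every five). The schedule library merely registers these callbacks; the while True: loop whose body is truncated here still has to drain them. A minimal, self-contained sketch of that registration-plus-polling pattern, assuming the conventional schedule idiom rather than strongr's exact loop body:

import time

import schedule

schedule.every(1).minutes.do(print, 'cleanup_nodes would run here')  # placeholder callback

while True:
    schedule.run_pending()  # run any callbacks whose interval has elapsed
    time.sleep(1)           # avoid busy-waiting between checks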
@@ -14,14 +14,16 @@ class DestroyVmsHandler(AbstractDestroyVmsHandler):
         logger = logging.getLogger(self.__class__.__name__)
         ret = []
-        for chunked_names in self._chunk_list(command.names, 1):
-            # we are using _chunk_list generator because at some point we might want to remove more than 1 at a time
+        for chunked_names in self._chunk_list(command.names, 1):  # remove one by one, we are offloading this to amqp anyway
             try:
                 ret.append(client.destroy(names=chunked_names))
             except SaltSystemExit as e:
-                # an exception occured within salt, normally below event would be published trough salt event system
-                # assume VM is no longer there and broadcast vm destroyed event from here
-                # if it turns out the vm still there but error was triggered due to api rate limiting or flaky connection
-                # the cleanup script will remove the vm at a later time but this cleanup script will not trigger below event
+                # An exception occurred within Salt. Normally the vmdestroyed event would be published through the Salt event system.
+                # Assume the VM is no longer there and broadcast the vm destroyed event from here.
+                # If it turns out the VM is still there but the error was triggered by API rate limiting or a flaky connection,
+                # the cleanup should remove the VM at a later time.
                 inter_domain_event_factory = strongr.clouddomain.model.gateways.Gateways.inter_domain_event_factory()
                 vmdestroyed_event = inter_domain_event_factory.newVmDestroyedEvent(chunked_names[0])
                 strongr.core.Core.inter_domain_events_publisher().publish(vmdestroyed_event)
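The destroy handler above iterates over self._chunk_list(command.names, 1), a helper that is not part of this diff. A minimal sketch of what such a generator typically looks like (an assumption, not the project's actual implementation):

def _chunk_list(items, chunk_size):
    # yield successive slices of at most chunk_size elements
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

With a chunk size of 1 each VM name ends up in its own destroy call, e.g. list(_chunk_list(['vm-a', 'vm-b'], 1)) == [['vm-a'], ['vm-b']].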
@@ -6,4 +6,4 @@ import salt.client
 class RunJobHandler(AbstractRunJobHandler):
     def __call__(self, command):
         local = salt.client.LocalClient()
-        local.cmd_async(command.host, 'cmd.run', [command.sh, "runas={}".format(strongr.core.Core.config().clouddomain.OpenNebula.runas)], jid=command.job_id)
+        local.cmd_async(command.host, 'cmd.run', [command.script, "runas={}".format(strongr.core.Core.config().clouddomain.OpenNebula.runas)], jid=command.job_id)
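The only change here is the rename of the command attribute (command.sh becomes command.script); the dispatch itself stays a fire-and-forget Salt call. A standalone equivalent, with a placeholder minion id, script and run-as user, would look roughly like this:

import salt.client

local = salt.client.LocalClient()
# 'node01', the echo script and 'worker' are placeholders; jid ties the async call to our own job id
jid = local.cmd_async('node01', 'cmd.run', ['echo hello', 'runas=worker'], jid='job-1234')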
@@ -18,7 +18,7 @@ class CleanupNodesHandler(object):
         vm_templates = ScalingDriver.scaling_driver().get_templates()
-        deadline = datetime.utcnow() - timedelta(minutes=30) # give cloud domain time to provision a machine, if it isn't online by then it will probably never be
+        deadline = datetime.utcnow() + timedelta(minutes=-30)  # give the cloud domain time to provision a machine; if it isn't online by then it probably never will be
         session = strongr.core.gateways.Gateways.sqlalchemy_session()
         unprovisioned_vms_in_db = session.query(Vm).filter(and_(Vm.state.in_([VmState.NEW, VmState.PROVISION]), deadline > Vm.state_date)).all()
         vms_in_cloud = cloud_query_bus.handle(cloud_query_factory.newListDeployedVms())
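The deadline rewrite in this hunk is purely cosmetic: subtracting a positive timedelta and adding a negative one yield the same instant, as a quick check shows:

from datetime import datetime, timedelta

now = datetime.utcnow()
assert now - timedelta(minutes=30) == now + timedelta(minutes=-30)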
@@ -46,7 +46,7 @@ class CleanupNodesHandler(object):
                 parallel_remove_list.append(vm)
         # check for VM's marked for death without jobs
-        deadline = datetime.utcnow() + timedelta(minutes=-5) # vm's need to be marked for death for at least 5 mins before we clean them up else we run into race conditions
+        deadline = datetime.utcnow() + timedelta(seconds=-10)  # VMs need to be marked for death for at least 10 secs before we clean them up, else we run into race conditions
         subquery = session.query(Job.vm_id,
                                  func.count(Job.job_id).label('jobs'))\
             .filter(
@@ -61,7 +61,8 @@ class CleanupNodesHandler(object):
                 parallel_remove_list.append(vm.vm_id)
         # check for expired VM's
-        expired_vms = session.query(Vm).filter(and_(func.now() > Vm.deadline, Vm.state.in_([VmState.NEW, VmState.PROVISION, VmState.READY]))).all()
+        timenow = datetime.utcnow()
+        expired_vms = session.query(Vm).filter(and_(timenow > Vm.deadline, Vm.state.in_([VmState.NEW, VmState.PROVISION, VmState.READY]))).all()
         for vm in expired_vms:
             vm.state = VmState.MARKED_FOR_DEATH
         session.commit()
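Replacing func.now() with a Python-side datetime.utcnow() means the comparison value is computed in the application (in UTC, like the other deadlines in this handler) and bound into the query as a parameter, instead of being evaluated by the database server. A small self-contained sketch of that pattern against an in-memory SQLite database (the Vm stand-in below is a made-up minimal model, not strongr's):

from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Vm(Base):  # minimal stand-in for strongr's Vm model
    __tablename__ = 'vms'
    vm_id = Column(Integer, primary_key=True)
    deadline = Column(DateTime)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(Vm(vm_id=1, deadline=datetime.utcnow() - timedelta(minutes=1)))
session.commit()

timenow = datetime.utcnow()  # evaluated in Python, passed to the database as a bind parameter
expired = session.query(Vm).filter(timenow > Vm.deadline).all()
print([vm.vm_id for vm in expired])  # -> [1]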
@@ -6,3 +6,4 @@ class JobState(Enum):
     ASSIGNED = 20
     RUNNING = 30
     FINISHED = 40
+    ON_HOLD = 50
@@ -5,13 +5,15 @@ import strongr.core
 from strongr.schedulerdomain.model.scalingdrivers.nullscaler import NullScaler
 from strongr.schedulerdomain.model.scalingdrivers.simplescaler import SimpleScaler
+from strongr.schedulerdomain.model.scalingdrivers.surfhpccloudscaler import SurfHpcScaler
 class ScalingDriver(containers.DeclarativeContainer):
     """IoC container of service providers."""
     _scalingdrivers = providers.Object({
         'simplescaler': SimpleScaler,
-        'nullscaler': NullScaler
+        'nullscaler': NullScaler,
+        'surfsarahpccloud': SurfHpcScaler
     })
     scaling_driver = providers.Singleton(_scalingdrivers()[strongr.core.Core.config().schedulerdomain.scalingdriver.lower()], config=dict(strongr.core.Core.config().schedulerdomain.as_dict()[strongr.core.Core.config().schedulerdomain.scalingdriver]) if strongr.core.Core.config().schedulerdomain.scalingdriver in strongr.core.Core.config().schedulerdomain.as_dict() else {})
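The scaling_driver singleton above packs the whole selection into one line: the schedulerdomain.scalingdriver config value picks a class out of the _scalingdrivers registry, and that driver's own config block is passed along if one exists, otherwise an empty dict. A simplified illustration of the same lookup with plain dicts (class bodies and config keys below are stand-ins, not strongr's real config):

class NullScaler(object):
    def __init__(self, config):
        self.config = config

class SurfHpcScaler(object):
    def __init__(self, config):
        self.config = config

_scalingdrivers = {
    'nullscaler': NullScaler,
    'surfsarahpccloud': SurfHpcScaler,
}

schedulerdomain = {  # shape assumed for illustration only
    'scalingdriver': 'surfsarahpccloud',
    'surfsarahpccloud': {'scaleoutmincoresneeded': 2, 'scaleoutminramneeded': 4},
}

name = schedulerdomain['scalingdriver'].lower()
driver_config = schedulerdomain.get(name, {})  # empty config when no per-driver block is present
scaling_driver = _scalingdrivers[name](config=driver_config)
print(type(scaling_driver).__name__)  # -> SurfHpcScaler

The newly added scaler class itself, from the new surfhpccloudscaler module, follows.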
from strongr.schedulerdomain.model.scalingdrivers.abstract import AbstractScaleOut, AbstractScaleIn, \
    AbstractVmTemplateRetriever

import uuid

import strongr.core
import strongr.core.gateways
import logging
import strongr.core.domain.clouddomain
import strongr.core.domain.schedulerdomain

from sqlalchemy import func, and_, or_

from strongr.schedulerdomain.model import JobState, Job, Vm, VmState

from datetime import datetime, timedelta


# This scaler is specifically written for the Surf-SARA HPCCloud
class SurfHpcScaler(AbstractScaleIn, AbstractScaleOut, AbstractVmTemplateRetriever):
    def __init__(self, config, *args, **kwargs):
        super(SurfHpcScaler, self).__init__(*args, **kwargs)
        self._config = config

    def get_vm_max_age(self, vm_name):
        deadline = datetime.utcnow() + timedelta(hours=6)
        return deadline

    def get_templates(self):
        return self._config.schedulerdomain.surfhpcscaler.templates.as_dict()

    def scalein(self, command):
        if strongr.core.gateways.Gateways.lock('scaleout-lock').exists():
            return  # only ever run one of these commands at once

        # double check lock
        # we should move this locking mechanism a little bit further down the chain so that it isn't the specific
        # scaling algorithm's responsibility
        with strongr.core.gateways.Gateways.lock('scaleout-lock'):  # only ever run one of these commands at once
            logger = logging.getLogger('schedulerdomain.' + self.__class__.__name__)
            session = strongr.core.gateways.Gateways.sqlalchemy_session()

            # subquery to see what's already running on a vm
            subquery1 = session.query(Job.vm_id, func.count(Job.job_id).label('jobs'), func.sum(Job.cores).label('cores'), func.sum(Job.ram).label('ram')).filter(
                Job.state.in_([JobState.RUNNING])).group_by(Job.vm_id).subquery('j')
            subquery2 = session.query(Job.vm_id, func.max(Job.state_date).label('last_job_date')).filter(Job.state.in_([JobState.FAILED, JobState.FINISHED, JobState.RUNNING])).group_by(Job.vm_id).subquery('i')

            job_deadline = datetime.utcnow() + timedelta(minutes=-10)

            results = session.query(Vm.vm_id.label('vm_id'), subquery1.c.jobs.label('job_count'), subquery2.c.last_job_date) \
                .outerjoin(subquery1, subquery1.c.vm_id == Vm.vm_id) \
                .outerjoin(subquery2, subquery2.c.vm_id == Vm.vm_id) \
                .filter(
                    and_(
                        and_(  # only vm's with no jobs
                            subquery1.c.cores == None,
                            subquery1.c.ram == None,
                        ),
                        or_(
                            subquery2.c.last_job_date < job_deadline,  # if no job has run for 10 minutes, kill the vm
                            and_(subquery2.c.last_job_date == None, Vm.state_date < job_deadline)  # if no job has been scheduled to the vm for 10 minutes, kill the vm
                        ),
                        Vm.state.in_([VmState.READY])  # vm should be in state ready
                    )
                ).all()

            if not results:
                return  # no VM's to scalein

            vms_to_update = []
            for vm in results:
                vms_to_update.append(vm[0])

            if len(vms_to_update) > 0:
                session.query(Vm).filter(Vm.vm_id.in_(vms_to_update)).update({Vm.state: VmState.MARKED_FOR_DEATH}, synchronize_session='fetch')

    def scaleout(self, command):
        cores = command.cores
        ram = command.ram

        if strongr.core.gateways.Gateways.lock('scaleout-lock').exists():
            return  # only ever run one of these commands at once

        with strongr.core.gateways.Gateways.lock('scaleout-lock'):  # only ever run one of these commands at once
            config = self._config
            logger = logging.getLogger('schedulerdomain.' + self.__class__.__name__)

            query_factory = strongr.core.domain.schedulerdomain.SchedulerDomain.queryFactory()
            query_bus = strongr.core.domain.schedulerdomain.SchedulerDomain.schedulerService().getQueryBus()

            templates = dict(config.schedulerdomain.simplescaler.templates.as_dict())  # make a copy because we want to manipulate it

            active_vms = query_bus.handle(query_factory.newRequestVms([VmState.NEW, VmState.PROVISION, VmState.READY]))

            provision_counter = 0
            for vm in active_vms:
                if vm.state in [VmState.NEW, VmState.PROVISION]:
                    cores -= vm.cores
                    ram -= vm.ram
                    provision_counter += 1
                    if provision_counter >= 2:
                        # don't provision more than 2 VM's at the same time
                        return

            if cores <= 0 or cores < config.schedulerdomain.surfhpcscaler.scaleoutmincoresneeded:
                return
            if ram <= 0 or ram < config.schedulerdomain.surfhpcscaler.scaleoutminramneeded:
                return

            for template in templates:
                templates[template]['distance'] = templates[template]['ram'] / templates[template]['cores']

            ram_per_core_needed = ram / cores

            # find the best fit based on the templates
            # first we calculate the distance based on the optimal mem / core distribution
            distances = {}
            for template in templates:
                distance = abs(templates[template]['distance'] - ram_per_core_needed)
                if distance not in distances:
                    distances[distance] = []
                distances[distance].append(template)

            min_distance_templates = distances[min(distances)]  # get the templates with the least distance

            # now find the template with resources that best fit what we need
            # we simply select the best fitting template with the most resources
            template = min(min_distance_templates, key=lambda key: (cores - templates[key]['cores']) + (ram - templates[key]['ram']))

            # scaleout by one instance
            cloudService = strongr.core.domain.clouddomain.CloudDomain.cloudService()
            cloudCommandFactory = strongr.core.domain.clouddomain.CloudDomain.commandFactory()

            cloudProviderName = config.clouddomain.driver
            profile = getattr(config.clouddomain, cloudProviderName).default_profile if 'profile' not in templates[template] else templates[template]['profile']

            deployVmsCommand = cloudCommandFactory.newDeployVms(names=[template + '-' + str(uuid.uuid4())], profile=profile, cores=templates[template]['cores'], ram=templates[template]['ram'])
            cloudCommandBus = cloudService.getCommandBus()

            logger.info('Deploying VM {0} cores={1} ram={2}GiB'.format(deployVmsCommand.names[0], templates[template]['cores'], templates[template]['ram']))
            cloudCommandBus.handle(deployVmsCommand)
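To make the template selection in scaleout() concrete, here is a small worked example with made-up templates: each template's distance is its RAM-per-core ratio, the templates whose ratio is closest to the requested RAM per core are shortlisted, and the tie is broken by the same (cores, ram) shortfall expression used above, which favours the largest shortlisted template.

templates = {  # hypothetical catalogue (cores, GiB RAM), not taken from strongr's config
    'small':  {'cores': 2, 'ram': 4},
    'medium': {'cores': 4, 'ram': 8},
    'big':    {'cores': 8, 'ram': 32},
}

cores, ram = 6, 12                 # outstanding demand after subtracting provisioning VMs
ram_per_core_needed = ram / cores  # -> 2.0

for name in templates:
    templates[name]['distance'] = templates[name]['ram'] / templates[name]['cores']

distances = {}
for name in templates:
    distance = abs(templates[name]['distance'] - ram_per_core_needed)
    distances.setdefault(distance, []).append(name)

min_distance_templates = distances[min(distances)]  # -> ['small', 'medium'], both have ratio 2.0

template = min(min_distance_templates, key=lambda key: (cores - templates[key]['cores']) + (ram - templates[key]['ram']))
print(template)  # -> 'medium': the smaller remaining shortfall wins the tie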