Commit 48573e17 authored by Thomas

Added experimental azure support

parent d0bb306d
@@ -27,7 +27,7 @@ logging.config.dictConfig(core.config().logger.as_dict())
from strongr.cli import DeploySingleCommand, ListDeployedVmsCommand,\
                        DeployManyCommand,\
-                        AddJobCommand, GetFinishedJobsCommand,\
+                        GetFinishedJobsCommand,\
                        RunResourceManager, PrintConfig,\
                        IsValidUserCommand, RunRestServerCommand,\
                        RunWorkerCommand, DestroySingleCommand,\
@@ -41,7 +41,6 @@ application = Application()
application.add(DeploySingleCommand())
application.add(ListDeployedVmsCommand())
application.add(DeployManyCommand())
-application.add(AddJobCommand())
application.add(GetFinishedJobsCommand())
application.add(RunResourceManager())
application.add(PrintConfig())
......
from .deploysinglecommand import DeploySingleCommand
from .listdeployedvmscommand import ListDeployedVmsCommand
from .deploymanycommand import DeployManyCommand
-from .addjobcommand import AddJobCommand
from .getfinishedjobscommand import GetFinishedJobsCommand
from .runresourcemanager import RunResourceManager
from .printconfig import PrintConfig
......
from strongr.core.domain.schedulerdomain import SchedulerDomain
from .wrapper import Command

import uuid
import base64
import json
import time

class AddJobCommand(Command):
    """
    Add a task to the resource manager

    job:add
        {image : Docker image to be used}
        {script : Shellcode to be executed in docker image, base64 encoded}
        {scratch : Does the docker image use a scratch mount?}
        {cores : The amount of cores needed for execution}
        {memory : The amount of memory in GiB needed for execution}
    """
    def handle(self):
        schedulerService = SchedulerDomain.schedulerService()
        commandFactory = SchedulerDomain.commandFactory()

        image = str(self.argument('image'))
        script = str(base64.b64decode(self.argument('script'))).splitlines()
        job_id = str(int(time.time())) + '-' + str(uuid.uuid4())
        scratch = self._castToBool(self.argument('scratch'))
        cores = int(self.argument('cores'))
        memory = int(self.argument('memory'))

        command = commandFactory.newScheduleJobCommand(image=image, script=script, job_id=job_id, scratch=scratch, cores=cores, memory=memory)
        schedulerService.getCommandBus().handle(command)

        print(json.dumps({'job_id': job_id}))
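For reference, a minimal sketch of how the base64-encoded script argument for job:add could be prepared; the payload and image name are illustrative assumptions, not taken from this commit:

# sketch: base64-encode a shell script for the job:add 'script' argument (values are illustrative)
import base64

script_b64 = base64.b64encode(b'echo hello from the job\n').decode('ascii')
# the command would then be invoked roughly as:
#   job:add <image> <script_b64> <scratch> <cores> <memory>
print(script_b64)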
@@ -48,7 +48,7 @@ class RunResourceManager(Command):
        schedule.every(5).seconds.do(command_bus.handle, check_scaling_command)
        schedule.every(10).seconds.do(command_bus.handle, log_stats)
        schedule.every(1).minutes.do(command_bus.handle, cleanup_nodes)
-        #schedule.every(30).minutes.do(command_bus.handle, cleanup_jobs)
+        schedule.every(30).minutes.do(command_bus.handle, cleanup_jobs)

        while True:
            schedule.run_pending()
......
@@ -24,8 +24,8 @@ class RunWorkerCommand(Command):
        celery = Celery('strongr', broker=broker, backend=backend)

        scheduler = SchedulerDomain.schedulerService()
-        scheduler.getCommandBus() # we need to initiate this so that celery knows where to send its commands
-        scheduler.getQueryBus() # we need to initiate this so that celery knows where to send its commands
+        scheduler.getCommandBus() # we need to initiate this as it self-registers its commands to celery
+        scheduler.getQueryBus() # we need to initiate this as it self-registers its queries to celery

        domains = getattr(config.celery.workers, self.option('profile')).as_dict()
......
from .runjobhandler import RunJobHandler
from .listdeployedvmshandler import ListDeployedVmsHandler
from .deployvmshandler import DeployVmsHandler
from .requestjidstatushandler import RequestJidStatusHandler
from .destroyvmshandler import DestroyVmsHandler
from .jobfinishedhandler import JobFinishedHandler
from strongr.clouddomain.handler.abstract.cloud import AbstractDeployVmsHandler

import salt.cloud
import time

import strongr.core
import strongr.clouddomain.model.gateways

class DeployVmsHandler(AbstractDeployVmsHandler):
    def __call__(self, command):
        overrides = {}
        overrides['memory'] = command.ram * 1024
        overrides['cpu'] = command.cores
        overrides['vcpu'] = command.cores

        client = salt.cloud.CloudClient(strongr.core.Core.config().clouddomain.Azure.salt_config + '/cloud')

        ret = []
        for name in command.names:
            vmnew_event = strongr.clouddomain.model.gateways.Gateways.inter_domain_event_factory().newVmNewEvent(name, command.cores, command.ram)
            strongr.core.Core.inter_domain_events_publisher().publish(vmnew_event)

        ret.append(client.profile(names=command.names, profile=command.profile, vm_overrides=overrides, parallel=True))
        return ret
from salt.exceptions import SaltSystemExit
from strongr.clouddomain.handler.abstract.cloud import AbstractDestroyVmsHandler

import salt.cloud

import strongr.core
import strongr.clouddomain.model.gateways
import logging

class DestroyVmsHandler(AbstractDestroyVmsHandler):
    def __call__(self, command):
        client = salt.cloud.CloudClient(strongr.core.Core.config().clouddomain.Azure.salt_config + '/cloud')
        logger = logging.getLogger(self.__class__.__name__)

        ret = []
        # we use the _chunk_list generator because at some point we might want to remove more than one VM at a time
        for chunked_names in self._chunk_list(command.names, 1): # remove one by one, we are offloading this to amqp anyway
            try:
                ret.append(client.destroy(names=chunked_names))
            except SaltSystemExit as e:
                # An exception occurred within salt. Normally the vmdestroyed event would be published through the salt event system.
                # Assume the VM is no longer there and broadcast the vm destroyed event from here.
                # If it turns out the VM is still there and the error was triggered by API rate limiting or a flaky connection,
                # the cleanup job will remove the VM at a later time.
                inter_domain_event_factory = strongr.clouddomain.model.gateways.Gateways.inter_domain_event_factory()
                vmdestroyed_event = inter_domain_event_factory.newVmDestroyedEvent(chunked_names[0])
                strongr.core.Core.inter_domain_events_publisher().publish(vmdestroyed_event)
                logger.warning(e)

        return ret

    def _chunk_list(self, items, chunksize):
        for i in range(0, len(items), chunksize):
            yield items[i:i + chunksize]
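As a quick standalone illustration of the chunking helper above (not part of the commit; names and values are illustrative):

# standalone sketch of the chunking behaviour used by DestroyVmsHandler
def chunk_list(items, chunksize):
    for i in range(0, len(items), chunksize):
        yield items[i:i + chunksize]

print(list(chunk_list(['vm-1', 'vm-2', 'vm-3'], 1)))  # [['vm-1'], ['vm-2'], ['vm-3']]
print(list(chunk_list(['vm-1', 'vm-2', 'vm-3'], 2)))  # [['vm-1', 'vm-2'], ['vm-3']]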
from strongr.clouddomain.handler.abstract.cloud import AbstractJobFinishedHandler

class JobFinishedHandler(AbstractJobFinishedHandler):
    def __call__(self, command):
        # convert command to inter-domain event
        pass
from strongr.clouddomain.handler.abstract.cloud import AbstractListDeployedVmsHandler

import salt.cloud
import salt.config
import salt.runner

import strongr.core

class ListDeployedVmsHandler(AbstractListDeployedVmsHandler):
    def __call__(self, command):
        client = salt.cloud.CloudClient(strongr.core.Core.config().clouddomain.Azure.salt_config + '/cloud')

        names = {}
        rs = client.query()
        for provider in list(rs.keys()):
            for location in list(rs[provider].keys()):
                for machine in list(rs[provider][location].keys()):
                    names[machine] = {
                        'cores': int(rs[provider][location][machine]['size']['cpu']),
                        'ram': int(rs[provider][location][machine]['size']['memory']) // 1024
                    }

        opts = salt.config.master_config(strongr.core.Core.config().clouddomain.Azure.salt_config + '/master')
        opts['quiet'] = True
        runner = salt.runner.RunnerClient(opts)
        result = runner.cmd('manage.up')

        output = {'up': [], 'down': []}
        for machine in list(names):
            if machine not in result:
                output['down'].append(machine)
            else:
                output['up'].append(machine)

        return output
from strongr.clouddomain.handler.abstract.cloud import AbstractRequestJidStatusHandler

import salt.config
import salt.runner

import strongr.core
import strongr.core.gateways

class RequestJidStatusHandler(AbstractRequestJidStatusHandler):
    def __call__(self, query):
        opts = salt.config.master_config(strongr.core.Core.config().clouddomain.Azure.salt_config + '/master')
        opts['quiet'] = True
        runner = salt.runner.RunnerClient(opts)

        cache = strongr.core.gateways.Gateways.cache()
        if not cache.exists('clouddomain.jobs.running'):
            cache.set('clouddomain.jobs.running', runner.cmd('jobs.active'), 1)

        jobs = cache.get('clouddomain.jobs.running')
        if jobs is None:
            jobs = {}

        if query.jid not in jobs: # only report a status once the jid is no longer in the list of active jobs
            result = runner.cmd('jobs.lookup_jid', [query.jid])
            return result

        return None
from strongr.clouddomain.handler.abstract.cloud import AbstractRunJobHandler

import strongr.core
import salt.client

class RunJobHandler(AbstractRunJobHandler):
    def __call__(self, command):
        local = salt.client.LocalClient()
        local.cmd_async(command.host, 'cmd.run', [command.script, "runas={}".format(strongr.core.Core.config().clouddomain.Azure.runas)], jid=command.job_id)
@@ -23,7 +23,7 @@ class DeployVmsHandler(AbstractDeployVmsHandler):
                vmnew_event = strongr.clouddomain.model.gateways.Gateways.inter_domain_event_factory().newVmNewEvent(name, command.cores, command.ram)
                strongr.core.Core.inter_domain_events_publisher().publish(vmnew_event)

            ret.append(client.profile(names=chunked_names, profile=command.profile, vm_overrides=overrides, parallel=True))
-            time.sleep(60)
+            #time.sleep(60) # api rate limiting removed from hpc cloud, testing with sleep turned off
            # this sleep is here because of HPCCloud API rate limiting
            # we should find a better solution at some point...
......
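The commented-out sleep above was a stopgap for API rate limiting, and the remaining comment notes that a better solution is still wanted. A minimal sketch of one possible direction, a retry with exponential backoff around the salt-cloud call; the helper name deploy_with_backoff is hypothetical and not part of this commit:

import time

def deploy_with_backoff(client, names, profile, overrides, retries=5):
    # retry the salt-cloud profile call with exponential backoff instead of a fixed 60s sleep
    delay = 1
    for attempt in range(retries):
        try:
            return client.profile(names=names, profile=profile, vm_overrides=overrides, parallel=True)
        except Exception:  # in real code, narrow this to the relevant salt exception
            if attempt == retries - 1:
                raise
            time.sleep(delay)
            delay *= 2  # back off: 1s, 2s, 4s, ...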
@@ -6,4 +6,4 @@ import salt.client
class RunJobHandler(AbstractRunJobHandler):
    def __call__(self, command):
        local = salt.client.LocalClient()
-        local.cmd_async(command.host, 'cmd.run', [command.script, "runas={}".format(strongr.core.Core.config().clouddomain.OpenNebula.runas)], jid=command.job_id)
+        local.cmd_async(command.host, 'cmd.run', ['cmd={}'.format(command.script), 'runas={}'.format(strongr.core.Core.config().clouddomain.OpenNebula.runas)], jid=command.job_id)
from .opennebula import OpenNebula
from .mockcloud import MockCloud
+from .azure import Azure
from strongr.clouddomain.handler.impl.cloud.azure import ListDeployedVmsHandler, \
                                                         RunJobHandler, DeployVmsHandler, \
                                                         RequestJidStatusHandler, DestroyVmsHandler, \
                                                         JobFinishedHandler

from .abstractcloudservice import AbstractCloudService

import strongr.clouddomain.model.gateways

class Azure(AbstractCloudService):
    _salt_event_translator_thread = None

    def __init__(self, *args, **kwargs):
        super(Azure, self).__init__(*args, **kwargs)

    def start_reactor(self):
        salt_event_translator = strongr.clouddomain.model.gateways.Gateways.salt_event_translator()
        if not salt_event_translator.is_alive():
            salt_event_translator.setDaemon(True)
            salt_event_translator.start() # start event translator thread if it wasn't running

    def get_command_handlers(self):
        return [RunJobHandler, DeployVmsHandler, DestroyVmsHandler, JobFinishedHandler]

    def get_query_handlers(self):
        return [ListDeployedVmsHandler, RequestJidStatusHandler]
-from .cloud import OpenNebula
-from .cloud import MockCloud
+from .cloud import OpenNebula, MockCloud, Azure

# cloud service factory
class CloudServices():
    _clouds = [ \
            OpenNebula,
-            MockCloud
+            MockCloud,
+            Azure
        ]

    _instances = {}
......
@@ -15,7 +15,8 @@ post_task = ns.model('post-task', {
    'script': fields.String(required=True, min_length=1, description='The shellcode to be executed'),
    'scratch': fields.String(required=True, min_length=1, description='Does the job need a scratch dir?'),
    'cores': fields.Integer(required=True, min=1, description='The amount of cores needed to perform the task'),
-    'memory': fields.Integer(required=True, min=1, description="The amount of RAM in GiB needed to perform the task")
+    'memory': fields.Integer(required=True, min=1, description="The amount of RAM in GiB needed to perform the task"),
+    'secrets': fields.List(fields.String, required=False, description='A list of keys of secrets to be injected into the process')
})

@ns.route('/tasks/status/<string:tasks>')
@@ -84,7 +85,12 @@ class Tasks(Resource):
        memory = int(request.json['memory'])
        job_id = str(int(time.time())) + '-' + str(uuid.uuid4())

-        command = commandFactory.newScheduleJobCommand(image, script, job_id, scratch, cores, memory)
+        if 'secrets' in request.json:
+            secrets = request.json['secrets']
+        else:
+            secrets = []
+
+        command = commandFactory.newScheduleJobCommand(image, script, job_id, scratch, cores, memory, secrets)
        schedulerService.getCommandBus().handle(command)

        return {'job_id': job_id}, 201
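For context, a request against the updated endpoint could look roughly like the sketch below. The base URL and the /tasks path are assumptions, only the field names come from the post-task model above, and whether script must be base64 encoded is not visible in this hunk:

import requests

payload = {
    'image': 'ubuntu:16.04',
    'script': 'echo hello',
    'scratch': 'false',
    'cores': 1,
    'memory': 2,
    'secrets': ['db_password', 'api_token'],  # optional field added in this commit
}
response = requests.post('http://localhost:5000/tasks', json=payload)
print(response.status_code, response.json())  # expected: 201 {'job_id': '<timestamp>-<uuid>'}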
class ScheduleJob:
-    def __init__(self, image, script, job_id, scratch, cores, memory):
+    def __init__(self, image, script, job_id, scratch, cores, memory, secrets):
        self.image = image
        self.script = script
        self.job_id = job_id
        self.scratch = scratch
        self.cores = cores
        self.memory = memory
+        self.secrets = secrets
@@ -148,7 +148,7 @@ class CommandFactory:
        """
        return RunEnqueuedJobs()

-    def newScheduleJobCommand(self, image, script, job_id, scratch, cores, memory):
+    def newScheduleJobCommand(self, image, script, job_id, scratch, cores, memory, secrets):
        """ Generates a new ScheduleJob command

        :param image: the docker image used for running the job
@@ -169,6 +169,9 @@ class CommandFactory:
        :param memory: The amount of memory in GiB needed to run the job
        :type memory: int

+        :param secrets: List of keys of secrets that should be injected into the process
+        :type secrets: list
+
        :returns: A ScheduleJob command object
        :rtype: ScheduleJob
        """
@@ -183,8 +186,10 @@ class CommandFactory:
            raise InvalidParameterException('cores is invalid')
        elif memory <= 0:
            raise InvalidParameterException('memory is invalid')
+        elif not isinstance(secrets, list):
+            raise InvalidParameterException('List of secrets is invalid')

-        return ScheduleJob(image=image, script=script, job_id=job_id, scratch=scratch, cores=cores, memory=memory)
+        return ScheduleJob(image=image, script=script, job_id=job_id, scratch=scratch, cores=cores, memory=memory, secrets=secrets)

    def newStartJobOnVm(self, vm_id, job_id):
        """ Generates a new StartJobOnVm command
......
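As a usage sketch, the extended factory method can be reached the same way the CLI and REST layers in this diff reach it; the argument values here are illustrative assumptions:

# sketch only: obtain the factory as the surrounding code does; values are illustrative
from strongr.core.domain.schedulerdomain import SchedulerDomain

factory = SchedulerDomain.commandFactory()
command = factory.newScheduleJobCommand(image='ubuntu:16.04', script=['echo hello'],
                                        job_id='1520000000-abc', scratch=False,
                                        cores=1, memory=2, secrets=['db_password'])
# passing a non-list value for secrets would raise InvalidParameterException per the check above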
@@ -16,3 +16,4 @@ class ScheduleJobHandler:
        session = strongr.core.gateways.Gateways.sqlalchemy_session()
        session.add(job)
+        session.commit()
@@ -16,6 +16,8 @@ class Job(Base):
    image = Column(Text)
    scratch = Column(Boolean)

+    _secrets = Column(Text)
+
    vm_id = Column(String(255), ForeignKey('vms.vm_id'))
    vm = relationship('Vm', back_populates='jobs')
@@ -25,6 +27,18 @@ class Job(Base):
    _state = Column('state', Enum(JobState))

+    @property
+    def secrets(self):
+        return self._secrets.split('\n') if self._secrets else []
+
+    @secrets.setter
+    def secrets(self, array_of_secrets):
+        # store the list as a single newline-joined string in the _secrets column
+        self._secrets = '\n'.join(array_of_secrets)
+
+    # create a synonym so that _secrets and secrets are considered the same field by the mapper
+    secrets = synonym('_secrets', descriptor=secrets)
+
    # In classical SQL we would put a trigger to update this field with NOW() if the state-field is updated.
    # SQLAlchemy has no way to write triggers without writing platform-dependent SQL at the time of writing.
    # Instead we use a setter on the state-field, this setter updates the state_date as well.
......
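A standalone sketch of the round-trip behaviour the new property gives the model, without the real SQLAlchemy base; the class name and values are illustrative:

# standalone sketch: how the secrets property maps a list onto a single text column
class FakeJob(object):
    _secrets = None

    @property
    def secrets(self):
        return self._secrets.split('\n') if self._secrets else []

    @secrets.setter
    def secrets(self, array_of_secrets):
        self._secrets = '\n'.join(array_of_secrets)

job = FakeJob()
job.secrets = ['db_password', 'api_token']
print(repr(job._secrets))  # 'db_password\napi_token' -- the value stored in the column
print(job.secrets)         # ['db_password', 'api_token']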
@@ -3,7 +3,6 @@ from datetime import datetime, timedelta
from strongr.schedulerdomain.model.scalingdrivers.abstract import AbstractScaleOut, AbstractScaleIn, \
    AbstractVmTemplateRetriever

-import sys

# The null-scaler ignores scalein and scaleout signals.
......