Commit c3cb3e14 authored by Thomas Phil's avatar Thomas Phil

enabled rest-api

parent ec295539
......@@ -3,9 +3,8 @@
from cleo import Application
from cleo.inputs.argv_input import ArgvInput
import strongr.core
import logging.config
from strongr.core.domain.configdomain import ConfigDomain
# Use CLEO ArgvInput to extract some parameters
# this dependency gets injected into
......@@ -18,8 +17,10 @@ env = "develop"
if argvInputs.has_parameter_option('env'):
env = argvInputs.get_parameter_option('env')
import strongr.core
core = strongr.core.Core
from strongr.core.domain.configdomain import ConfigDomain
ConfigDomain.configService().getCommandBus().handle(ConfigDomain.commandFactory().newLoadConfig(env))
logging.config.dictConfig(core.config().logger.as_dict())
......
FROM bigr/fastr_worker_develop
MAINTAINER T. Phil <thomas@tphil.nl>
RUN mkdir /opt/fastr-conf && mkdir /opt/strongr && touch /opt/strongr/init.sh
ENV FASTRHOME /opt/fastr-conf
ADD config.py /opt/fastr-conf/config.py
......@@ -2,6 +2,6 @@ FROM bigr/fastr_worker
MAINTAINER T. Phil <thomas@tphil.nl>
RUN mkdir /opt/fastr-conf
RUN mkdir /opt/fastr-conf && mkdir /opt/strongr && touch /opt/strongr/init.sh
ENV FASTRHOME /opt/fastr-conf
ADD config.py /opt/fastr-conf/config.py
FROM ubuntu:16.04
MAINTAINER Thomas Phil <thomas@tphil.nl>
ARG FASTRBRANCH=develop
ENV FASTRBRANCH ${FASTRBRANCH}
#ARG FASTRBRANCH develop
ARG FASTRBRANCH
RUN export DEBIAN_FRONTED=noninteractive
RUN apt-get update && apt-get install -y python python-pip mercurial
RUN apt-get update && apt-get install -y python python-pip mercurial && pip install --upgrade pip
RUN hg clone https://sikerdebaard@bitbucket.org/bigr_erasmusmc/fastr && cd fastr && hg checkout $FASTRBRANCH && hg pull && hg up && pip install -e .
ENTRYPOINT '/bin/bash'
......@@ -19,9 +19,9 @@ import time
class RunJobHandler(AbstractRunJobHandler):
def __call__(self, command):
#thread = threading.Thread(target=self._run, args=(command,)) # run in separate thread so it doesn't block strongr
#thread.start()
self._run(command)
thread = threading.Thread(target=self._run, args=(command,)) # run in separate thread so it doesn't block strongr
thread.start()
#self._run(command)
def _run(self, command):
inter_domain_event_factory = Gateways.inter_domain_event_factory()
......@@ -40,7 +40,7 @@ class RunJobHandler(AbstractRunJobHandler):
env = "-e SCRATCH_DIR='/scratch'"
cmd = 'docker run --rm {} {} -di --name {} -m {}g --cpus={} --entrypoint /bin/sh {}'.format(volumes, env, command.job_id, command.memory, command.cores, quote(command.image))
cmd = 'docker run {} {} -di --name {} -m {}g --cpus={} --entrypoint /bin/sh {}'.format(volumes, env, command.job_id, command.memory, command.cores, quote(command.image))
ret_code = subprocess.call(cmd, shell=True) # start docker container
print(cmd)
......@@ -77,5 +77,13 @@ class RunJobHandler(AbstractRunJobHandler):
time.sleep(1)
cmd = 'docker rm {}'.format(command.job_id)
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as err:
if ret_code != 0:
Exception('Something went wrong while executing script in docker image: {}'.format(cmd))
stdout = err.output
job_finished_event = inter_domain_event_factory.newJobFinishedEvent(command.job_id, stdout, 0)
inter_domain_events_publisher.publish(job_finished_event)
......@@ -49,7 +49,7 @@ class DefaultsLoader:
}
},
'schedulerdomain': {
'scalingdriver': 'simplescaler',
'scalingdriver': 'nullscaler',
'simplescaler': {
'scaleoutmincoresneeded': 1,
'scaleoutminramneeded': 1
......
from flask_restplus import Namespace, Resource, fields, reqparse
from flask import request
from flask_restplus import Namespace, Resource, fields
from flask import request, jsonify
import strongr.core
import time
import uuid
import strongr.restdomain.model.gateways
from strongr.core.domain.schedulerdomain import SchedulerDomain
from strongr.restdomain.api.utils import namespace_require_oauth
ns = Namespace('scheduler', description='Operations related to the schedulerdomain')
post_task = ns.model('post-task', {
'cmd': fields.String(required=True, min_length=1, description='The shellcode to be executed'),
'image': fields.String(required=True, min_length=1, description='The docker image where the script is executed'),
'script': fields.String(required=True, min_length=1, description='The shellcode to be executed'),
'scratch': fields.String(required=True, min_length=1, description='Does the job need a scratch dir?'),
'cores': fields.Integer(required=True, min=1, description='The amount of cores needed to peform the task'),
'ram': fields.Integer(required=True, min=1, description="The amount of ram in GiB needed to peform the task")
'memory': fields.Integer(required=True, min=1, description="The amount of ram in GiB needed to peform the task")
})
@ns.route('/task')
class Tasks(Resource):
@ns.route('/tasks/status/<string:tasks>')
class TaskStatusQuery(Resource):
def __init__(self, *args, **kwargs):
super(Tasks, self).__init__(*args, **kwargs)
super(TaskStatusQuery, self).__init__(*args, **kwargs)
@ns.response(200, 'OK')
@namespace_require_oauth('task')
@ns.param('task_id')
def get(self):
"""Requests task status"""
#@namespace_require_oauth('task')
def get(self, tasks):
tasks = [x.strip() for x in tasks.split(',') if len(x.strip()) > 0] # convert to array
if tasks is None or len(tasks) == 0:
return None, 400
schedulerService = SchedulerDomain.schedulerService()
queryFactory = SchedulerDomain.queryFactory()
query = queryFactory.newRequestScheduledJobs()
query = queryFactory.newRequestJobInfo(tasks)
result = schedulerService.getQueryBus().handle(query)
return result, 200
output = {}
for job in result:
output[job.job_id] = {
'state': str(job.state).split('.')[-1],
'stdout': job.stdout
}
return output, 200
#@ns.route('/task/<string:task_id>')
#class GetTask(Resource):
# def __init__(self, *args, **kwargs):
# super(GetTask, self).__init__(*args, **kwargs)
#
# @ns.response(200, 'OK')
# #@namespace_require_oauth('task')
# def get(self, task_id):
# """Requests task status"""
# schedulerService = SchedulerDomain.schedulerService()
# queryFactory = SchedulerDomain.queryFactory()
#
# query = queryFactory.newRequestScheduledJobs([task_id])
#
# result = schedulerService.getQueryBus().handle(query)
# return result, 200
@ns.route('/task')
class Tasks(Resource):
def __init__(self, *args, **kwargs):
super(Tasks, self).__init__(*args, **kwargs)
@ns.response(201, 'Task successfully created.')
@namespace_require_oauth('task')
#@namespace_require_oauth('task')
@ns.expect(post_task, validate=True)
def post(self):
"""Creates a new task."""
schedulerService = SchedulerDomain.schedulerService()
commandFactory = SchedulerDomain.commandFactory()
cmd = request.json['cmd']
image = request.json['image']
script = request.json['script'].splitlines()
scratch = request.json['scratch']
cores = int(request.json['cores'])
ram = int(request.json['ram'])
taskid = str(int(time.time())) + '-' + str(uuid.uuid4())
memory = int(request.json['memory'])
job_id = str(int(time.time())) + '-' + str(uuid.uuid4())
command = commandFactory.newScheduleJobCommand(taskid, cmd, cores, ram)
command = commandFactory.newScheduleJobCommand(image, script, job_id, scratch, cores, memory)
schedulerService.getCommandBus().handle(command)
return {'taskid': taskid}, 201
return {'job_id': job_id}, 201
......@@ -16,7 +16,6 @@ from strongr.restdomain.model.oauth2.grants.authorizationcodegrant import Author
from strongr.restdomain.model.oauth2.grants.clientcredentialsgrant import ClientCredentialsGrant
from strongr.restdomain.model.oauth2.grants.passwordgrant import PasswordGrant
class Gateways(containers.DeclarativeContainer):
"""IoC container of gateway objects."""
_blueprints = providers.Object([apiv1])
......
......@@ -7,6 +7,10 @@ from strongr.restdomain.model.oauth2 import Token
class ClientCredentialsGrant(_ClientCredentialsGrant):
TOKEN_ENDPOINT_AUTH_METHODS = [
'client_secret_basic', 'client_secret_post'
]
def create_access_token(self, token, client):
item = Token(
client_id=client.client_id,
......
......@@ -3,6 +3,8 @@ from strongr.schedulerdomain.query import RequestScheduledJobs, RequestFinishedJ
from strongr.core.exception import InvalidParameterException
import re
class QueryFactory:
""" This factory instantiates query objects to be sent to a scheduler querybus. """
......@@ -48,13 +50,31 @@ class QueryFactory:
"""
return RequestScheduledJobs()
def newRequestFinishedJobs(self):
def newRequestFinishedJobs(self, jobs=None):
""" Generates a new RequestFinishedJobs query
:param jobs: a list of job id's
:returns: A RequestFinishedJobs query object
:rtype: RequestFinishedJobs
"""
return RequestFinishedJobs()
jobs_sanitized = [re.sub('[^a-zA-Z0-9-]', '', x) for x in jobs] # sanitize inputs
return RequestFinishedJobs(jobs_sanitized)
def newRequestJobInfo(self, jobs=None):
""" Generates a new RequestFinishedJobs query
:param jobs: a list of job id's
:returns: A RequestFinishedJobs query object
:rtype: RequestFinishedJobs
"""
jobs_sanitized = [re.sub('[^a-zA-Z0-9-]', '', x) for x in jobs] # sanitize inputs
return RequestJobInfo(jobs_sanitized)
def newRequestTaskInfo(self, taskid):
""" Generates a new RequestTaskInfo query
......
from sqlalchemy import and_
import strongr.core.gateways
from strongr.schedulerdomain.model import Job, JobState
class RequestFinishedJobsHandler(object):
def __call__(self, query, *args, **kwargs):
session = strongr.core.gateways.Gateways.sqlalchemy_session()
result = session.query(Job).filter(Job.state.in_([JobState.FAILED, JobState.FINISHED])).order_by(
if query.jobs is not None:
result = session.query(Job).filter(and_(Job.state.in_([JobState.FAILED, JobState.FINISHED]), Job.job_id.in_(query.jobs))).order_by(
Job.job_id).all()
else:
result = session.query(Job).filter(Job.state.in_([JobState.FAILED, JobState.FINISHED])).order_by(
Job.job_id).all()
job_ids = []
for job in result:
......
......@@ -4,5 +4,5 @@ import strongr.core.gateways
class RequestTaskInfoHandler:
def __call__(self, query):
session = strongr.core.gateways.Gateways.sqlalchemy_session()
result = session.query(Job).get(query.job_id)
result = session.query(Job).filter(Job.job_id.in_(query.jobs)).all()
return result
class RequestFinishedJobs:
pass
def __init__(self, jobs):
self.jobs = jobs
class RequestJobInfo:
def __init__(self, job_id):
self.job_id = job_id
def __init__(self, jobs):
self.jobs = jobs
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment