Commit 5aadeedc authored by Vlad Calin

Use settings file to configure workloads

parent 153f5268
# Dotted import paths ("package.module.ClassName") of the workload classes
# that the settings loader imports and exposes to the CLI.
WORKLOADS = [
    'examples.file_processing.workload.FileProcessingWorkload',
]

# URL of the Redis instance; it is handed to the workload as its queue.
QUEUE = "redis://localhost:6379/0"
from workloads.registry import WorkloadRegistry
from workloads.workload import Workload, step
# Module-level registry; the workload class defined below is passed to
# its register() method through the @registry.register decorator.
registry = WorkloadRegistry()
@registry.register
class FileProcessingWorkload(Workload):
    """Example workload published under the name ``file_processing``."""

    name = 'file_processing'

    @step('start')
    def start(self):
        # Placeholder for the 'start' step: no work is performed yet.
        return None
# Script entry point: when this module is executed directly, hand control
# to the command line interface built by the registry.
if __name__ == '__main__':
registry.cli()
# NOTE(review): the original nesting of this print is not recoverable from
# the flattened text - confirm whether it belongs inside the __main__ guard.
print("This is cool")
......@@ -16,7 +16,7 @@ setup(
packages=find_packages(exclude=['examples']),
install_requires=[
'click',
'redis'
'redis',
],
extras_require={
'dev': [
......
import click
from workloads.settings import Settings
@click.group()
def cli():
    # Root command group: it performs no work itself and only hosts the
    # sub-commands attached with ``@cli.command``. A comment is used instead
    # of a docstring so the help text click generates stays unchanged.
    return None
# Reusable ``--settings`` option shared by the sub-commands; its value is the
# name of the settings module and falls back to 'settings' when omitted.
settings_option = click.option(
    '--settings',
    default='settings',
    help='Settings module',
)
@cli.command('list')
@settings_option
def list_workloads(settings):
    # Loads the settings module named by ``settings`` and prints the ``name``
    # of every workload class it lists, one per line. (Comment rather than
    # docstring so click's generated help text is unchanged.)
    loaded = Settings(settings)
    workload_names = (workload_cls.name for workload_cls in loaded.workloads)
    for workload_name in workload_names:
        print(workload_name)
@cli.command('run')
@settings_option
@click.argument('name')
def run_workload(settings, name):
    # Loads the settings module named by ``settings``, instantiates the
    # workload whose ``name`` matches the NAME argument and runs it with the
    # queue and worker count taken from the settings. (Comment rather than
    # docstring so click's generated help text is unchanged.)
    config = Settings(settings)
    try:
        workload_cls = config.get_workload(name)
    except KeyError:
        # An unknown name is a user error: report it through click as a
        # one-line message instead of letting the KeyError traceback escape.
        raise click.ClickException(
            'Workload {} not found'.format(name)
        ) from None
    workload = workload_cls()
    workload.run(queue=config.queue, worker_count=config.worker_count)
import functools
import click
from workloads.workload import run_workload
def parameter_to_click_option(parameter):
    """Return a click option decorator named ``--<parameter.name>``."""
    option_flag = "--" + parameter.name
    return click.option(option_flag)
class WorkloadRegistry(object):
def __init__(self):
self.workloads = []
def register(self, workload_cls, name=None):
name = name or workload_cls.name
if not name:
raise ValueError('Workload name must be specified')
self.workloads.append(workload_cls)
def list(self):
return self.workloads
def get(self, name):
for wk in self.workloads:
if wk.name == name:
return wk
raise KeyError('No workload {} found'.format(name))
def get_cli(self):
@click.group()
def cli():
pass
@cli.command('list')
def list_workflows():
for wf in self.list():
print(wf.name)
@cli.group('run')
def run_cli():
pass
for wl in self.workloads:
decorator = run_cli.command(wl.name)
decorator = decorator(functools.partial(run_workload, wl))
decorator = click.option('--queue', required=True)(decorator)
return cli
def cli(self):
return self.get_cli()()
import importlib
class Settings:
    """Configuration read from an importable settings module.

    The module must define ``WORKLOADS`` (dotted paths to workload classes)
    and ``QUEUE``; ``WORKER_COUNT`` is optional.
    """

    def __init__(self, module):
        """Import the module named ``module`` and read its settings."""
        settings_module = importlib.import_module(module)
        self.module = settings_module
        self.workloads = self.init_workloads(settings_module.WORKLOADS)
        self.queue = settings_module.QUEUE
        self.worker_count = self.get_worker_count()

    def init_workloads(self, workload_cls_list):
        """Resolve each ``package.module.Class`` path to the named attribute."""
        # Split lazily so every path is split, imported and resolved in turn.
        split_paths = (
            dotted_path.rsplit('.', maxsplit=1)
            for dotted_path in workload_cls_list
        )
        return [
            getattr(importlib.import_module(module_path), attr_name)
            for module_path, attr_name in split_paths
        ]

    def get_workload(self, name):
        """Return the loaded workload whose ``name`` matches, else KeyError."""
        not_found = object()
        match = next(
            (candidate for candidate in self.workloads
             if candidate.name == name),
            not_found,
        )
        if match is not_found:
            raise KeyError("Workload {} not found".format(name))
        return match

    def get_worker_count(self, default=8):
        """Return the module's ``WORKER_COUNT``, or ``default`` if undefined."""
        return getattr(self.module, 'WORKER_COUNT', default)
import inspect
import threading
import redis
class RedisDataStore(object):
    """Stores per-worker status values in Redis under a namespace prefix."""

    def __init__(self, connection, namespace):
        """Keep the connection and derive the job queue name.

        Args:
            connection: client object providing ``get(key)`` and
                ``set(key, value)``.
            namespace: prefix used for every key this store builds.
        """
        self.conn = connection
        self.namespace = namespace
        self.job_queue_name = self.get_job_queue_name(namespace)

    def get(self):
        # Not implemented yet: always returns None.
        pass

    def get_job_queue_name(self, namespace):
        """Return the job queue key for ``namespace``."""
        return '{}-job-queue'.format(namespace)

    def get_worker_status_key(self, worker):
        """Return the key under which the status of ``worker`` is stored."""
        return '{}-worker-status-{}'.format(self.namespace, worker)

    def update_worker_status(self, worker_id, status):
        """Store ``status`` as the current status of ``worker_id``."""
        self.conn.set(self.get_worker_status_key(worker_id), status)

    def get_worker_status(self, worker_id):
        """Return the stored status of ``worker_id`` as text.

        Returns:
            The status string, or ``None`` when nothing is stored.
        """
        status = self.conn.get(self.get_worker_status_key(worker_id))
        # A Redis client hands back bytes unless it was created with
        # decode_responses=True; decode so callers can compare against the
        # plain strings passed to update_worker_status().
        if isinstance(status, bytes):
            status = status.decode('utf-8')
        return status
class Workload(object):
......@@ -6,10 +31,17 @@ class Workload(object):
def __init__(self):
    """Build the step table and set up the still-empty runtime state."""
    # Step lookup produced by discover_steps().
    self.steps = self.discover_steps()
    # Assigned by run() once the queue URL is known.
    self.data_store = None
    # Filled by run(): worker index -> object returned by create_worker().
    self.worker_pool = dict()
def run(self, queue, *, start_step='start'):
def run(self, queue, worker_count, *, start_step='start'):
print("running with queue {}".format(queue))
print("start step {}".format(start_step))
self.data_store = self.initialize_data_store(queue)
print("Using data store {}".format(self.data_store))
for _ in range(worker_count):
self.worker_pool[_] = self.create_worker(_)
self.wait_until_done()
def discover_steps(self):
steps = {}
......@@ -19,6 +51,24 @@ class Workload(object):
steps[attr.__workloads_step__] = attr
return steps
def create_worker(self, worker_index):
thr = threading.Thread(target=self._worker, args=(worker_index, self.data_store,))
thr.start()
return thr
def _worker(self, worker_id, data_store):
data_store.update_worker_status(worker_id, 'idle')
while True:
message = data_store.get()
def initialize_data_store(self, queue_url):
    """Return a RedisDataStore for ``queue_url``, namespaced by ``self.name``."""
    return RedisDataStore(
        redis.Redis.from_url(queue_url),
        namespace=self.name,
    )
def wait_until_done(self):
if all(self.data_store.get_worker_status(worker_id) == 'idle' for worker_id in self.worker_pool):
return
def step(name):
def decorator(func):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment