Commit 31936f46 authored by Jed Simson's avatar Jed Simson

Add job dispatcher

parent 76c813f1
from flask import (Blueprint, request, jsonify, render_template, url_for,
session)
from flask import Blueprint, request, jsonify, url_for, session
from lib.slider import Slider
from tasks import do_sliding_window_search, q1, q2, q3, failed
from lib.queues import QueueManager
from lib.dispatcher import Dispatcher, JobNotFoundError
from datetime import datetime
import time
import random
api = Blueprint('api', __name__, url_prefix='/api')
slider = Slider()
queues = {1: q1, 2: q2, 3: q3}
queue_manager = QueueManager(queues)
dispatcher = Dispatcher(queue_manager)
@api.route('/')
def index():
return jsonify({'name:': 'Reddit Slider',
'description': 'A simple program to allow greater filtering options for Reddit submissions.',
'description': 'A simple program to allow greater ' +
'filtering options for Reddit submissions.',
'version': 1.0})
@api.route('/jobs/')
def list_jobs():
if 'job_id' in session:
queue = queues[session['queue_key']]
job = queue.fetch_job(session['job_id'])
if job is None:
del session['job_id']
return jsonify({'error': 'Job does not exist'}), 404
try:
# Get the running job (if there is one)
job = dispatcher.get_running_job()
return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
'id': job.get_id()}), 202
return jsonify({'error': 'No jobs found for this session'}), 404
except JobNotFoundError as e:
return jsonify({'status': str(e)}), 404
@api.route('/jobs/clear/', methods=['POST'])
def clear_jobs():
if 'job_id' in session:
del session['job_id']
dispatcher.clear_old_jobs()
return jsonify({'success': 'All jobs successfully cleared!'})
@api.route('/jobs/new/', methods=['POST'])
def new_job():
# If there as an existing job, don't allow creation of new jobs
if 'job_id' in session:
queue = queues[session['queue_key']]
job = queue.fetch_job(session['job_id'])
parameters = request.get_json()
arguments = (slider, parameters)
if job is None:
# Probably an old job that's left over in the session
del session['job_id']
return jsonify({'error': 'Job does not exist'}), 404
try:
job = dispatcher.new_job(do_sliding_window_search, arguments)
return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
'id': job.get_id()}), 202
parameters = request.get_json()
# Choose a random queue from the three queues
key = random.choice(range(1, 4))
queue = queues[key]
# Don't let job go for longer than 10 minutes
job = queue.enqueue_call(func=do_sliding_window_search,
args=(slider, parameters),
timeout=600)
session['job_id'] = job.get_id()
session['queue_key'] = key
return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
'id': job.get_id()}), 202
except JobNotFoundError as e:
return jsonify({'error': str(e)}), 404
@api.route('/jobs/<job_id>')
def job_status(job_id):
if 'queue_key' in session:
queue = queues[session['queue_key']]
job = queue.fetch_job(job_id)
else:
for key, queue in queues.items():
if job_id in queue.job_ids:
job = queue.fetch_job(job_id)
if job is None:
return jsonify({'error': 'Invalid job ID - no such job exists on the queue.'}), 404
try:
job = dispatcher.get_job_by_id(job_id)
except JobNotFoundError as e:
return jsonify({'error': str(e)}), 404
state = job.get_status()
......@@ -133,7 +108,8 @@ def job_status(job_id):
'processed': job.meta.get('processed', 0),
'time': datetime.now(),
'status': job.meta.get('status', ''),
'error': [job.ended_at, job.exc_info, str(job.func_name), str(job.args)]
'error': [job.ended_at, job.exc_info,
str(job.func_name), str(job.args)]
}
status = 500
......@@ -169,4 +145,3 @@ def search(subreddit):
'delta': delta,
'submissions': submissions,
'count': len(submissions)})
from flask import session
from datetime import datetime
JOB_KEY = 'job_id'
QUEUE_KEY = 'queue_key'
class JobNotFoundError(Exception):
pass
class Dispatcher:
def __init__(self, queue_manager):
self.name = 'job-dispatcher'
self.session = session
self.queue_manager = queue_manager
def log(self, msg):
print('[{} ({})] {}'.format(datetime.now(),
self.name,
msg))
def is_job_running(self):
if JOB_KEY in session:
return True
def get_job_by_id(self, job_id):
if QUEUE_KEY in self.session:
key = self.session[QUEUE_KEY]
queue = self.queue_manager.get(key)
job = queue.fetch_job(job_id)
if job is None:
raise JobNotFoundError('Job with ID {} not found on the queue'
.format(job_id))
return job
else:
queues = self.queue_manager.queues
for key, queue in queues.items():
if job_id in queue.job_ids:
job = queue.fetch_job(job_id)
return job
raise JobNotFoundError('Job with ID {} not found on any queue'
.format(job_id))
def get_running_job(self):
if self.is_job_running():
# There is already a job running so fetch information
# about that job
key = self.session[QUEUE_KEY]
identifier = self.session[JOB_KEY]
try:
queue = self.queue_manager.get(key)
except KeyError as e:
self.log(str(e))
job = queue.fetch_job(identifier)
if job is None:
# Indicates an old job is still in the session
# but is no longer running, so we evict this job
# from the session
self.log('Evicting Job<{}> from session'.format(identifier))
self.session.pop(JOB_KEY, None)
self.session.pop(QUEUE_KEY, None)
raise JobNotFoundError('Job with ID {} was found in the ' +
'session but was not found in its ' +
'associated queue (Q<{}>).'
.format(identifier, key))
return job
raise JobNotFoundError('No jobs are currently running.')
def clear_old_jobs(self):
if self.is_job_running():
self.session.pop(JOB_KEY, None)
self.session.pop(QUEUE_KEY, None)
def new_job(self, func, args):
if self.is_job_running():
# There is already a job running so try to return it first
# (the job could be still around and need eviction from the
# session)
try:
return self.get_running_job()
except JobNotFoundError as e:
raise e
# Get a random queue to add the job to
key, queue = self.queue_manager.random()
job = queue.enqueue_call(func=func,
args=args,
timeout=600)
# Set session state so the job can be tracked
# across sessions
self.session[JOB_KEY] = job.get_id()
self.session[QUEUE_KEY] = key
return job
import random
class QueueManager:
def __init__(self, queues):
self.queues = queues
def get(self, key):
if key in self.queues:
return self.queues[key]
raise KeyError('Queue with key {} not found.'
.format(key))
def random(self):
count = len(self.queues)
key = random.choice(range(1, count+1))
return key, self.get(key)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment