Commit 304ea4e3 authored by Jed Simson

Add RQ job queueing

parent c03f8452
-from flask import Blueprint, request, jsonify, render_template, url_for
+from flask import (Blueprint, request, jsonify, render_template, url_for,
+                   session)
 from lib.slider import Slider
+from tasks import do_sliding_window_search, queue
 from datetime import datetime
 import time

 api = Blueprint('api', __name__, url_prefix='/api')
 slider = Slider()
@@ -16,53 +17,103 @@ def index():
                    'description': 'A simple program to allow greater filtering options for Reddit submissions.',
                    'version': 1.0})


-@api.route('/test/')
-def test():
-    return render_template('test.html')
-
-
-@api.route('/test/start/', methods=['POST'])
-def start_window_search():
-    from main import do_sliding_window_search
-    task = do_sliding_window_search.apply_async(args=[slider])
-    return jsonify({'url': url_for('api.task_status', task_id=task.id),
-                    'id': task.id}), 202
+@api.route('/jobs/')
+def list_jobs():
+    if 'job_id' in session:
+        job = queue.fetch_job(session['job_id'])
+        if job is None:
+            return jsonify({'error': 'Job does not exist'})
+        return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
+                        'id': job.get_id()}), 202
+    return jsonify({'error': 'No jobs for this session'})
+
+
+@api.route('/jobs/clear/', methods=['POST'])
+def clear_jobs():
+    if 'job_id' in session:
+        del session['job_id']
+    return jsonify({'success': 'All jobs successfully cleared!'})
+
+
+@api.route('/jobs/new/', methods=['POST'])
+def new_job():
+    # If there is an existing job, don't allow creation of new jobs
+    if 'job_id' in session:
+        job = queue.fetch_job(session['job_id'])
+        return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
+                        'id': job.get_id()}), 202
+    parameters = request.get_json()
+    # Don't let the job run for longer than 10 minutes
+    job = queue.enqueue_call(func=do_sliding_window_search,
+                             args=(slider, parameters),
+                             timeout=600)
+    session['job_id'] = job.get_id()
+    return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
+                    'id': job.get_id()}), 202


-@api.route('/test/status/<task_id>')
-def task_status(task_id):
-    from main import do_sliding_window_search
-    task = do_sliding_window_search.AsyncResult(task_id)
-    if task.state == 'PENDING':
+@api.route('/jobs/<job_id>')
+def job_status(job_id):
+    job = queue.fetch_job(job_id)
+    if job is None:
+        return jsonify({'error': 'Invalid job ID - no such job exists on the queue.'})
+    state = job.get_status()
+    if state == 'queued':
        # job did not start yet
        response = {
-            'state': task.state,
+            'state': state,
            'processed': 0,
            'time': datetime.now(),
            'status': 'Pending...'
        }
-    elif task.state != 'FAILURE':
+    elif state == 'started':
+        response = {
+            'state': state,
+            'processed': job.meta.get('processed', 0),
+            'time': job.meta.get('time', datetime.now()),
+            'status': job.meta.get('status', '')
+        }
+    elif state == 'finished':
        response = {
-            'state': task.state,
-            'processed': task.info.get('processed', 0),
-            'time': task.info.get('time', datetime.now()),
-            'status': task.info.get('status', '')
+            'state': state,
+            'processed': job.result.get('processed', 0),
+            'time': job.result.get('time', datetime.now()),
+            'status': job.result.get('status', ''),
+            'submissions': job.result.get('submissions')
        }
-        if 'submissions' in task.info:
-            response['submissions'] = task.info['submissions']
+        if 'job_id' in session:
+            del session['job_id']
    else:
        # something went wrong in the background job
        response = {
-            'state': task.state,
-            'processed': 'Unknown',
+            'state': state,
+            'processed': job.meta.get('processed', 0),
            'time': datetime.now(),
-            'status': str(task.info)  # this is the exception raised
+            'status': job.meta.get('status', '')
        }
    return jsonify(response)

@api.route('/search/<subreddit>/')
def search(subreddit):
    dimension = request.args.get('range', None)
......
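For reference, the new endpoints can be exercised end to end with a short client script. This is a minimal sketch, not part of the commit: the host/port and the use of the requests library are assumptions, and the session object is only there so the Flask session cookie (and thus job_id) persists between calls.

    import time
    import requests  # assumed available; any HTTP client would do

    BASE = 'http://localhost:5000/api'  # assumed dev-server address
    params = {'name': 'askreddit', 'dimension': 'yesterday to now',
              'delta': None, 'sort': 'top', 'limit': 1500,
              'sliding_window': True}

    with requests.Session() as s:  # keeps the session cookie across requests
        job = s.post(BASE + '/jobs/new/', json=params).json()
        while True:
            status = s.get(BASE + '/jobs/' + job['id']).json()
            print(status.get('state'), status.get('processed'))
            if status.get('state') in ('finished', 'failed'):
                break
            time.sleep(5)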
 from flask import Flask
-from celery import Celery
 import os
class Application(object):
@@ -11,26 +10,6 @@ class Application(object):
        self._configure_app(config)
        self._register_blueprints()

-    def celery(self):
-        app = self.flask_app
-        celery = Celery(app.import_name, broker=app.config[
-            'CELERY_BROKER_URL'])
-        celery.conf.update(app.config)
-        TaskBase = celery.Task
-
-        class ContextTask(TaskBase):
-            abstract = True
-
-            def __call__(self, *args, **kwargs):
-                with app.app_context():
-                    return TaskBase.__call__(self, *args, **kwargs)
-
-        celery.Task = ContextTask
-        return celery

    def _register_blueprints(self):
        for blueprint in self.blueprints:
            self.flask_app.register_blueprint(blueprint)
......
@@ -25,6 +25,7 @@ class Slider:
        pass

    def get_subreddit_posts(self, name, dimension, delta, sort, limit, sliding_window=False):
+        print('GETTING SUBREDDIT POSTS')
        subreddit = name

        # Can't have a dimension and a delta
@@ -57,21 +58,22 @@
        try:
            if sliding_window:
-                for s in search.generate(subreddit,
-                                         start,
-                                         end,
-                                         sort=sort,
-                                         limit=limit,
-                                         use_sliding_window=sliding_window):
-                    submission = {'info': str(s), 'timestamp': int(s.created_utc),
-                                  'href': s.permalink,
-                                  'date': timestring.Date(int(s.created_utc), tz='UTC')\
-                                          .format('%Y/%m/%d %H:%M:%S (%Z)')}
-                    yield submission
-                return
+                def gen():
+                    for s in search.generate(subreddit,
+                                             start,
+                                             end,
+                                             sort=sort,
+                                             limit=limit,
+                                             use_sliding_window=sliding_window):
+                        submission = {'info': str(s), 'timestamp': int(s.created_utc),
+                                      'href': s.permalink,
+                                      'date': timestring.Date(int(s.created_utc), tz='UTC')\
+                                              .format('%Y/%m/%d %H:%M:%S (%Z)')}
+                        yield submission
+                return gen
            else:
                generator = search.generate(subreddit,
                                            start,
......
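Returning an inner generator function here, instead of yielding directly, keeps get_subreddit_posts itself from turning into a generator in the sliding-window case: the caller receives a plain callable and decides when iteration (and thus the Reddit search) actually starts. This is also why tasks.py below calls get_subreddit_posts(**parameters)() with an extra pair of parentheses. A minimal sketch of the pattern, with toy names that are not from the commit:

    def make_counter(n):
        # Calling make_counter(3) does no work yet; iteration begins
        # only when the returned callable is invoked.
        def gen():
            for i in range(n):
                yield i
        return gen

    counter = make_counter(3)   # nothing consumed yet
    for value in counter():     # note the call: gen() produces the generator
        print(value)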
@@ -9,8 +9,3 @@ config = os.environ['APP_SETTINGS']
 blueprints = [api, slider]

 app = Application(blueprints, config, debug=True)
-celery = app.celery()
-from tasks import do_sliding_window_search
-from flask.ext.script import Manager
+from flask_script import Manager
from main import app
@@ -24,7 +24,7 @@ def run():
 def list():
    'List all Celery Tasks'
-    for i, key in enumerate(r.scan_iter("celery-*")):
+    for i, key in enumerate(r.scan_iter("rq:job:*")):
        print('[{}]\t{}'.format(i, str(key.decode('utf-8'))))
@@ -51,7 +51,7 @@ def delete_all():
    'Delete all tasks'
    deleted = 0
-    for key in r.scan_iter("celery-*"):
+    for key in r.scan_iter("rq:job:*"):
        r.delete(str(key.decode('utf-8')))
        deleted += 1
        print('{} successfully deleted!'.format(key))
......
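Scanning for rq:job:* keys works because RQ persists each job under such a key, but RQ also exposes the pending queue directly. A hedged alternative sketch, reusing the connection from the commit's worker.py:

    from rq import Queue
    from worker import connection  # the commit's Redis connection

    queue = Queue(connection=connection)

    # job_ids lists jobs still waiting on the queue; started and finished
    # jobs have already been popped, which is why the management commands
    # above fall back to scanning Redis keys.
    for i, job_id in enumerate(queue.job_ids):
        print('[{}]\t{}'.format(i, job_id))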
-from main import celery
+from rq import Queue, get_current_job
 from datetime import datetime
+from worker import connection
+
+queue = Queue(connection=connection)


-@celery.task(bind=True)
-def do_sliding_window_search(self, instance):
-    parameters = {'name': 'askreddit', 'dimension': 'yesterday to now',
-                  'delta': None, 'sort': 'top', 'limit': 1500,
-                  'sliding_window': True}
+def do_sliding_window_search(instance, parameters):
+    job = get_current_job()
    processed = 0
    submissions = []
    message = 'Searching for posts with parameters {}'.format(parameters)

-    for submission in instance.get_subreddit_posts(**parameters):
+    for submission in instance.get_subreddit_posts(**parameters)():
        processed += 1
        submissions.append(submission)
-        self.update_state(state='SEARCHING',
-                          meta={'processed': processed,
-                                'status': message,
-                                'time': str(datetime.now())})
+        job.set_status('started')
+        job.meta = {'processed': processed,
+                    'status': message,
+                    'time': str(datetime.now())}
+        job.save()

+    job.set_status('finished')
+    job.save()
    return {'processed': processed, 'status': 'Task completed!',
-            'submissions': submissions}
+            'submissions': submissions, 'time': str(datetime.now())}
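Because the worker writes its progress into job.meta on every iteration, any process that holds the job ID can read it back, which is what the job_status view above relies on. A minimal consumer-side sketch (the job ID is a placeholder; refresh() re-reads the job's state and meta from Redis):

    from tasks import queue

    job = queue.fetch_job('some-job-id')  # hypothetical ID
    if job is not None:
        job.refresh()  # pull the latest meta written by the worker
        print(job.get_status(), job.meta.get('processed', 0))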
<!DOCTYPE html>
<html>
<head>
-    <title>Celery Test</title>
+    <title>Queue Test</title>
    <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
</head>
<body>
-    <h1>Celery Test</h1>
+    <h1>Queue Test</h1>
-    <button id="search">Search!</button>
+    <button id="new">New Job</button>
+    <button id="clear">Clear Jobs</button>
    <p>Check Job Status:<input type="text" id="job-status"/></p>
    <pre id="out"></pre>
+    <div id="status"></div>
    <script>
    $(document).ready(function() {
-        $('#search').on('click', function() {
-            $.post('/api/test/start/', function(data) {
-                $('#out').html('<a href="' + data.url + '">Task ' + data.id + '</a>');
-            });
-        });
+        $('#new').on('click', function() {
+            var params = {name: 'askreddit', dimension: 'yesterday to now',
+                          delta: null, sort: 'top', limit: 1500,
+                          sliding_window: true};
+            $.ajax({
+                type: 'POST',
+                url: '/api/jobs/new/',
+                data: JSON.stringify(params),
+                contentType: 'application/json',
+                success: function(data) {
+                    $('#out').html('<a href="' + data.url + '">' + data.id + '</a>');
+                    (function poll() {
+                        setTimeout(function() {
+                            $.ajax({
+                                url: '/api/jobs/' + data.id,
+                                type: "GET",
+                                success: function(data) {
+                                    console.log(JSON.stringify(data));
+                                    $('#status').html('<p>' + JSON.stringify(data) + '</p>');
+                                },
+                                dataType: "json",
+                                complete: poll,
+                                timeout: 2000
+                            });
+                        }, 5000);
+                    })();
+                }
+            });
+        });
-        $('#job-status').on('keydown', function(e) {
-            if (e.which == 13) {
-                var id = $('#job-status').val();
-                $.get('/api/test/status/' + id, function(data) {
-                    $('#out').html('<a href="/api/test/status' + id + '/">Task ' + id + ' | State: ' + data.state + '</a>');
-                });
-                e.preventDefault();
-            }
-        });
+        $('#clear').on('click', function() {
+            $.post('/api/jobs/clear/', function(data) {
+                console.log(data);
+            });
+        });
    });
    </script>
......
import os

import redis
from rq import Worker, Queue, Connection
from rq.timeouts import JobTimeoutException

listen = ['default']

redis_url = os.environ['REDIS_URL']
connection = redis.from_url(redis_url)


def timeout_exception_handler(job, exc_type, exc_value, traceback):
    if issubclass(exc_type, JobTimeoutException):
        print('Maximum timeout exceeded for {}'.format(job.get_id()))
        print('Handling gracefully...')
        return False  # stop the handler chain; the timeout has been handled
    return True       # fall through to RQ's default exception handling


if __name__ == '__main__':
    with Connection(connection):
        worker = Worker(list(map(Queue, listen)),
                        exception_handlers=[timeout_exception_handler])
        worker.work()
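To smoke-test the pipeline locally: start Redis, export REDIS_URL (e.g. redis://localhost:6379), run this worker with python worker.py, then enqueue something from another shell. A minimal sketch, assuming the queue defined in tasks.py; the builtin len is just a trivial stand-in for a real job function:

    import time

    from tasks import queue

    # Any importable function can be enqueued; RQ serializes the call and
    # the worker above picks it up and executes it.
    job = queue.enqueue(len, 'hello')

    while not (job.is_finished or job.is_failed):
        time.sleep(1)
    print(job.result)  # -> 5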