Commit 55da8b0b authored by Jed Simson

Implement multithreaded job scheduling

parent 304ea4e3
......@@ -2,14 +2,17 @@ from flask import (Blueprint, request, jsonify, render_template, url_for,
                                     session)
from lib.slider import Slider
-from tasks import do_sliding_window_search, queue
+from tasks import do_sliding_window_search, q1, q2, q3, failed
from datetime import datetime
import time
+import random

api = Blueprint('api', __name__, url_prefix='/api')
slider = Slider()
+queues = {1: q1, 2: q2, 3: q3}
@api.route('/')
def index():
......@@ -18,23 +21,20 @@ def index():
                    'version': 1.0})

@api.route('/test/')
def test():
    return render_template('test.html')

@api.route('/jobs/')
def list_jobs():
    if 'job_id' in session:
+       queue = queues[session['queue_key']]
        job = queue.fetch_job(session['job_id'])

        if job is None:
-           return jsonify({'error': 'Job does not exist'})
+           del session['job_id']
+           return jsonify({'error': 'Job does not exist'}), 404

        return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
                        'id': job.get_id()}), 202

-   return jsonify({'error': 'No jobs for this session'})
+   return jsonify({'error': 'No jobs found for this session'}), 404
@api.route('/jobs/clear/', methods=['POST'])
......@@ -49,19 +49,30 @@ def clear_jobs():
def new_job():
    # If there is an existing job, don't allow creation of new jobs
    if 'job_id' in session:
+       queue = queues[session['queue_key']]
        job = queue.fetch_job(session['job_id'])

        if job is None:
+           # Probably an old job that's left over in the session
+           del session['job_id']
+           return jsonify({'error': 'Job does not exist'}), 404

        return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
                        'id': job.get_id()}), 202

    parameters = request.get_json()

+   # Choose a random queue from the three queues
+   key = random.choice(range(1, 4))
+   queue = queues[key]

    # Don't let job go for longer than 10 minutes
    job = queue.enqueue_call(func=do_sliding_window_search,
                             args=(slider, parameters),
                             timeout=600)

    session['job_id'] = job.get_id()
+   session['queue_key'] = key

    return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
                    'id': job.get_id()}), 202
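In short, each new job is pushed onto one of three named RQ queues chosen uniformly at random, and the chosen key is stored in the session so later requests can locate the job again. A minimal self-contained sketch of that dispatch pattern (the dispatch helper is hypothetical; the queue names and the 600-second timeout come from this commit):

    import random

    from redis import Redis
    from rq import Queue

    connection = Redis()

    # Three named queues keyed 1-3, mirroring new_job() above
    queues = {i: Queue(name, connection=connection)
              for i, name in enumerate(('one', 'two', 'three'), start=1)}

    def dispatch(func, *args):
        # Hypothetical helper: pick a queue at random, cap runtime at 10 minutes
        key = random.choice(list(queues))
        job = queues[key].enqueue_call(func=func, args=args, timeout=600)
        return key, job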
......@@ -69,10 +80,16 @@ def new_job():
@api.route('/jobs/<job_id>')
def job_status(job_id):
-   job = queue.fetch_job(job_id)
+   if 'queue_key' in session:
+       queue = queues[session['queue_key']]
+       job = queue.fetch_job(job_id)
+   else:
+       # Fall back to scanning every queue, guarding against an unknown
+       # ID leaving `job` unbound
+       job = None
+       for key, queue in queues.items():
+           if job_id in queue.job_ids:
+               job = queue.fetch_job(job_id)
+               break

    if job is None:
-       return jsonify({'error': 'Invalid job ID - no such job exists on the queue.'})
+       return jsonify({'error': 'Invalid job ID - no such job exists on the queue.'}), 404

    state = job.get_status()
......@@ -84,6 +101,7 @@ def job_status(job_id):
            'time': datetime.now(),
            'status': 'Pending...'
        }
+       status = 200
    elif state == 'started':
        response = {
            'state': state,
......@@ -91,27 +109,35 @@ def job_status(job_id):
            'time': job.meta.get('time', datetime.now()),
            'status': job.meta.get('status', '')
        }
+       status = 200
    elif state == 'finished':
        response = {
            'state': state,
            'processed': job.result.get('processed', 0),
            'time': job.result.get('time', datetime.now()),
            'status': job.result.get('status', ''),
-           'submissions': job.result.get('submissions')
+           'submissions': job.result.get('submissions'),
+           'count': len(job.result.get('submissions', []))
        }
+       status = 200

+       if 'job_id' in session:
+           del session['job_id']
    else:
        # Something went wrong in the background job;
        # try to get some more information about the job failure.
+       job = failed.fetch_job(job_id)

        response = {
            'state': state,
            'processed': job.meta.get('processed', 0),
            'time': datetime.now(),
-           'status': job.meta.get('status', '')
+           'status': job.meta.get('status', ''),
+           'error': [job.ended_at, job.exc_info, str(job.func_name), str(job.args)]
        }
+       status = 500

-   return jsonify(response)
+   return jsonify(response), status
@api.route('/search/<subreddit>/')
......
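The job-status endpoint above now signals progress through HTTP status codes as well as the JSON body: 202 when a job is accepted, 200 while it is pending, started, or finished, 404 for an unknown ID, and 500 once the job has landed on the failed queue. A rough client-side sketch of the resulting polling contract (Python, with the third-party requests package assumed; the front-end further down does the same with jQuery on a 5-second timer):

    import time

    import requests  # assumed dependency, used only for this sketch

    def poll_job(base_url, job_id, interval=5):
        # Poll /api/jobs/<job_id> until the job finishes or fails
        while True:
            resp = requests.get('{}/api/jobs/{}'.format(base_url, job_id))
            if resp.status_code == 404:
                raise LookupError('no such job: {}'.format(job_id))
            payload = resp.json()
            if resp.status_code == 500 or payload.get('state') == 'finished':
                return payload
            time.sleep(interval)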
......@@ -2,19 +2,13 @@ from praw import Reddit
from praw.errors import HTTPException, InvalidSubreddit
from praw.helpers import submissions_between
-from celery import Celery
-from .util import top_sort, hot_sort, new_sort, controversial_sort
import time

-celery = Celery('searches', broker='redis://localhost:6379/0')

class Search:
    def __init__(self):
-       self.version = 0.1
+       self.version = '0.1.2'
        self.sorts = ['hot', 'new', 'controversial', 'top']

        # For conversion between broken reddit timestamps and unix timestamps
......@@ -30,16 +24,11 @@ class Search:
        # To be sure, let's set the workaround offset to 2 hours
        self.OUT_OF_ORDER_OFFSET = 7200

-       self.sort_fns = {'hot': hot_sort,
-                        'top': top_sort,
-                        'new': new_sort,
-                        'controversial': controversial_sort}

    def generate(self, subreddit, low_timestamp, high_timestamp,
                 limit=100, sort='hot', use_sliding_window=False):

-       if not sort in self.sorts:
-           raise ValueError('Invalid sort parameter `{}`... Please use one of {}'\
+       if sort not in self.sorts:
+           raise ValueError('Invalid sort parameter `{}`... Please use one of {}' \
                             .format(sort, self.sorts))
......@@ -69,7 +58,6 @@ class Search:
        r = Reddit('Reddit Slider by /u/oracular_demon - v{}'.format(self.version))
        backoff = self.BACKOFF_START

        try:
            t1 = low_timestamp
            t2 = high_timestamp
......@@ -88,8 +76,8 @@ class Search:
            raise ValueError('Invalid Subreddit provided... Subreddit probably does not exist.')

        search_results = [s for s in results
                          if original_lowest_timestamp <= s.created and
                          s.created <= original_highest_timestamp]

        # Give results as a generator
        return (submission for submission in search_results)
......@@ -105,21 +93,6 @@ class Search:
            for submission in generator:
                yield submission
-               #submissions.append(submission)
-               #processed += 1
-               #print('{} processed'.format(processed))

-       '''
-       sorted_submissions = sorted(submissions, key=self.sort_fns[sort])
-       current = 0
-       while current < min(limit, len(sorted_submissions)):
-           current += 1
-           yield sorted_submissions[current]
-       '''
......
......@@ -25,7 +25,6 @@ class Slider:
        pass

    def get_subreddit_posts(self, name, dimension, delta, sort, limit, sliding_window=False):
-       print('GETTING SUBREDDIT POSTS')
        subreddit = name

        # Can't have a dimension and a delta
......@@ -63,15 +62,9 @@ class Slider:
                                        start,
                                        end,
                                        sort=sort,
-                                       limit=limit,
+                                       limit=int(limit),
                                        use_sliding_window=sliding_window):
-               submission = {'info': str(s), 'timestamp': int(s.created_utc),
-                             'href': s.permalink,
-                             'date': timestring.Date(int(s.created_utc), tz='UTC')\
-                                     .format('%Y/%m/%d %H:%M:%S (%Z)')}

-               yield submission
+               yield s

            return gen
        else:
......
const ResultsPanel = React.createClass({
    getInitialState: function() {
-       return {loading: false, submissions: [], info: '', showResults: false, hasError: false};
+       return {loading: false, submissions: [], info: '', showResults: false,
+               hasError: false, usingSlidingWindow: false};
    },

    isLoading: function() {
        this.setState({
......@@ -19,6 +20,7 @@ const ResultsPanel = React.createClass({
    },

    render: function() {
        let noSubmissions = this.state.submissions.length == 0;
+       let showInformation = this.state.usingSlidingWindow;

        if (!noSubmissions) {
            var submissions = this.state.submissions.map(function(submission) {
......@@ -36,7 +38,7 @@ const ResultsPanel = React.createClass({
<div className="card" id="results-card">
<div className="card-block">
<h4 className="card-title">Results</h4>
{!noSubmissions ? <div id="info" className="alert alert-info">{this.state.info}</div> : null}
{showInformation ? <div id="info" className="alert alert-info">{this.state.info}</div> : null}
</div>
<div className="card-block" style={{'padding-top': '0em'}}>
<div className="list-group" id="results">
......
......@@ -14,20 +14,27 @@ const SearchForm = React.createClass({
                useSlidingWindow: false};
    },

    serializeQuery: function(params) {
+       if (this.state.useSlidingWindow) {
+           var params = {name: params.subreddit,
+                         dimension: params.range,
+                         delta: null,
+                         sort: params.sort,
+                         limit: params.limit,
+                         sliding_window: true};

+           var payload = JSON.stringify(params);
+           return payload;
+       }

        let query = params.subreddit + '/?range=' + params.range.split(' ').join('+') +
                    '&sort=' + params.sort + '&limit=' + params.limit;

        query = this.state.useSlidingWindow ? query + '&use_sliding_window=true' : query;

        return query;
    },

    fetchResults: function() {
        console.log('Fetching results with query: ' + this.state.url + this.state.query);

        if (this.state.params.subreddit === '') {
            console.log('Empty subreddit');
        }

        this.refs.resultsPanel.isLoading();

        $.ajax({
......@@ -76,11 +83,90 @@ const SearchForm = React.createClass({
        }.bind(this));
    },

+   submitSearchJob: function() {
+       this.refs.resultsPanel.isLoading();
+       this.refs.resultsPanel.setState({
+           usingSlidingWindow: true
+       });

+       $.ajax({
+           type: 'POST',
+           url: '/api/jobs/new/',
+           data: this.state.query,
+           dataType: 'json',
+           contentType: 'application/json'
+       }).fail(function(data) {
+           console.log(data);
+       }.bind(this)).then(function(data) {
+           let resultsPanel = this.refs.resultsPanel;

+           resultsPanel.setState({
+               info: 'Submitted search at ' + new Date().toUTCString()
+           });

+           (function poll() {
+               setTimeout(function() {
+                   $.ajax({
+                       url: '/api/jobs/' + data.id,
+                       type: 'GET',
+                       error: function(data) {
+                           var json = data.responseJSON;
+                           var body = 'An unexpected error occurred while I was using the Reddit Slider Search' +
+                                      ' application. Here is some information about the error: ';

+                           if ('error' in json) {
+                               body += '\nJob Ended: ' + json['error'][0];
+                               body += '\nError Information: ' + json['error'][1];
+                               body += '\nJob ID: ' + data.id;
+                               body += '\nJob Function: ' + json['error'][2];
+                               body += '\nJob Arguments: ' + json['error'][3];
+                           }

+                           var message = 'Processing failed for an unexpected reason. ' +
+                                         'Please contact the developer by email at jed.simson@gmail.com and give the following information: ';

+                           resultsPanel.setState({
+                               info: message + body,
+                               loading: false
+                           });
+                       },
+                       success: function(data) {
+                           var message = 'Processed ' + data.processed +
+                                         ' submissions at ' + data.time;

+                           if ('submissions' in data) {
+                               // A result has been given
+                               resultsPanel.setState({
+                                   info: message,
+                                   submissions: data['submissions'],
+                                   loading: false
+                               });
+                           }
+                           else {
+                               resultsPanel.setState({
+                                   info: message
+                               });
+                           }
+                       },
+                       dataType: 'json',
+                       complete: resultsPanel.state.loading ? poll : null,
+                       timeout: 2000
+                   });
+               }, 5000);
+           })(this);
+       }.bind(this));
+   },

    handleSubmit: function(event) {
        // Get the results and, as a side-effect, update the
        // internal state
        event.preventDefault();

-       this.fetchResults();
+       if (this.state.useSlidingWindow) {
+           this.submitSearchJob();
+       }
+       else {
+           this.fetchResults();
+       }
    },

    handleChange: function(event, oldValue, newValue) {
        let newParams = this.state.params;
......@@ -98,11 +184,13 @@ const SearchForm = React.createClass({
            });
        }
        else if (option.type == 'slidingWindow') {
-           this.setState({
-               useSlidingWindow: option.checked
-           });

-           this.serializeQuery(this.state.params);

+           this.setState({useSlidingWindow: option.checked}, function() {
+               var query = this.serializeQuery(this.state.params);

+               this.setState({
+                   query: query
+               });
+           });
        }
        else {
            console.log('Unexpected option changed...');
......
-from rq import Queue, get_current_job
+from rq import Queue, get_current_job, get_failed_queue
from datetime import datetime
import timestring

from worker import connection
+from lib.sorts import top_sort, hot_sort, new_sort, controversial_sort

-queue = Queue(connection=connection)
+q1 = Queue('one', connection=connection)
+q2 = Queue('two', connection=connection)
+q3 = Queue('three', connection=connection)
+failed = get_failed_queue(connection=connection)

+sort_fns = {'hot': hot_sort,
+            'top': top_sort,
+            'new': new_sort,
+            'controversial': controversial_sort}
def do_sliding_window_search(instance, parameters):
......@@ -24,5 +36,17 @@ def do_sliding_window_search(instance, parameters):
    job.set_status('finished')
    job.save()

+   sort, limit = parameters['sort'], int(parameters['limit'])
+   sorted_submissions = sorted(submissions, key=sort_fns[sort])
+   stop = min(limit, len(sorted_submissions))

+   submissions = []

+   for s in sorted_submissions[:stop]:
+       submission = {'info': str(s), 'timestamp': int(s.created_utc),
+                     'href': s.permalink,
+                     'date': timestring.Date(int(s.created_utc), tz='UTC')
+                             .format('%Y/%m/%d %H:%M:%S (%Z)')}
+       submissions.append(submission)

    return {'processed': processed, 'status': 'Task completed!',
            'submissions': submissions, 'time': str(datetime.now())}
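The worker now owns the sorting step: it looks up a key function in sort_fns, sorts the collected submissions, truncates to the requested limit, and serialises each entry into a JSON-friendly dict that RQ stores as job.result for /api/jobs/<job_id> to read back. lib/sorts.py itself is not part of this diff; purely as an illustration, its key functions might look like:

    # Hypothetical sketch of lib/sorts.py (not shown in this commit).
    # Each function is a sort key over praw submission objects.
    def new_sort(s):
        return -s.created_utc    # newest first

    def top_sort(s):
        return -s.score          # highest score first

    def hot_sort(s):
        return -s.score          # placeholder; reddit's hot rank also decays with age

    def controversial_sort(s):
        # Simple proxy: many votes with a narrow margin
        return -(s.ups + s.downs - abs(s.ups - s.downs))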
<!DOCTYPE html>
<html>
  <head>
    <title>Queue Test</title>
    <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
  </head>
  <body>
    <h1>Queue Test</h1>
    <button id="new">New Job</button>
    <button id="clear">Clear Jobs</button>
    <pre id="out"></pre>
    <div id="status"></div>

    <script>
      $(document).ready(function() {
          $('#new').on('click', function() {
              var params = {name: 'askreddit', dimension: 'yesterday to now',
                            delta: null, sort: 'top', limit: 1500,
                            sliding_window: true};

              $.ajax({
                  type: 'POST',
                  url: '/api/jobs/new/',
                  data: JSON.stringify(params),
                  contentType: 'application/json',
                  success: function(data) {
                      $('#out').html('<a href="' + data.url + '">' + data.id + '</a>');

                      (function poll() {
                          setTimeout(function() {
                              $.ajax({
                                  url: '/api/jobs/' + data.id,
                                  type: "GET",
                                  success: function(data) {
                                      console.log(JSON.stringify(data));
                                      $('#status').html('<p>' + JSON.stringify(data) + '</p>');
                                  },
                                  dataType: "json",
                                  complete: poll,
                                  timeout: 2000
                              });
                          }, 5000);
                      })();
                  }
              });
          });

          $('#clear').on('click', function() {
              $.post('/api/jobs/clear/', function(data) {
                  console.log(data);
              });
          });
      });
    </script>
  </body>
</html>
......@@ -3,7 +3,9 @@ import redis
from rq import Worker, Queue, Connection
from rq.timeouts import JobTimeoutException
+import multiprocessing

-listen = ['default']
+listen = ['one', 'two', 'three']

redis_url = os.environ['REDIS_URL']
connection = redis.from_url(redis_url)
......@@ -18,6 +20,11 @@ def timeout_exception_handler(job, exc_type, exc_value, traceback):
if __name__ == '__main__':
    with Connection(connection):
-       worker = Worker(list(map(Queue, listen)),
-                       exception_handlers=timeout_exception_handler)
-       worker.work()
+       worker = Worker(list(map(Queue, listen)))
+       worker.push_exc_handler(timeout_exception_handler)

+       workers = []

+       for i in range(5):
+           p = multiprocessing.Process(target=worker.work)
+           workers.append(p)
+           p.start()
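One caveat with this arrangement: all five processes call .work() on the same Worker instance, and RQ workers carry per-instance state (a registered worker name and a Redis connection), so the forked copies may collide under a single worker identity. A more cautious variant, sketched here under the same connection, listen, and timeout_exception_handler definitions from worker.py, builds one Worker per process:

    import multiprocessing

    from rq import Connection, Queue, Worker

    def run_worker():
        # Each process constructs its own Worker (and thus its own identity)
        with Connection(connection):
            worker = Worker([Queue(name) for name in listen])
            worker.push_exc_handler(timeout_exception_handler)
            worker.work()

    if __name__ == '__main__':
        processes = [multiprocessing.Process(target=run_worker) for _ in range(5)]
        for p in processes:
            p.start()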