Commit 6358d9ae authored by Jed Simson's avatar Jed Simson

Merge branch 'feature-improve-sliding-window-search' into develop

parents 6f9b1e10 977a8136
web: gunicorn wsgi:app
web: gunicorn wsgi:app.flask_app
worker: celery -A main.celery worker -l info
from flask import Blueprint, request, jsonify, render_template
from flask import (Blueprint, request, jsonify, render_template, url_for,
session)
from lib.slider import Slider
from tasks import do_sliding_window_search, q1, q2, q3, failed
from datetime import datetime
import time
import random
# Blueprint exposing the JSON API under the /api prefix.
api = Blueprint('api', __name__, url_prefix='/api')

# Shared Slider instance handed to background search jobs.
slider = Slider()

# Maps the session's `queue_key` (1-3) to its rq queue.
queues = {1: q1, 2: q2, 3: q3}
@api.route('/')
def index():
    """Return basic metadata about the API as JSON."""
    # Bug fix: the 'name' key previously had a stray trailing colon
    # ('name:'), which clients looking for 'name' would never find.
    return jsonify({'name': 'Reddit Slider',
                    'description': 'A simple program to allow greater filtering options for Reddit submissions.',
                    'version': 1.0})
@api.route('/jobs/')
def list_jobs():
    """Report the job (if any) tracked by the current session."""
    if 'job_id' not in session:
        return jsonify({'error': 'No jobs found for this session'}), 404

    active_queue = queues[session['queue_key']]
    current = active_queue.fetch_job(session['job_id'])

    if current is None:
        # Stale session entry — the job no longer exists on the queue.
        del session['job_id']
        return jsonify({'error': 'Job does not exist'}), 404

    current_id = current.get_id()
    return jsonify({'url': url_for('api.job_status', job_id=current_id),
                    'id': current_id}), 202
@api.route('/jobs/clear/', methods=['POST'])
def clear_jobs():
    """Forget any job id stored in the current session."""
    # pop() removes the key only when present, mirroring the original
    # `if 'job_id' in session: del session['job_id']`.
    session.pop('job_id', None)
    return jsonify({'success': 'All jobs successfully cleared!'})
@api.route('/jobs/new/', methods=['POST'])
def new_job():
    """Enqueue a new sliding-window search job for this session.

    Only one job per session is allowed: when the session already tracks
    a live job, its status URL is returned (202) instead of creating a
    new one. Returns 400 for a missing/non-JSON body, 404 for a stale
    session job reference.
    """
    # If there is an existing job, don't allow creation of new jobs
    if 'job_id' in session:
        queue = queues[session['queue_key']]
        job = queue.fetch_job(session['job_id'])

        if job is None:
            # Probably an old job that's left over in the session
            del session['job_id']
            return jsonify({'error': 'Job does not exist'}), 404

        return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
                        'id': job.get_id()}), 202

    parameters = request.get_json()
    if parameters is None:
        # Robustness fix: get_json() yields None for a missing or
        # non-JSON body; reject early instead of enqueueing a job that
        # would crash in the worker.
        return jsonify({'error': 'Request body must be JSON'}), 400

    # Choose a random queue from the three queues
    key = random.choice(list(queues))
    queue = queues[key]

    # Don't let job go for longer than 10 minutes
    job = queue.enqueue_call(func=do_sliding_window_search,
                             args=(slider, parameters),
                             timeout=600)

    session['job_id'] = job.get_id()
    session['queue_key'] = key

    return jsonify({'url': url_for('api.job_status', job_id=job.get_id()),
                    'id': job.get_id()}), 202
@api.route('/jobs/<job_id>')
def job_status(job_id):
    """Return the current state and progress of a background search job.

    Uses the session's queue hint when available, otherwise scans all
    queues. Responds 200 for queued/started/finished, 404 for unknown
    ids, 500 for failed jobs (with failure details when available).
    """
    # Bug fix: `job` was previously left unbound when the session had no
    # queue_key and the id matched no queue, raising NameError below.
    job = None
    if 'queue_key' in session:
        queue = queues[session['queue_key']]
        job = queue.fetch_job(job_id)
    else:
        for key, queue in queues.items():
            if job_id in queue.job_ids:
                job = queue.fetch_job(job_id)
                break  # stop at the first queue that knows this id

    if job is None:
        return jsonify({'error': 'Invalid job ID - no such job exists on the queue.'}), 404

    state = job.get_status()

    if state == 'queued':
        # job did not start yet
        response = {
            'state': state,
            'processed': 0,
            'time': datetime.now(),
            'status': 'Pending...'
        }
        status = 200
    elif state == 'started':
        # Progress numbers are written into job.meta by the worker.
        response = {
            'state': state,
            'processed': job.meta.get('processed', 0),
            'time': job.meta.get('time', datetime.now()),
            'status': job.meta.get('status', '')
        }
        status = 200
    elif state == 'finished':
        submissions = job.result.get('submissions')
        response = {
            'state': state,
            'processed': job.result.get('processed', 0),
            'time': job.result.get('time', datetime.now()),
            'status': job.result.get('status', ''),
            'submissions': submissions,
            # Robustness fix: len(None) would raise if the result had no
            # submissions list.
            'count': len(submissions) if submissions is not None else 0
        }
        status = 200

        # The job is done — stop tracking it in the session.
        if 'job_id' in session:
            del session['job_id']
    else:
        # something went wrong in the background job
        # Try to get some more information about the job failure
        failed_job = failed.fetch_job(job_id)
        if failed_job is not None:
            # Robustness fix: only switch to the failed-queue copy when
            # it exists; otherwise keep reporting from the original job.
            job = failed_job
        response = {
            'state': state,
            'processed': job.meta.get('processed', 0),
            'time': datetime.now(),
            'status': job.meta.get('status', ''),
            'error': [job.ended_at, job.exc_info, str(job.func_name), str(job.args)]
        }
        status = 500

    return jsonify(response), status
@api.route('/search/<subreddit>/')
def subreddit(subreddit):
def search(subreddit):
dimension = request.args.get('range', None)
delta = request.args.get('delta', None)
sort = request.args.get('sort', 'hot')
limit = int(request.args.get('limit', 100))
sliding_window = request.args.get('use_sliding_window', False)
sliding_window = True if sliding_window == 'true' else False
t0 = time.time()
......@@ -29,8 +154,7 @@ def subreddit(subreddit):
dimension,
delta,
sort,
limit,
sliding_window=sliding_window)
limit)
except ValueError as e:
t1 = time.time()
t = round(t1 - t0, 4)
......
from flask import Flask
from api.views import api
from slider.views import slider
def create_app():
    """Build the Flask application with both blueprints registered."""
    flask_app = Flask(__name__)
    for blueprint in (api, slider):
        flask_app.register_blueprint(blueprint)
    return flask_app
# Development-server entry point.
if __name__ == "__main__":
    app = create_app()
    # debug=True is for local development only — do not use in production.
    app.run(host='0.0.0.0', port=5050, debug=True)
from flask import Flask
class Application(object):
    """Thin wrapper that builds, configures and runs a Flask app.

    The underlying WSGI application is exposed as ``flask_app`` (used by
    the gunicorn Procfile entry and by Flask-Script).
    """

    def __init__(self, blueprints, config, debug=True):
        self.flask_app = Flask(__name__)
        self.blueprints = blueprints
        self.debug = debug
        self._configure_app(config)
        self._register_blueprints()

    def _configure_app(self, env):
        # Load settings from the config object/path, then let its DEBUG
        # flag override the constructor's `debug` argument.
        self.flask_app.config.from_object(env)
        self.debug = self.flask_app.config['DEBUG']

    def _register_blueprints(self):
        # Attach every supplied blueprint to the underlying Flask app.
        for bp in self.blueprints:
            self.flask_app.register_blueprint(bp)

    def start_app(self):
        """Run the built-in development server."""
        self.flask_app.run(debug=self.debug)
import os
class Config(object):
    # Base configuration shared by all environments.
    # NOTE: the os.environ lookups raise KeyError at import time when the
    # variables are unset — deployments must provide all three.
    DEBUG = False
    SECRET_KEY = os.environ['SECRET_KEY']
    CELERY_BROKER_URL = os.environ['CELERY_BROKER_URL']
    CELERY_RESULT_BACKEND = os.environ['CELERY_RESULT_BACKEND']
    # Report 'started' state for running tasks instead of only 'pending'.
    CELERY_TRACK_STARTED = True
class ProductionConfig(Config):
    # Production: explicitly keep debug off (same as the base default).
    DEBUG = False
class DevelopmentConfig(Config):
    # Development: enable debug mode and the DEVELOPMENT flag.
    DEVELOPMENT = True
    DEBUG = True
......@@ -2,14 +2,13 @@ from praw import Reddit
from praw.errors import HTTPException, InvalidSubreddit
from praw.helpers import submissions_between
from .util import top_sort, hot_sort, new_sort, controversial_sort
import time
class Search:
def __init__(self):
self.version = 0.1
self.version = '0.1.2'
self.sorts = ['hot', 'new', 'controversial', 'top']
# For conversion between broken reddit timestamps and unix timestamps
......@@ -25,16 +24,11 @@ class Search:
# To be sure, let's set the workaround offset to 2 hours
self.OUT_OF_ORDER_OFFSET = 7200
self.sort_fns = {'hot': hot_sort,
'top': top_sort,
'new': new_sort,
'controversial': controversial_sort}
def generate(self, subreddit, low_timestamp, high_timestamp,
limit=100, sort='hot', use_sliding_window=False):
limit=100, sort='hot', use_sliding_window=False):
if not sort in self.sorts:
raise ValueError('Invalid sort parameter `{}`... Please use one of {}'\
if sort not in self.sorts:
raise ValueError('Invalid sort parameter `{}`... Please use one of {}' \
.format(sort, self.sorts))
if limit > 1000 or use_sliding_window:
......@@ -64,7 +58,6 @@ class Search:
r = Reddit('Reddit Slider by /u/oracular_demon - v{}'.format(self.version))
backoff = self.BACKOFF_START
try:
t1 = low_timestamp
t2 = high_timestamp
......@@ -83,8 +76,8 @@ class Search:
raise ValueError('Invalid Subreddit provided... Subreddit probably does not exist.')
search_results = [s for s in results
if original_lowest_timestamp <= s.created and
s.created <= original_highest_timestamp]
if original_lowest_timestamp <= s.created and
s.created <= original_highest_timestamp]
# Give results as a generator
return (submission for submission in search_results)
......@@ -94,24 +87,11 @@ class Search:
# Create a reddit session to use
r = Reddit('Reddit Slider by /u/oracular_demon - v{}'.format(self.version))
processed = 0
generator = submissions_between(r, subreddit, low_timestamp, high_timestamp)
submissions = []
count = 0
while count <= limit:
try:
submission = next(generator)
count += 1
submissions.append(submission)
except StopIteration:
print('Submission generator has been exhausted: count = {}, limit = {}' \
.format(count, limit))
break
sorted_submissions = sorted(submissions, key=self.sort_fns[sort])
for submission in sorted_submissions:
for submission in generator:
yield submission
......@@ -120,7 +100,3 @@ class Search:
......@@ -56,12 +56,24 @@ class Slider:
print('Fetching {} `{}` submissions from /r/{} between {} and {}...'.format(limit, sort, subreddit, start, end))
try:
generator = search.generate(subreddit,
start,
end,
sort=sort,
limit=limit,
use_sliding_window=sliding_window)
if sliding_window:
def gen():
for s in search.generate(subreddit,
start,
end,
sort=sort,
limit=int(limit),
use_sliding_window=sliding_window):
yield s
return gen
else:
generator = search.generate(subreddit,
start,
end,
sort=sort,
limit=limit,
use_sliding_window=sliding_window)
except ValueError as e:
raise e
......
from application import Application
from api.views import api
from slider.views import slider
import os
# APP_SETTINGS names the config object to load
# (e.g. "config.DevelopmentConfig"); KeyError at import time if unset.
config = os.environ['APP_SETTINGS']
blueprints = [api, slider]
# The Application wrapper exposes the WSGI app as `app.flask_app`
# (referenced by the Procfile's gunicorn entry).
app = Application(blueprints, config, debug=True)
from flask_script import Manager
from main import app
import os
import redis
# Direct Redis connection used to inspect and delete rq job keys.
r = redis.StrictRedis(host=os.environ['REDIS_HOST'],
                      port=os.environ['REDIS_PORT'],
                      db=os.environ['REDIS_DB'])

manager = Manager(app.flask_app)
# Sub-manager grouping the job-inspection commands under `tasks`.
tasks = Manager(usage='Manage Celery tasks')
manager.add_command("tasks", tasks)
@manager.command
def run():
    'Run the Flask Application'
    # Delegates to Application.start_app(), i.e. flask_app.run().
    # (Docstring doubles as the Flask-Script CLI help text.)
    app.start_app()
@tasks.command
def list():
    'List all Celery Tasks'
    # NOTE(review): these are rq job keys ("rq:job:*"), not Celery tasks,
    # despite the help text — confirm the naming is intentional.
    # scan_iter avoids blocking Redis the way KEYS would.
    for i, key in enumerate(r.scan_iter("rq:job:*")):
        print('[{}]\t{}'.format(i, str(key.decode('utf-8'))))
@tasks.command
def info(key):
    'Get info for Celery Task by key'
    # value is raw bytes from Redis; str() prints its b'...' repr.
    value = r.get(key)
    print('{} => {}'.format(key, str(value)))
@tasks.command
def delete(key):
    'Delete the task specified by key'
    # Unconditional delete; prints success even if the key was absent.
    r.delete(key)
    print('{} successfully deleted!'.format(key))
@tasks.command
def delete_all():
    'Delete all tasks'
    # Walk every rq job key, deleting as we go and counting deletions.
    deleted = 0
    for key in r.scan_iter("rq:job:*"):
        decoded = str(key.decode('utf-8'))
        r.delete(decoded)
        deleted += 1
        print('{} successfully deleted!'.format(key))
    print('Successfully deleted {} tasks!'.format(deleted))
# CLI entry point: dispatch to the Flask-Script manager.
if __name__ == "__main__":
    manager.run()
amqp==1.4.9
anyjson==0.3.3
billiard==3.3.0.23
click==6.6
decorator==4.0.10
eventlet==0.19.0
Flask==0.11.1
Flask-Script==2.0.5
Flask-SSE==0.2.1
gunicorn==19.6.0
itsdangerous==0.24
Jinja2==2.8
kombu==3.0.35
MarkupSafe==0.23
praw==3.5.0
pytz==2013b0
redis==2.10.5
requests==2.10.0
rq==0.6.0
six==1.10.0
timestring==1.6.2
update-checker==0.11
......
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
const ResultsPanel = React.createClass({
getInitialState: function() {
return {loading: false, submissions: [], info: '', showResults: false, hasError: false};
return {loading: false, submissions: [], info: '', showResults: false,
hasError: false, usingSlidingWindow: false};
},
isLoading: function() {
this.setState({
......@@ -19,6 +20,7 @@ const ResultsPanel = React.createClass({
},
render: function() {
let noSubmissions = this.state.submissions.length == 0;
let showInformation = this.state.usingSlidingWindow;
if (!noSubmissions) {
var submissions = this.state.submissions.map(function(submission) {
......@@ -36,7 +38,7 @@ const ResultsPanel = React.createClass({
<div className="card" id="results-card">
<div className="card-block">
<h4 className="card-title">Results</h4>
{!noSubmissions ? <div id="info" className="alert alert-info">{this.state.info}</div> : null}
{showInformation ? <div id="info" className="alert alert-info">{this.state.info}</div> : null}
</div>
<div className="card-block" style={{'padding-top': '0em'}}>
<div className="list-group" id="results">
......
......@@ -14,20 +14,27 @@ const SearchForm = React.createClass({
useSlidingWindow: false};
},
serializeQuery: function(params) {
if (this.state.useSlidingWindow) {
var params = {name: params.subreddit,
dimension: params.range,
delta: null,
sort: params.sort,
limit: params.limit,
sliding_window: true};
var payload = JSON.stringify(params);
return payload;
}
let query = params.subreddit + '/?range=' + params.range.split(' ').join('+') +
'&sort=' + params.sort + '&limit=' + params.limit;
query = this.state.useSlidingWindow ? query + '&use_sliding_window=true' : query;
return query;
},
fetchResults: function() {
console.log('Fetching results with query: ' + this.state.url + this.state.query);
if (this.state.params.subreddit === '') {
console.log('Empty subreddit');
}
this.refs.resultsPanel.isLoading();
$.ajax({
......@@ -76,11 +83,90 @@ const SearchForm = React.createClass({
}.bind(this));
},
submitSearchJob: function() {
this.refs.resultsPanel.isLoading();
this.refs.resultsPanel.setState({
usingSlidingWindow: true
});
$.ajax({
type: 'POST',
url: '/api/jobs/new/',
data: this.state.query,
dataType: 'json',
contentType: 'application/json'
}).fail(function(data) {
console.log(data);
}.bind(this)).then(function(data) {
let resultsPanel = this.refs.resultsPanel;
resultsPanel.setState({
info: 'Submitted search at ' + new Date().toUTCString()
});
(function poll() {
setTimeout(function() {
$.ajax({
url: '/api/jobs/' + data.id,
type: 'GET',
error: function(data) {
var json = data.responseJSON;
var body = 'An unexpected error occured while I was using the Reddit Slider Search' +
' application. Here is some information about the error: ';
if ('error' in json) {
body += '\nJob Ended: ' + json['error'][0];
body += '\nError Information: ' + json['error'][1];
body += '\nJob ID: ' + data.id;
body += '\nJob Function: ' + json['error'][2];
body += '\nJob Arguments: ' + json['error'][3];
}
var message = 'Processing failed for an unexpected reason. ' +
'Please contact the developer by email at jed.simson@gmail.com and give the following information: ';
resultsPanel.setState({
info: message + body,
loading: false
});
},
success: function(data) {
var message = 'Processed ' + data.processed +
' submissions at ' + data.time;
if ('submissions' in data) {
// A result has been given
resultsPanel.setState({
info: message,
submissions: data['submissions'],
loading: false
})
}
else {
resultsPanel.setState({
info: message
});
}
},
dataType: 'json',
complete: resultsPanel.state.loading ? poll: null,
timeout: 2000
});
}, 5000);
})(this);
}.bind(this));
},
handleSubmit: function(event) {
// Get the results and as a side-effect update the
// internal state
event.preventDefault();
this.fetchResults();
if (this.state.useSlidingWindow) {
this.submitSearchJob();
}
else {
this.fetchResults();
}
},
handleChange: function(event, oldValue, newValue) {
let newParams = this.state.params;
......@@ -98,11 +184,13 @@ const SearchForm = React.createClass({
});
}
else if (option.type == 'slidingWindow') {
this.setState({
useSlidingWindow: option.checked
});
this.setState({useSlidingWindow: option.checked}, function() {
var query = this.serializeQuery(this.state.params);
this.serializeQuery(this.state.params);
this.setState({
query: query
});
});
}
else {
console.log('Unexpected option changed...');
......
from rq import Queue, get_current_job, get_failed_queue
from datetime import datetime
import timestring
from worker import connection
from lib.sorts import top_sort, hot_sort, new_sort, controversial_sort
# Three named rq queues; the API distributes new jobs across them at random.
q1 = Queue('one', connection=connection)
q2 = Queue('two', connection=connection)
q3 = Queue('three', connection=connection)
# Failed-job queue, consulted for error details by the status endpoint.
failed = get_failed_queue(connection=connection)

# Maps a sort name to the key function used to order submissions.
sort_fns = {'hot': hot_sort,
            'top': top_sort,
            'new': new_sort,
            'controversial': controversial_sort}
def do_sliding_window_search(instance, parameters):
    """Run a sliding-window subreddit search inside an rq worker.

    Args:
        instance: a Slider-like object exposing get_subreddit_posts();
            assumed to return a generator *factory* — note the double
            call below (TODO confirm against lib/slider.py).
        parameters: dict of search arguments; must contain at least
            'sort' and 'limit' (limit may be a string, it is int()-ed).

    Returns:
        dict with the processed count, a status message, serialized
        submissions and a completion timestamp — polled via the
        /api/jobs/<id> endpoint.
    """
    job = get_current_job()
    processed = 0
    submissions = []

    message = 'Searching for posts with parameters {}'.format(parameters)

    # The trailing () invokes the callable returned by
    # get_subreddit_posts(**parameters) to obtain the actual generator.
    for submission in instance.get_subreddit_posts(**parameters)():
        processed += 1
        submissions.append(submission)
        # Persist progress on every iteration so the status endpoint can
        # report live counts. NOTE(review): this writes job.meta to Redis
        # once per submission — may be slow for large result sets.
        job.set_status('started')
        job.meta = {'processed': processed,
                    'status': message,
                    'time': str(datetime.now())}
        job.save()

    # NOTE(review): status flips to 'finished' before the result below is
    # computed, so pollers may briefly see 'finished' with no result —
    # confirm this is intended.
    job.set_status('finished')
    job.save()

    sort, limit = parameters['sort'], int(parameters['limit'])

    # Order by the requested sort and keep at most `limit` submissions.
    sorted_submissions = sorted(submissions, key=sort_fns[sort])
    stop = min(limit, len(sorted_submissions))

    submissions = []

    # Serialize the kept submissions into JSON-friendly dicts.
    for s in sorted_submissions[:stop]:
        submission = {'info': str(s), 'timestamp': int(s.created_utc),
                      'href': s.permalink,
                      'date': timestring.Date(int(s.created_utc), tz='UTC')
                      .format('%Y/%m/%d %H:%M:%S (%Z)')}
        submissions.append(submission)

    return {'processed': processed, 'status': 'Task completed!',
            'submissions': submissions, 'time': str(datetime.now())}
import os
import redis
from rq import Worker, Queue, Connection
from rq.timeouts import JobTimeoutException
import multiprocessing
listen = ['one', 'two', 'three']
redis_url = os.environ['REDIS_URL']
connection = redis.from_url(redis_url)
def timeout_exception_handler(job, exc_type, exc_value, traceback):
    """rq exception handler that swallows job-timeout failures.

    Returning False stops the handler chain (timeout handled here);
    returning True passes the exception on to the next handler.
    """
    if not issubclass(exc_type, JobTimeoutException):
        return True
    print('Maximum timeout exceeded for {}'.format(job.get_id()))
    print('Handling gracefully...')
    return False
if __name__ == '__main__':
    with Connection(connection):
        # One Worker bound to all three queues, with the timeout handler
        # pushed so JobTimeoutException is handled instead of failing.
        worker = Worker(list(map(Queue, listen)))
        worker.push_exc_handler(timeout_exception_handler)

        workers = []

        # Fork five processes that each run the worker loop.
        # NOTE(review): the same Worker instance is shared across the
        # forked processes and the processes are never joined — confirm
        # both are intended.
        for i in range(5):
            p = multiprocessing.Process(target=worker.work)
            workers.append(p)
            p.start()
from app import create_app
app = create_app()
from main import app
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment