Commit c03f8452 authored by Jed Simson

Celery task queue working

parent df2314fc
web: gunicorn wsgi:app
web: gunicorn wsgi:app.flask_app
worker: celery -A main.celery worker -l info
from flask import Blueprint, request, jsonify, render_template
from flask import Blueprint, request, jsonify, render_template, url_for
from flask_socketio import SocketIO, emit
from lib.slider import Slider
import time
from datetime import datetime
api = Blueprint('api', __name__, url_prefix='/api')
slider = Slider()
socketio = SocketIO()
@api.route('/')
def index():
......@@ -21,27 +20,48 @@ def index():
def test():
return render_template('test.html')
@socketio.on('connect', namespace='/api')
def test_connect():
    """Acknowledge a client that connected on the ``/api`` namespace.

    Prints a marker line and emits a ``response`` event whose payload is
    ``{'data': 'Connected'}``.
    """
    print('Received connect')
    payload = {'data': 'Connected'}
    emit('response', payload)
@socketio.on('message event', namespace='/api')
def test_message(message):
    """Echo the ``data`` field of a ``message event`` back to the sender.

    Args:
        message: Mapping received with the event; its ``'data'`` entry is
            stringified and echoed back in a ``response`` event.
    """
    print('Received message')
    reply = 'Received message: ' + str(message['data'])
    emit('response', {'data': reply})
@socketio.on('disconnect', namespace='/api')
def test_disconnect():
    """Print two marker lines when a client leaves the ``/api`` namespace."""
    for line in ('Received disconnect', 'Client disconnected'):
        print(line)
@socketio.on('search event', namespace='/api')
def search_event(subreddit):
    """Placeholder handler for ``search event``; currently does nothing."""
    return None
@api.route('/test/start/', methods=['POST'])
def start_window_search():
    """Queue a sliding-window search task and tell the client where to poll.

    Returns:
        A ``(response, 202)`` pair. The JSON body carries the task ``id``
        and the ``url`` of the status endpoint for that task.
    """
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import with the ``main`` module — confirm.
    from main import do_sliding_window_search

    queued = do_sliding_window_search.apply_async(args=[slider])
    status_url = url_for('api.task_status', task_id=queued.id)
    body = {'url': status_url, 'id': queued.id}
    return jsonify(body), 202
@api.route('/test/status/<task_id>')
def task_status(task_id):
    """Report the current state of a sliding-window search task as JSON.

    Args:
        task_id: Identifier of the Celery task to look up.

    Returns:
        A JSON response with ``state``, ``processed``, ``time`` and
        ``status`` keys, plus ``submissions`` when the task metadata
        contains them.
    """
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import with the ``main`` module — confirm.
    from main import do_sliding_window_search

    task = do_sliding_window_search.AsyncResult(task_id)
    # Read state and info once so every branch works from the same snapshot
    # instead of re-reading the result on each attribute access.
    state = task.state
    info = task.info

    if state == 'PENDING':
        # job did not start yet
        response = {
            'state': state,
            'processed': 0,
            'time': datetime.now(),
            'status': 'Pending...'
        }
    elif state != 'FAILURE' and isinstance(info, dict):
        # Progress updates and the final result are dicts.
        response = {
            'state': state,
            'processed': info.get('processed', 0),
            'time': info.get('time', datetime.now()),
            'status': info.get('status', '')
        }
        if 'submissions' in info:
            response['submissions'] = info['submissions']
    else:
        # Either the background job failed (info is the exception raised),
        # or the state carries non-dict info (e.g. an exception or None),
        # on which ``.get`` would raise AttributeError.
        response = {
            'state': state,
            'processed': 'Unknown',
            'time': datetime.now(),
            'status': str(info),
        }

    return jsonify(response)
@api.route('/search/<subreddit>/')
def search(subreddit):
......
from flask import Flask
from api.views import api, socketio
from slider.views import slider
import os
def create_app():
    """Build the Flask application.

    Sets ``SECRET_KEY`` from the environment (a missing variable raises
    ``KeyError``), attaches the Socket.IO extension and registers the
    ``api`` and ``slider`` blueprints.

    Returns:
        The configured Flask application.
    """
    flask_app = Flask(__name__)
    flask_app.config['SECRET_KEY'] = os.environ['SECRET_KEY']

    socketio.init_app(flask_app)

    for blueprint in (api, slider):
        flask_app.register_blueprint(blueprint)

    return flask_app
if __name__ == "__main__":
    # Script entry point: build the app and serve it through Socket.IO's
    # runner rather than Flask's own development server.
    app = create_app()
    # Plain Flask alternative, kept for reference:
    #app.run(host='0.0.0.0', port=5050, debug=True)
    socketio.run(app)
from flask import Flask
from celery import Celery
import os
class Application(object):
    """Wrap a Flask app together with its blueprints and a Celery factory."""

    def __init__(self, blueprints, config, debug=True):
        """Create the Flask app, load its config and register blueprints.

        Args:
            blueprints: Iterable of blueprints to register on the app.
            config: Object (or import string) handed to
                ``flask_app.config.from_object``.
            debug: Initial debug flag; replaced by the loaded config's
                ``DEBUG`` value during configuration.
        """
        self.flask_app = Flask(__name__)
        self.blueprints = blueprints
        self.debug = debug

        self._configure_app(config)
        self._register_blueprints()

    def celery(self):
        """Build and return a new Celery instance bound to the Flask app.

        The broker comes from the app's ``CELERY_BROKER_URL`` setting, the
        Celery configuration is updated from the Flask config, and the task
        base class is replaced so every task runs inside the Flask
        application context.
        """
        flask_app = self.flask_app
        broker_url = flask_app.config['CELERY_BROKER_URL']

        worker = Celery(flask_app.import_name, broker=broker_url)
        worker.conf.update(flask_app.config)

        base_task = worker.Task

        class ContextTask(base_task):
            # Abstract so Celery does not register this class as a task.
            abstract = True

            def __call__(self, *args, **kwargs):
                with flask_app.app_context():
                    return base_task.__call__(self, *args, **kwargs)

        worker.Task = ContextTask
        return worker

    def _register_blueprints(self):
        """Register every blueprint given at construction time."""
        register = self.flask_app.register_blueprint
        for blueprint in self.blueprints:
            register(blueprint)

    def _configure_app(self, env):
        """Load settings from ``env`` and sync ``self.debug`` with them."""
        settings = self.flask_app.config
        settings.from_object(env)

        # The loaded config's DEBUG value takes precedence over the
        # constructor argument.
        self.debug = settings['DEBUG']

    def start_app(self):
        """Run the Flask development server with the current debug flag."""
        self.flask_app.run(debug=self.debug)
import os
class Config(object):
    """Base settings shared by every environment.

    The environment-backed values are read when this class body executes,
    so a missing variable raises ``KeyError`` at import time.
    """

    DEBUG = False

    # Values supplied through the process environment.
    SECRET_KEY = os.environ['SECRET_KEY']
    CELERY_BROKER_URL = os.environ['CELERY_BROKER_URL']
    CELERY_RESULT_BACKEND = os.environ['CELERY_RESULT_BACKEND']

    CELERY_TRACK_STARTED = True


class ProductionConfig(Config):
    """Production settings: debug explicitly off."""

    DEBUG = False


class DevelopmentConfig(Config):
    """Development settings: debug on and ``DEVELOPMENT`` flagged."""

    DEBUG = True
    DEVELOPMENT = True
......@@ -2,10 +2,15 @@ from praw import Reddit
from praw.errors import HTTPException, InvalidSubreddit
from praw.helpers import submissions_between
from celery import Celery
from .util import top_sort, hot_sort, new_sort, controversial_sort
import time
celery = Celery('searches', broker='redis://localhost:6379/0')
class Search:
def __init__(self):
......@@ -94,8 +99,17 @@ class Search:
# Create a reddit session to use
r = Reddit('Reddit Slider by /u/oracular_demon - v{}'.format(self.version))
processed = 0
generator = submissions_between(r, subreddit, low_timestamp, high_timestamp)
submissions = [submission for submission in generator]
submissions = []
for submission in generator:
yield submission
#submissions.append(submission)
#processed += 1
#print('{} processed'.format(processed))
'''
sorted_submissions = sorted(submissions, key=self.sort_fns[sort])
current = 0
......@@ -103,28 +117,7 @@ class Search:
while current < min(limit, len(sorted_submissions)):
current += 1
yield sorted_submissions[current]
'''
count = 0
while count <= limit:
try:
submission = next(generator)
count += 1
submissions.append(submission)
except StopIteration:
print('Submission generator has been exhausted: count = {}, limit = {}' \
.format(count, limit))
break
sorted_submissions = sorted(submissions, key=self.sort_fns[sort])
for submission in sorted_submissions:
yield submission
'''
......
......@@ -56,12 +56,29 @@ class Slider:
print('Fetching {} `{}` submissions from /r/{} between {} and {}...'.format(limit, sort, subreddit, start, end))
try:
generator = search.generate(subreddit,
start,
end,
sort=sort,
limit=limit,
use_sliding_window=sliding_window)
if sliding_window:
for s in search.generate(subreddit,
start,
end,
sort=sort,
limit=limit,
use_sliding_window=sliding_window):
submission = {'info': str(s), 'timestamp': int(s.created_utc),
'href': s.permalink,
'date': timestring.Date(int(s.created_utc), tz='UTC')\
.format('%Y/%m/%d %H:%M:%S (%Z)')}
yield submission
return
else:
generator = search.generate(subreddit,
start,
end,
sort=sort,
limit=limit,
use_sliding_window=sliding_window)
except ValueError as e:
raise e
......
from application import Application
from api.views import api
from slider.views import slider
import os
# Config object reference read from the APP_SETTINGS environment variable
# (KeyError if unset); Application hands it to Flask's config.from_object.
config = os.environ['APP_SETTINGS']
blueprints = [api, slider]
# debug=True is only the initial value: Application replaces it with the
# loaded config's DEBUG setting.
app = Application(blueprints, config, debug=True)
# Celery instance built from the Flask app's configuration.
celery = app.celery()
# NOTE(review): imported last, presumably because the tasks module does
# `from main import celery` and needs `celery` to exist first — confirm.
from tasks import do_sliding_window_search
from flask.ext.script import Manager
from main import app
import os
import redis
# Redis client used by the task-management commands below; connection
# details come from the environment (KeyError if any variable is unset).
r = redis.StrictRedis(
    host=os.environ['REDIS_HOST'],
    port=os.environ['REDIS_PORT'],
    db=os.environ['REDIS_DB'],
)

# Top-level command manager for the Flask app, with a nested "tasks"
# sub-manager for the Celery-related commands.
manager = Manager(app.flask_app)
tasks = Manager(usage='Manage Celery tasks')
manager.add_command("tasks", tasks)
@manager.command
def run():
    """Run the Flask Application"""
    # NOTE(review): the docstring wording is kept verbatim because it is
    # presumably surfaced as the command's help text — confirm.
    app.start_app()
@tasks.command
def list():
    """List all Celery Tasks"""
    # NOTE(review): the name shadows the builtin ``list``; it is left
    # unchanged because the command name is presumably derived from it.
    for position, raw_key in enumerate(r.scan_iter("celery-*")):
        name = str(raw_key.decode('utf-8'))
        print('[{}]\t{}'.format(position, name))
@tasks.command
def info(key):
    """Get info for Celery Task by key"""
    # Fetch the raw value stored under ``key`` and print it as "key => value".
    stored = r.get(key)
    line = '{} => {}'.format(key, str(stored))
    print(line)
@tasks.command
def delete(key):
    'Delete the task specified by key'
    # ``r.delete`` returns how many keys were actually removed; report
    # success only when the key existed instead of unconditionally.
    removed = r.delete(key)
    if removed:
        print('{} successfully deleted!'.format(key))
    else:
        print('{} not found; nothing deleted.'.format(key))
@tasks.command
def delete_all():
    'Delete all tasks'
    # Removes every key matching "celery-*" and prints a per-key line plus
    # a final total.
    deleted = 0
    for raw_key in r.scan_iter("celery-*"):
        # Decode once and reuse the text form for both the delete call and
        # the message, so the output matches ``list`` rather than showing
        # the raw bytes representation.
        name = raw_key.decode('utf-8')
        # Count only keys that were really removed (a key can disappear
        # between the scan and the delete).
        if r.delete(name):
            deleted += 1
            print('{} successfully deleted!'.format(name))
    print('Successfully deleted {} tasks!'.format(deleted))
if __name__ == "__main__":
    # Script entry point: hand control to the command manager.
    manager.run()
amqp==1.4.9
anyjson==0.3.3
billiard==3.3.0.23
celery==3.1.23
click==6.6
decorator==4.0.10
eventlet==0.19.0
Flask==0.11.1
Flask-Script==2.0.5
Flask-SSE==0.2.1
gunicorn==19.6.0
itsdangerous==0.24
Jinja2==2.8
kombu==3.0.35
MarkupSafe==0.23
praw==3.5.0
pytz==2013b0
redis==2.10.5
requests==2.10.0
six==1.10.0
timestring==1.6.2
......
from main import celery
from datetime import datetime


@celery.task(bind=True)
def do_sliding_window_search(self, instance):
    """Run a fixed sliding-window search and publish progress as it goes.

    Args:
        instance: Object exposing ``get_subreddit_posts(**parameters)``,
            iterated for submissions.

    Returns:
        A dict with the number of ``processed`` submissions, a completion
        ``status`` message and the collected ``submissions``.
    """
    parameters = {'name': 'askreddit', 'dimension': 'yesterday to now',
                  'delta': None, 'sort': 'top', 'limit': 1500,
                  'sliding_window': True}
    message = 'Searching for posts with parameters {}'.format(parameters)

    submissions = []
    processed = 0  # stays 0 when the search yields nothing

    results = instance.get_subreddit_posts(**parameters)
    for processed, submission in enumerate(results, start=1):
        submissions.append(submission)

        # Publish a progress snapshot after every submission.
        progress = {'processed': processed,
                    'status': message,
                    'time': str(datetime.now())}
        self.update_state(state='SEARCHING', meta=progress)

    return {'processed': processed, 'status': 'Task completed!',
            'submissions': submissions}
<!DOCTYPE HTML>
<!DOCTYPE html>
<html>
<head>
<title>Flask-SocketIO Test</title>
<script type="text/javascript" src="//code.jquery.com/jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function(){
namespace = '/api'; // change to an empty string to use the global namespace
// the socket.io documentation recommends sending an explicit package upon connection
// this is specially important when using the global namespace
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace);
// event handler for server sent data
// the data is displayed in the "Received" section of the page
socket.on('response', function(msg) {
$('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
});
<title>Celery Test</title>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
</head>
<body>
<h1>Celery Test</h1>
// event handler for new connections
socket.on('connect', function() {
socket.emit('message event', {data: 'I\'m connected!'});
});
<button id="search">Search!</button>
// handlers for the different forms in the page
// these send data to the server in a variety of ways
$('form#emit').submit(function(event) {
console.log('Emit');
socket.emit('message event', {data: $('#emit_data').val()});
return false;
});
<p>Check Job Status:<input type="text" id="job-status"/></p>
<pre id="out"></pre>
$('form#disconnect').submit(function(event) {
socket.emit('disconnect');
return false;
});
});
</script>
</head>
<script>
$(document).ready(function() {
$('#search').on('click', function() {
$.post('/api/test/start/', function(data) {
$('#out').html('<a href="' + data.url + '">Task ' + data.id + '</a>');
});
});
<body>
<h2>Send:</h2>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Echo">
</form>
$('#job-status').on('keydown', function(e) {
if (e.which == 13) {
var id = $('#job-status').val();
<form id="disconnect" method="POST" action="#">
<input type="submit" value="Disconnect">
</form>
// Build the status URL once so the request and the link agree. The route
// is /api/test/status/<task_id> with no trailing slash.
var statusUrl = '/api/test/status/' + encodeURIComponent(id);
$.get(statusUrl, function(data) {
    // Create the link with .attr()/.text() so the typed id and the
    // returned state are inserted as text, not parsed as HTML.
    var link = $('<a/>')
        .attr('href', statusUrl)
        .text('Task ' + id + ' | State: ' + data.state);
    $('#out').empty().append(link);
});
<h2>Receive:</h2>
<div id="log"></div>
e.preventDefault();
}
});
});
</script>
</body>
</html>
from app import create_app
app = create_app()
from main import app
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment