Commit 3d9a644d authored by Johan Bloemberg

Add queue stats to task processing status, explicitly declare priority queues...

Add queue stats to the task processing status, explicitly declare the priority queues, and remove the '0' priority so that every internal Redis queue clearly communicates its priority number. Add alerts to the task processing status.
parent a97dcb6d
Pipeline #16103057 passed with stages
in 9 minutes and 52 seconds
......@@ -7,27 +7,14 @@ from jet.dashboard import dashboard
from failmap.map.rating import rebuild_ratings as rebuild_ratings_task
from ..celery import PRIO_HIGH, app
from ..celery import PRIO_HIGH, status
from .models import Job
def task_processing_status(request):
"""Return a JSON object with current status of task processing."""
inspect = app.control.inspect()
# query workforce statistics using control.inspect API and extract some relevant data from it
stats = inspect.stats() or {}
active = inspect.active()
workers = [{
'name': worker_name,
'tasks_processed': sum(worker_stats['total'].values()),
'tasks_active': len(active[worker_name]),
} for worker_name, worker_stats in stats.items()]
status = {'workers': workers}
return JsonResponse(status)
return JsonResponse(status())
dashboard.urls.register_urls([
......
......@@ -5,29 +5,54 @@
<div id="task_processing_status">
<a href="#" v-on:click="update" class="float-right reset-dashboard-link-icon icon-reset" style="padding:8px 10px;"></a>
<div v-if="loading" class="float-right"><ul><li>Loading</li></ul></div>
<div v-if="workers">
<div v-if="alerts.length">
<ul>
<li class="contrast">Workers</li>
<li v-for="worker in workers" v-if="workers.length">
<p>{{ worker.name }}</p>
<ul>
<li>total processed tasks {{worker.tasks_processed}}</li>
<li>currently processing {{worker.tasks_active}}</li>
</ul>
</li>
<li v-if="!workers.length">No active workers!</li>
<li class="contrast">Alerts</li>
<li v-for="alert in alerts">{{alert}}</li>
</ul>
</div>
<ul>
<li class="contrast">Workers</li>
<li v-for="worker in workers">
<p>{{ worker.name }}</p>
<ul>
<li>subscribed queues: <span v-for="queue in worker.queues">{{queue}} </span></li>
<li>prefetched tasks: {{worker.tasks_reserved}}/{{worker.prefetch_count}}</li>
<li>running tasks: {{worker.tasks_active}}/{{worker.concurrency}}</li>
<li>total processed tasks: {{worker.tasks_processed}}</li>
</ul>
</li>
<div v-if="!workers.length"><li>-</li></div>
</ul>
<ul>
<li class="contrast">Queues</li>
<li v-for="queue in queues">
<p>{{ queue.name }}</p>
<ul>
<li>pending tasks: {{queue.tasks_pending}}</li>
</ul>
</li>
<div v-if="!queues.length"><li>-</li></div>
</ul>
</div>
<script type="text/javascript">
var vm = new Vue({
el: '#task_processing_status',
data: {
"loading": true,
"workers": null,
"loading": false,
"workers": [],
"alerts": [],
"queues": []
},
mounted () {
// update from API
this.update();
// periodic poll update
setInterval(function () {
this.update();
}.bind(this), 10000);
},
mounted () { this.update();},
methods: {
update (){
this.loading = true;
......@@ -39,6 +64,8 @@
success (data){
self.loading = false;
self.workers = data.workers;
self.alerts = data.alerts;
self.queues = data.queues;
},
error (error){
console.error('failed to load task processing status', error);
......
......@@ -6,9 +6,12 @@
import os
import time
import flower.utils.broker
from celery import Celery, Task
from django.conf import settings
from .worker import WORKER_QUEUE_CONFIGURATION
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "failmap.settings")
app = Celery(__name__)
......@@ -23,14 +26,17 @@ app.autodiscover_tasks([app for app in settings.INSTALLED_APPS if app.startswith
# https://github.com/celery/celery/blob/a87ef75884e59c78da21b1482bb66cf649fbb7d3/docs/history/whatsnew-3.0.rst#redis-priority-support
# https://github.com/celery/celery/blob/f83b072fba7831f60106c81472e3477608baf289/docs/whatsnew-4.0.rst#redis-priorities-reversed
# contrary to 'documentation' in release notes the redis priorities do not seem aligned with rabbitmq
app.conf.broker_transport_options = {
'priority_steps': [1, 5, 9],
}
if 'redis://' in app.conf.broker_url:
PRIO_HIGH = 0
PRIO_HIGH = 1
PRIO_NORMAL = 5
PRIO_LOW = 9
else:
PRIO_HIGH = 9
PRIO_NORMAL = 5
PRIO_LOW = 0
PRIO_LOW = 1
# lookup table for routing keys for different IP versions
IP_VERSION_QUEUE = {
......@@ -77,3 +83,46 @@ def rate_limited(sleep):
time.sleep(sleep)
return time.time()
def status():
    """Return a dictionary with the status of the Celery task processing system.

    The result has three keys:

    - ``alerts``: list of human readable warning strings,
    - ``workers``: one dictionary per worker that replied to the stats
      query (name, subscribed queues, processed/active/reserved task
      counts, prefetch count and concurrency),
    - ``queues``: one dictionary per queue of the 'default' worker role
      with its name and number of pending tasks.

    Raises:
        NotImplementedError: when the configured broker is not Redis.
    """
    inspect = app.control.inspect()

    # query workforce statistics using control.inspect API and extract some relevant data from it
    # Every reply is guarded the same way as stats: an inspect call can come
    # back empty when no worker answers, and the four calls are separate
    # round trips, so a worker present in one reply may be absent in another.
    stats = inspect.stats() or {}
    active = inspect.active() or {}
    reserved = inspect.reserved() or {}
    active_queues = inspect.active_queues() or {}

    workers = [{
        'name': worker_name,
        'queues': [q['name'] for q in active_queues.get(worker_name) or []],
        'tasks_processed': sum(worker_stats['total'].values()),
        'tasks_active': len(active.get(worker_name) or []),
        'tasks_reserved': len(reserved.get(worker_name) or []),
        'prefetch_count': worker_stats['prefetch_count'],
        'concurrency': worker_stats['pool']['max-concurrency'],
    } for worker_name, worker_stats in stats.items()]

    if 'redis://' in app.conf.broker_url:
        queue_names = [q.name for q in WORKER_QUEUE_CONFIGURATION['default']]

        # use flower to not reinvent the wheel on querying queue statistics
        broker = flower.utils.broker.Broker(app.conf.broker_url, broker_options=app.conf.broker_transport_options)
        queue_stats = broker.queues(queue_names).result()

        queues = [{'name': x['name'], 'tasks_pending': x['messages']} for x in queue_stats]
    else:
        raise NotImplementedError('Currently only Redis is supported!')

    alerts = []
    if not workers:
        alerts.append('No active workers!')
    if len(workers) > 9000:
        alerts.append('Number of workers is OVER 9000!!!!1111')

    return {
        'alerts': alerts,
        'workers': workers,
        'queues': queues
    }
......@@ -8,7 +8,7 @@ from kombu import Queue
# define roles for workers
WORKER_QUEUE_CONFIGURATION = {
# universal worker that has access to database and internet
'default': {
'default': [
# for tasks that require network connectivity to perform a scanning task
Queue('scanners'),
# allow to differentiate on scan tasks that have specific ip network family requirements
......@@ -22,20 +22,20 @@ WORKER_QUEUE_CONFIGURATION = {
# legacy default queue, can be removed after transition period to multiworkers
Queue('celery'),
},
],
# universal scanner worker that has internet access for both IPv4 and IPv6
'scanner': {
'scanner': [
Queue('scanners'),
Queue('scanners.ipv4'),
Queue('scanners.ipv6'),
},
],
# scanner with no IPv6 connectivity
# this is an initial concept and can later be replaced with universal
# scanner that automatically detects connectivity
'scanner_ipv4_only': {
'scanner_ipv4_only': [
Queue('scanners'),
Queue('scanners.ipv4'),
},
],
}
......
......@@ -17,6 +17,7 @@ requests
certifi
pytz
celery[redis]
flower
django-celery-beat
# https://github.com/pi-bjl/celery-statsd/commit/5d61d7756f115dbf05a7eeb8314495b53ee1955e
django-statsd-mozilla
......@@ -62,3 +63,4 @@ simplejson
sphinx
sphinx-autobuild
recommonmark
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment