Commit 1da95861 authored by Johan Bloemberg

Improve worker.

- more memory efficient
- fails if the worker role cannot be applied
- only rate limits initial Qualys requests
parent 7906a2bd
Pipeline #17424742 passed with stages in 11 minutes and 3 seconds
@@ -59,7 +59,7 @@ WORKER_QUEUE_CONFIGURATION = {
}
def worker_configuration(conf):
def worker_configuration():
"""Apply specific configuration for worker depending on environment."""
role = os.environ.get('WORKER_ROLE', 'default')
@@ -67,10 +67,10 @@ def worker_configuration(conf):
log.info('Configuring worker for role: %s', role)
# configure which queues should be consumed depending on assigned role for this worker
conf.task_queues = WORKER_QUEUE_CONFIGURATION[role]
return {'task_queues': WORKER_QUEUE_CONFIGURATION[role]}
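The role lookup is what backs the "fails if role cannot be applied" bullet: an unknown WORKER_ROLE now surfaces as a KeyError instead of a silently ignored setting, and the celeryd_init hook further down turns that into a hard exit. A minimal sketch of the pattern follows; the role names and queue layout are illustrative, not the project's actual WORKER_QUEUE_CONFIGURATION.

```python
# Illustrative sketch only: role names and queue layout are not the
# project's actual WORKER_QUEUE_CONFIGURATION.
import os

from kombu import Queue

WORKER_QUEUE_CONFIGURATION = {
    'default': [Queue('default'), Queue('storage')],
    'scanner': [Queue('scanners')],
}


def worker_configuration():
    """Return Celery settings for this worker based on its assigned role."""
    role = os.environ.get('WORKER_ROLE', 'default')
    # an unknown role raises KeyError here; the celeryd_init hook below
    # catches it, logs the failure and exits, so a misconfigured worker
    # never starts consuming the wrong queues
    return {'task_queues': WORKER_QUEUE_CONFIGURATION[role]}
```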
def tls_client_certificate(conf):
def tls_client_certificate():
"""Configure certificates from PKCS12 file.
If client file is provided will extract key and certificate pem to files and
@@ -98,11 +98,14 @@ def tls_client_certificate(conf):
tls_client_cert_file.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate()))
# configure redis to use TLS
conf.broker_use_ssl = {
'ssl_keyfile': tls_client_key_file.name,
'ssl_certfile': tls_client_cert_file.name,
'ssl_ca_certs': certifi.where(),
'ssl_cert_reqs': ssl.CERT_REQUIRED
return {
'broker_use_ssl': {
'ssl_keyfile': tls_client_key_file.name,
'ssl_certfile': tls_client_cert_file.name,
'ssl_ca_certs': certifi.where(),
'ssl_cert_reqs': ssl.CERT_REQUIRED,
}
}
else:
log.info('no PKCS12 file found, not configuring TLS.')
return {}
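Only the tail of tls_client_certificate() is visible in this hunk. Below is a hedged sketch of the overall shape; only the returned broker_use_ssl structure is taken from the diff above, while the PKCS12_FILE/PKCS12_PASSWORD environment variable names and the tempfile handling are assumptions for illustration.

```python
# Hedged sketch of the overall shape of tls_client_certificate(); only the
# returned broker_use_ssl structure mirrors the diff above. PKCS12_FILE,
# PKCS12_PASSWORD and the tempfile handling are assumptions.
import os
import ssl
import tempfile

import certifi
import OpenSSL.crypto


def tls_client_certificate():
    """Configure broker TLS client authentication from a PKCS12 file."""
    pkcs12_path = os.environ.get('PKCS12_FILE')  # assumed variable name
    if not pkcs12_path:
        # no PKCS12 file found, not configuring TLS
        return {}

    passphrase = os.environ.get('PKCS12_PASSWORD', '').encode()  # assumed
    with open(pkcs12_path, 'rb') as f:
        p12 = OpenSSL.crypto.load_pkcs12(f.read(), passphrase)

    # extract key and certificate PEM into temporary files for the broker client
    key_file = tempfile.NamedTemporaryFile(suffix='.key', delete=False)
    cert_file = tempfile.NamedTemporaryFile(suffix='.pem', delete=False)
    key_file.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, p12.get_privatekey()))
    cert_file.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate()))
    key_file.close()
    cert_file.close()

    # configure redis (the broker) to use TLS with the extracted client pair
    return {
        'broker_use_ssl': {
            'ssl_keyfile': key_file.name,
            'ssl_certfile': cert_file.name,
            'ssl_ca_certs': certifi.where(),
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
        }
    }
```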
@@ -146,7 +146,9 @@ def qualys_scan(self, url):
# The 'retry' converts this task instance from a rate_limited task into a
# scheduled task, so retrying tasks won't interfere with new tasks to be
# started.
raise self.retry(countdown=20, priority=PRIO_HIGH, max_retries=30)
# We use a different queue here as only initial requests count toward the rate limit
# set by Qualys.
raise self.retry(countdown=20, priority=PRIO_HIGH, max_retries=30, queue='scanners')
@app.task(queue='storage')
......
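Because the retry now carries queue='scanners', only the first submission travels through the rate-limited queue; polling retries no longer count toward the Qualys limit. A minimal sketch of the pattern is below; the task's own queue name ('qualys'), the rate_limit option and the poll_scan() helper are assumptions, while the retry options mirror the diff above.

```python
# Pattern sketch, not the project's full task. However the rate limit is
# enforced in the project, routing retries to 'scanners' keeps them out of
# the limited queue that the initial requests go through.
from celery import Celery

app = Celery('sketch', broker='redis://localhost:6379/0')
PRIO_HIGH = 9


def poll_scan(url):
    """Hypothetical stand-in for the actual Qualys API polling."""
    return False


@app.task(bind=True, queue='qualys', rate_limit='1/m')
def qualys_scan(self, url):
    if not poll_scan(url):
        # retry() re-publishes this task instance as a scheduled task;
        # sending it to 'scanners' means only the initial request counts
        # toward the Qualys rate limit
        raise self.retry(countdown=20, priority=PRIO_HIGH, max_retries=30, queue='scanners')
    return 'finished'
```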
"""This module is imported by failmap.__init__ to register Signal hooks."""
import logging
import shutil
import sys
import tempfile
from celery.signals import celeryd_init, worker_shutdown
@@ -8,19 +10,24 @@ from django.conf import settings
from .celery.worker import tls_client_certificate, worker_configuration
log = logging.getLogger(__name__)
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
"""Configure workers when Celery is initialized."""
# create a universal temporary directory to be removed when the application quits
settings.WORKER_TMPDIR = tempfile.mkdtemp()
# configure worker queues
worker_configuration(conf)
# for remote workers configure TLS key and certificate from PKCS12 file
tls_client_certificate(conf)
try:
# create a universal temporary directory to be removed when the application quits
settings.WORKER_TMPDIR = tempfile.mkdtemp()
# configure worker queues
conf.update(worker_configuration())
# for remote workers configure TLS key and certificate from PKCS12 file
conf.update(tls_client_certificate())
except BaseException:
log.exception('Failed to setup worker configuration!')
sys.exit(1)
@worker_shutdown.connect
......
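The body of the worker_shutdown handler is collapsed in this diff. Given the newly imported shutil and the WORKER_TMPDIR created in configure_workers, it presumably removes that directory when the worker stops; a hedged sketch of what it might look like, not the actual implementation:

```python
# Hedged sketch of the shutdown counterpart to configure_workers(); the
# handler name and log message are assumptions.
import logging
import shutil

from celery.signals import worker_shutdown
from django.conf import settings

log = logging.getLogger(__name__)


@worker_shutdown.connect
def cleanup_workers(sender=None, **kwargs):
    """Remove the temporary directory created at worker startup."""
    log.info('Removing worker temporary directory: %s', settings.WORKER_TMPDIR)
    shutil.rmtree(settings.WORKER_TMPDIR, ignore_errors=True)
```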
@@ -16,7 +16,7 @@ requests
# use Mozilla certificate bundle by default
certifi
pytz
celery[redis]
celery[redis,eventlet]
flower
django-celery-beat
# https://github.com/pi-bjl/celery-statsd/commit/5d61d7756f115dbf05a7eeb8314495b53ee1955e
......
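The eventlet extra is what backs the "more memory efficient" bullet: I/O-bound scan tasks can run as cooperative green threads in a single process instead of a pool of prefork child processes. Celery's --pool eventlet applies the monkey patching itself; the standalone sketch below only illustrates the effect and is not project code.

```python
# Illustrative only: why an eventlet pool is cheap for I/O-bound tasks.
# Celery performs the monkey patching when started with --pool eventlet;
# this sketch does it explicitly to show many concurrent HTTP waits
# sharing one process.
import eventlet
eventlet.monkey_patch()  # make blocking sockets cooperative

import requests  # already in this requirements file

pool = eventlet.GreenPool(size=100)


def fetch(url):
    return url, requests.get(url, timeout=10).status_code


urls = ['https://example.com'] * 10
for url, status in pool.imap(fetch, urls):
    print(url, status)
```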
import logging
import os
import signal
import subprocess
@@ -8,6 +9,9 @@ import pytest
from failmap.celery import app, waitsome
log = logging.getLogger(__name__)
TIMEOUT = 30
@@ -22,9 +26,24 @@ def celery_app():
yield app
@pytest.fixture()
def celery_worker(queues):
worker_command = ['failmap', 'celery', 'worker', '-l', 'info', '--queues', ','.join(queues)]
@pytest.fixture(params=['prefork', 'eventlet'])
def celery_worker(queues, request):
"""Spawn celery worker to be used during test.
This worker only listens on specified queues to ensure test integrity!
Tests on both implementations of worker."""
pool = request.param
worker_command = [
'failmap',
'celery',
'worker',
'-l', 'info',
'--pool', pool,
'--queues', ','.join(queues)
]
log.info('Running worker with: %s', ' '.join(worker_command))
worker_process = subprocess.Popen(worker_command,
stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
preexec_fn=os.setsid)
......
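A hedged usage sketch: because the fixture is parametrized over ['prefork', 'eventlet'], any test requesting it runs once per pool. It reuses waitsome and TIMEOUT from the module above; the waitsome call signature and the queues fixture being an indexable list of queue names are assumptions for illustration.

```python
# Hedged usage sketch; waitsome's signature and the shape of the queues
# fixture are assumptions.
def test_task_roundtrip(celery_app, celery_worker, queues):
    # submit a trivial task to one of the test-only queues and block until
    # the worker (prefork or eventlet, depending on the fixture param)
    # has processed it
    result = waitsome.apply_async([0.1], queue=queues[0])
    result.get(timeout=TIMEOUT)
```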
@@ -20,10 +20,13 @@ def faalonië():
subprocess.call(['failmap', 'load_dataset', 'faalonie'])
@pytest.fixture(scope="session")
def worker():
@pytest.fixture(scope='session', params=['prefork', 'eventlet'])
def worker(request):
"""Run a task worker instance."""
worker_command = ['failmap', 'celery', 'worker', '-l', 'info']
pool = request.param
worker_command = ['failmap', 'celery', 'worker', '-l', 'info', '--pool', pool]
worker_process = subprocess.Popen(worker_command,
stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
preexec_fn=os.setsid)
......
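The teardown half of this session-scoped fixture is collapsed here. Since the worker is started in its own session (preexec_fn=os.setsid) and signal is imported in the sibling test module, teardown presumably kills the whole process group; a hedged sketch of the complete fixture shape:

```python
# Hedged sketch of the full fixture, including the teardown that the
# collapsed part of this diff presumably contains.
import os
import signal
import subprocess
import sys

import pytest


@pytest.fixture(scope='session', params=['prefork', 'eventlet'])
def worker(request):
    """Run a task worker instance on the parametrized pool."""
    worker_command = ['failmap', 'celery', 'worker', '-l', 'info', '--pool', request.param]
    worker_process = subprocess.Popen(worker_command,
                                      stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
                                      preexec_fn=os.setsid)
    yield worker_process
    # terminate the entire process group (worker plus pool children)
    os.killpg(os.getpgid(worker_process.pid), signal.SIGTERM)
    worker_process.wait()
```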