Commit 2644ac5f authored by Johan Bloemberg's avatar Johan Bloemberg

Run qualys tasks in their own queue to prevent blocking due to rate limiting.

parent f5b9fa3f
Pipeline #17313139 passed with stages
in 13 minutes and 48 seconds
......@@ -31,3 +31,4 @@ failmap_debug_dataset*
temp/*
*.out
*.rdb
.pytest_cache/
......@@ -22,6 +22,9 @@ WORKER_QUEUE_CONFIGURATION = {
# allow differentiating between scan tasks that have specific IP network family requirements
Queue('scanners.ipv4'),
Queue('scanners.ipv6'),
# a special queue for Qualys as it requires rate limiting and that causes other tasks in
# the same queue to stall.
Queue('scanners.qualys'),
# for tasks that require a database connection
Queue('storage'),
# default queue for task with no explicit queue assigned
......@@ -36,6 +39,7 @@ WORKER_QUEUE_CONFIGURATION = {
Queue('scanners'),
Queue('scanners.ipv4'),
Queue('scanners.ipv6'),
Queue('scanners.qualys'),
],
# scanner with no IPv6 connectivity
# this is an initial concept and can later be replaced with universal
......@@ -43,6 +47,7 @@ WORKER_QUEUE_CONFIGURATION = {
'scanner_ipv4_only': [
Queue('scanners'),
Queue('scanners.ipv4'),
Queue('scanners.qualys'),
],
}
......
......@@ -103,7 +103,8 @@ def compose_task(
bind=True,
# this task should run on an internet connected, distributed worker
queue='scanners',
# because of rate limiting, also put it in its own queue to prevent blocking other tasks
queue='scanners.qualys',
# start at most 1 new task per minute (per worker)
rate_limit='1/m',
)
......
......@@ -12,9 +12,9 @@ TIMEOUT = 30
@pytest.fixture()
def queue():
def queues():
"""Generate a unique queue to isolate every test."""
yield 'queue-' + str(time.time())
yield ['queue-' + str(time.time()), 'queue2-' + str(time.time())]
@pytest.fixture()
......@@ -23,15 +23,15 @@ def celery_app():
@pytest.fixture()
def celery_worker(queue):
worker_command = ['failmap', 'celery', 'worker', '-l', 'info', '--queues', queue]
def celery_worker(queues):
worker_command = ['failmap', 'celery', 'worker', '-l', 'info', '--queues', ','.join(queues)]
worker_process = subprocess.Popen(worker_command,
stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
preexec_fn=os.setsid)
# wrap assert in try/finally to kill worker on failing assert, wrap yield as well for cleaner code
try:
# wait for worker to start accepting tasks before turning to test function
assert waitsome.apply_async([0], queue=queue, expires=TIMEOUT).get(timeout=TIMEOUT), \
assert waitsome.apply_async([0], queue=queues[0], expires=TIMEOUT).get(timeout=TIMEOUT), \
"Worker failed to become ready and execute test task."
# give worker stderr time to output into 'Captured stderr setup' and not spill over into 'Captured stderr call'
time.sleep(0.1)
......
......@@ -23,16 +23,16 @@ assert SAMPLES > 10, 'with current settings this test might not provide reliable
TASK_EXPIRY_TIME = SAMPLES * SLEEP
def test_high_priority(celery_app, celery_worker, queue):
def test_high_priority(celery_app, celery_worker, queues):
"""High prio tasks should be executed first."""
# enqueue normal and high prio tasks alternately
high, normal = [], []
for index in range(SAMPLES):
if index % 2:
normal.append(waitsome.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME))
normal.append(waitsome.apply_async([SLEEP], queue=queues[0], expires=TASK_EXPIRY_TIME))
else:
high.append(waitsome.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
high.append(waitsome.apply_async([SLEEP], queue=queues[0], expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
# wait for all tasks to complete
print(ResultSet(results=high + normal).join(timeout=TASK_EXPIRY_TIME))
......
......@@ -6,24 +6,27 @@ from django.conf import settings
from failmap.celery import rate_limited, waitsome
SAMPLES = settings.CELERY_WORKER_CONCURRENCY * 2
SAMPLES = settings.CELERY_WORKER_CONCURRENCY * settings.CELERY_WORKER_CONCURRENCY
SLEEP = 5
TASK_EXPIRY_TIME = SAMPLES * SLEEP
def test_rate_limits(celery_app, celery_worker, queue):
"""Rate limited tasks should not hold up worker queue for other tasks."""
def test_rate_limits(celery_app, celery_worker, queues):
"""Rate limited tasks should not hold up worker queue for other tasks.
Rate limited tasks are put into a different queue, as this currently seems to be the only way to allow this behaviour.
"""
# fill queue with rate limited tasks
rated_tasks = [
rate_limited.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME) for _ in range(SAMPLES)]
time.sleep(SLEEP / 2)
rate_limited.apply_async([SLEEP], queue=queues[0], expires=TASK_EXPIRY_TIME) for _ in range(SAMPLES)]
time.sleep(1)
# add a task that is not rate limited
task = waitsome.apply_async([0], queue=queue, expires=TASK_EXPIRY_TIME)
task = waitsome.apply_async([0], queue=queues[1], expires=TASK_EXPIRY_TIME)
# make sure task is executed before all rate limited tasks are done
assert task.get(timeout=SLEEP)
assert task.get(timeout=1)
# as a sanity check, make sure no more than half of the rate limited tasks have been executed in the meantime
PENDING_RATED_TASKS = len([s for s in rated_tasks if s.state == 'PENDING'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment