Commit af193218 authored by Johan Bloemberg's avatar Johan Bloemberg

Improve Celery integration tests, adjust settings to make both rate limiting...

Improve Celery integration tests, adjust settings to make both rate limiting and priorities work with current test cases (needs more test cases to verify different scenarios).
parent 1fe238cf
......@@ -23,7 +23,7 @@ app.autodiscover_tasks([app for app in settings.INSTALLED_APPS if app.startswith
# https://github.com/celery/celery/blob/a87ef75884e59c78da21b1482bb66cf649fbb7d3/docs/history/whatsnew-3.0.rst#redis-priority-support
# https://github.com/celery/celery/blob/f83b072fba7831f60106c81472e3477608baf289/docs/whatsnew-4.0.rst#redis-priorities-reversed
# contrary to 'documentation' in release notes the redis priorities do not seem aligned with rabbitmq
if 'redis' in app.conf.broker_url:
if 'redis://' in app.conf.broker_url:
PRIO_HIGH = 0
PRIO_NORMAL = 5
PRIO_LOW = 9
......
......@@ -460,7 +460,8 @@ CELERY_BROKER_CONNECTION_RETRY = False
# workaround to try and make rate limited tasks coexist on the same worker as non-rate limited whilst keeping
# good throughput on non-rate limited tasks even though the worker's internal queue might be plugged with rate limited tasks
CELERY_WORKER_PREFETCH_MULTIPLIER = 0
# prefetch only one task at a time per worker process, so rate limited tasks cannot clog the worker's internal queue
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# number of tasks to be executed in parallel by celery
CELERY_WORKER_CONCURRENCY = 10
......
......@@ -2,10 +2,19 @@ import os
import signal
import subprocess
import sys
import time
import pytest
from failmap_admin.celery import app
from failmap_admin.celery import app, waitsome
TIMEOUT = 30
@pytest.fixture()
def queue():
    """Yield a unique queue name so every test runs in isolation."""
    unique_name = f'queue-{time.time()}'
    yield unique_name
@pytest.fixture()
......@@ -13,18 +22,16 @@ def celery_app():
yield app
@pytest.fixture(scope='session')
def celery_worker():
worker_command = ['failmap-admin', 'celery', 'worker', '-l', 'info']
@pytest.fixture()
def celery_worker(queue):
worker_command = ['failmap-admin', 'celery', 'worker', '-l', 'info', '--queues', queue]
worker_process = subprocess.Popen(worker_command,
stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
preexec_fn=os.setsid)
# wait for worker to start accepting tasks before returning control to the test function
assert waitsome.apply_async([0], queue=queue, expires=TIMEOUT).get(timeout=TIMEOUT)
print('worker ready', file=sys.stderr)
yield worker_process
worker_process.terminate()
os.killpg(os.getpgid(worker_process.pid), signal.SIGTERM)
@pytest.fixture(scope='session')
def celery_worker_pool():
"""Align test worker settings with project settings."""
return 'prefork'
# stop worker and all child processes in its process group
os.killpg(os.getpgid(worker_process.pid), signal.SIGKILL)
......@@ -23,16 +23,16 @@ assert SAMPLES > 10, 'with current settings this test might not provide reliable
TASK_EXPIRY_TIME = SAMPLES * SLEEP
def test_high_priority(celery_app, celery_worker):
def test_high_priority(celery_app, celery_worker, queue):
"""High prio tasks should be executed first."""
# enqueue normal and high prio tasks alternately
high, normal = [], []
for index in range(SAMPLES):
if index % 2:
normal.append(waitsome.apply_async([SLEEP], expires=TASK_EXPIRY_TIME))
normal.append(waitsome.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME))
else:
high.append(waitsome.apply_async([SLEEP], expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
high.append(waitsome.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
# wait for all tasks to complete
print(ResultSet(results=high + normal).join(timeout=TASK_EXPIRY_TIME))
......
"""Test assumptions about rate limiting."""
import time
from django.conf import settings
from failmap_admin.celery import rate_limited, waitsome
from failmap_admin.celery import PRIO_HIGH, rate_limited, waitsome
SAMPLES = settings.CELERY_WORKER_CONCURRENCY
SAMPLES = settings.CELERY_WORKER_CONCURRENCY * 1
SLEEP = 1
TASK_EXPIRY_TIME = SAMPLES * SLEEP
def test_rate_limits(celery_app, celery_worker):
def test_rate_limits(celery_app, celery_worker, queue):
"""Rate limited tasks should not hold up worker queue for other tasks."""
# fill queue with rate limited tasks
rated_tasks = [rate_limited.apply_async([SLEEP], expires=TASK_EXPIRY_TIME) for _ in range(SAMPLES)]
rated_tasks = [
rate_limited.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME) for _ in range(SAMPLES)]
# add tasks that is not rate limited
task = waitsome.apply_async([SLEEP], expires=TASK_EXPIRY_TIME)
task = waitsome.apply_async([SLEEP], queue=queue, expires=TASK_EXPIRY_TIME)
# make sure task is executed before all rate limited tasks are done
assert task.get(timeout=SLEEP * 2)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment