Commit 5e8a2c36 authored by Johan Bloemberg's avatar Johan Bloemberg

refactor to not rely on buggy celery pytest implementation, invoke correctly...

refactor to not rely on buggy celery pytest implementation, invoke correctly configured worker via threading
parent ff9deb73
......@@ -4,6 +4,7 @@
# http://docs.celeryproject.org/en/latest/userguide/security.html
import os
import time
from celery import Celery, Task
from django.conf import settings
......@@ -41,11 +42,6 @@ class DefaultTask(Task):
app.Task = DefaultTask
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
class ParentFailed(Exception):
"""Error to indicate parent task has failed."""
......@@ -54,3 +50,24 @@ class ParentFailed(Exception):
if cause:
self.__cause__ = cause
super(ParentFailed, self).__init__(message, *args)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
@app.task
def waitsome(sleep):
"""Wait some time and return epoch at completion."""
time.sleep(sleep)
return time.time()
@app.task(rate_limit='1/s')
def rate_limited(sleep):
"""Wait some time but limit to maximum tasks of 1 per second."""
time.sleep(sleep)
return time.time()
import os
import signal
import subprocess
import sys
import pytest
from django.conf import settings
from failmap_admin.celery import app
@pytest.fixture()
def celery_app():
"""Use project app and settings instead of generic test app for more reliable tests."""
# test connection(settings) as default behaviour is to verify initial connection
# indefinitely which is fine for production use but not when testing/debugging
with app.connection() as conn:
conn.ensure_connection(max_retries=1)
return app
yield app
@pytest.fixture(scope='session')
def celery_includes():
"""Fix test worker behaviour lost due to using project app."""
return ['celery.contrib.testing.tasks']
def celery_worker():
worker_command = ['failmap-admin', 'celery', 'worker', '-l', 'info']
worker_process = subprocess.Popen(worker_command,
stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
preexec_fn=os.setsid)
yield worker_process
worker_process.terminate()
os.killpg(os.getpgid(worker_process.pid), signal.SIGTERM)
@pytest.fixture(scope='session')
def celery_worker_pool():
"""Align test worker settings with project settings."""
return 'prefork'
@pytest.fixture(scope='session')
def celery_worker_parameters():
"""Align test worker settings with project settings."""
return {
'concurrency': settings.CELERY_WORKER_CONCURRENCY,
}
"""Test/prove some assumtions about celery behaviour (eg: priorities)."""
"""Test/prove some assumptions about celery behaviour (eg: priorities)."""
# The concurrent nature of this test and the parts its testing can cause this
# test to be 'flaky' (fail/succeed at random) and non-deterministic.
......@@ -6,12 +6,10 @@
# behaviour which invalidates the results of this test.
# For example a to low number of samples might cause
import time
from celery.result import ResultSet
from django.conf import settings
from failmap_admin.celery import PRIO_HIGH, app
from failmap_admin.celery import PRIO_HIGH, waitsome
# amount of time the dummy task 'runs'
SLEEP = 0.1
......@@ -22,27 +20,19 @@ SAMPLES = settings.CELERY_WORKER_CONCURRENCY * settings.CELERY_WORKER_CONCURRENC
assert SAMPLES > 10, 'with current settings this test might not provide reliable results!'
@app.task
def waitsome():
"""Wait some time and return epoch at completion."""
time.sleep(SLEEP)
return time.time()
TASK_EXPIRY_TIME = SAMPLES * SLEEP
def test_high_priority(celery_app, celery_worker):
"""High prio tasks should be executed first."""
TASK_EXPIRY_TIME = SAMPLES * SLEEP
# enqueue normal and high prio tasks alternately
high, normal = [], []
for index in range(SAMPLES):
if index % 2:
normal.append(waitsome.apply_async(expires=TASK_EXPIRY_TIME))
normal.append(waitsome.apply_async([SLEEP], expires=TASK_EXPIRY_TIME))
else:
high.append(waitsome.apply_async(expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
high.append(waitsome.apply_async([SLEEP], expires=TASK_EXPIRY_TIME, priority=PRIO_HIGH))
# wait for all tasks to complete
print(ResultSet(results=high + normal).join(timeout=TASK_EXPIRY_TIME))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment