Commit 988da854 authored by Johan Bloemberg

Add drama.

parent fe632ae0
Pipeline #18629056 failed with stages in 18 minutes and 23 seconds
@@ -83,7 +83,10 @@ class TaskCommand(BaseCommand):
         else:
             result = self.run_task(*args, **options)

-        return json.dumps(result, cls=ResultEncoder)
+        if result:
+            return json.dumps(result, cls=ResultEncoder)
+        else:
+            return

     def run_task(self, *args, **options):
         # try to compose task if not specified
@@ -156,3 +159,33 @@ class ScannerTaskCommand(TaskCommand):

         # compose set of tasks to be executed
         return self.scanner_module.compose_task(organization_filter)
+
+
+class DramaScannerTaskCommand(ScannerTaskCommand):
+    def run_task(self, *args, **options):
+        # try to compose task if not specified
+        if not self.task:
+            self.task = self.compose(*args, **options)
+
+        # execute task based on selected method
+        if options['method'] in ['sync', 'async']:
+            self.task.run()
+
+            if options['method'] == 'sync':
+                log.info('Task scheduled for execution, waiting for it to complete.')
+                while not self.task.completed:
+                    # show intermediate status
+                    log.info('Task execution status: %s/%s completed',
+                             self.task.completed_count, len(self.task.children))
+                    time.sleep(self.interval)
+                return [r for r in self.task.get_results(block=True, timeout=60 * 60 * 1000)]
+            else:
+                log.info('Task scheduled for execution.')
+                return str(self.task)
+        else:
+            raise NotImplementedError("Dramatiq task backend does not support direct execution.")
+
+    def wait_for_result(self, task_id):
+        raise NotImplementedError()
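
Note: the sync branch above leans on dramatiq's group API: run() enqueues all child messages and returns immediately, completed/completed_count poll the result backend, and get_results() blocks until every child has produced a result (timeouts are in milliseconds). A minimal sketch of that flow, not part of this commit, assuming a result backend is configured as failmap/dramatiq.py does below:

    import time

    import dramatiq

    @dramatiq.actor(store_results=True)
    def double(n):
        return n * 2

    task = dramatiq.group(double.message(n) for n in range(10))
    task.run()  # enqueue all child messages, returns immediately

    while not task.completed:  # polls the result backend for finished children
        time.sleep(1)

    results = list(task.get_results(block=True, timeout=60000))  # timeout in ms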
@@ -6,11 +6,13 @@ import time
 import warnings
 from uuid import uuid1

+import dramatiq
 from django.conf import settings
 from django.core.management import call_command
 from django.core.management.commands.runserver import Command as RunserverCommand
 from retry import retry

+import failmap.dramatiq
 from failmap.celery import app

 log = logging.getLogger(__name__)

@@ -54,6 +56,13 @@ def start_worker(broker_port, silent=True):
     return worker_process


+def start_drama_worker(broker_port, silent=True):
+    worker_command = 'failmap worker --processes 1 --broker redis://localhost:%d/0' % broker_port
+
+    worker_process = subprocess.Popen(worker_command.split(), stdout=sys.stdout.buffer, stderr=sys.stderr.buffer)
+    return worker_process
+
+
 def stop_process(process):
     # soft shutdown
     process.terminate()
@@ -121,19 +130,33 @@ class Command(RunserverCommand):
         broker_process, broker_port = start_borker(uuid)
         log.info('Starting broker')
         self.processes.append(broker_process)

-        log.info('Starting worker')
+        log.info('Starting workers')
         self.processes.append(start_worker(broker_port, options['verbosity'] < 2))
+        self.processes.append(start_drama_worker(broker_port, options['verbosity'] < 2))

         # set celery broker url
         settings.CELERY_BROKER_URL = 'redis://localhost:%d/0' % broker_port
         # set as environment variable for the inner_run
-        os.environ['BROKER'] = settings.CELERY_BROKER_URL
+        os.environ['BROKER'] = 'redis://localhost:%d/0' % broker_port

-        # wait for worker to be ready
+        # wait for celery worker to be ready
         log.info('Waiting for worker to be ready')
         for _ in range(TIMEOUT * 2):
             if app.control.ping(timeout=0.5):
                 break

+        # required to make dramatiq in this process pick up the new broker settings
+        failmap.dramatiq.setbroker()
+        broker = dramatiq.get_broker()
+
+        # wait for dramatiq worker to be ready
+        message = dramatiq.Message(
+            queue_name='default',
+            actor_name="ping",
+            args=(), kwargs={}, options={})
+        broker.enqueue(message)
+        message.get_result(block=True, timeout=TIMEOUT * 1000)
+
         log.info('Worker ready')

         if not options['no_data']:
......
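
Note: the readiness check above addresses the ping actor by name through a hand-built dramatiq.Message rather than calling ping.send(), likely because actor objects are bound to whichever broker existed when failmap.dramatiq was first imported; enqueueing explicitly on the freshly configured broker sidesteps that. When the module-level broker is already the right one, the equivalent convenience form is roughly this (a sketch, not part of this commit):

    from failmap.dramatiq import ping

    # enqueue a ping and block until a worker has answered it (timeout in ms)
    ping.send().get_result(block=True, timeout=TIMEOUT * 1000)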
from __future__ import absolute_import, unicode_literals

import logging
import os

from django_dramatiq.management.commands.rundramatiq import Command as RundramatiqCommand

log = logging.getLogger(__name__)


class Command(RundramatiqCommand):
    """Dramatiq command wrapper."""

    help = __doc__

    # disable (MySQL) check on startup
    requires_system_checks = False

    def add_arguments(self, parser):
        default_broker = os.environ.get("BROKER", 'redis://127.0.0.1:6379/0')
        parser.add_argument('-b', '--broker', default=default_broker, type=str,
                            help='URL of the broker.')
        super().add_arguments(parser)

    def discover_tasks_modules(self):
        """Filter non-app modules (like uwsgi) and add dramatiq config."""
        return ['failmap.dramatiq'] + [m for m in super().discover_tasks_modules()
                                       if m.startswith('failmap') or m.startswith('django_dramatiq.setup')]

    def handle(self, *args, **kwargs):
        broker = kwargs['broker']
        log.info("Setting broker to: %s", broker)
        os.environ["BROKER"] = broker

        super().handle(*args, **kwargs)
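
Note: the effect of discover_tasks_modules() is to keep only failmap's own modules plus django_dramatiq's setup module, with failmap.dramatiq (the broker configuration) forced to load first. A self-contained illustration of the filter, using a hypothetical module list standing in for super().discover_tasks_modules():

    discovered = ['failmap.app', 'uwsgi', 'django_dramatiq.setup', 'flower']

    kept = ['failmap.dramatiq'] + [m for m in discovered
                                   if m.startswith('failmap') or m.startswith('django_dramatiq.setup')]

    assert kept == ['failmap.dramatiq', 'failmap.app', 'django_dramatiq.setup']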
@@ -3,8 +3,10 @@
 import datetime
 import importlib
+from typing import Union

 import celery
+import dramatiq
 from django.contrib.auth.models import User
 from django.db import models
 from jsonfield import JSONField

@@ -31,7 +33,8 @@ class Job(models.Model):
     created_by = models.ForeignKey(User, blank=True, null=True, on_delete=models.CASCADE,)

     @classmethod
-    def create(cls, task: celery.Task, name: str, request, *args, **kwargs) -> 'Job':
+    def create(cls, task: Union[celery.Task, celery.canvas.Signature, dramatiq.group],
+               name: str, request, *args, **kwargs) -> 'Job':
         """Create job object and publish task on celery queue."""

         # create database object
         job = cls(task=str(task))
@@ -41,12 +44,15 @@ class Job(models.Model):
         job.status = 'created'
         job.save()

-        # publish original task which stores the result in this Job object
-        result_id = (task | cls.store_result.s(job_id=job.id)).apply_async(*args, **kwargs)
+        if isinstance(task, (celery.Task, celery.canvas.Signature)):
+            # publish original task which stores the result in this Job object
+            result_id = (task | cls.store_result.s(job_id=job.id)).apply_async(*args, **kwargs)

-        # store the task async result ID for reference
-        job.result_id = result_id.id
-        job.save(update_fields=['result_id'])
+            # store the task async result ID for reference
+            job.result_id = result_id.id
+            job.save(update_fields=['result_id'])
+        else:
+            # dramatiq groups expose no single async result id; just enqueue them
+            task.run()

         return job
......
import logging
import os

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.stub import StubBroker
from dramatiq.results.backends import RedisBackend, StubBackend

log = logging.getLogger(__name__)


def setbroker(watcher=False):
    """Configure the global dramatiq broker from the BROKER environment variable."""
    broker_url = os.environ.get('BROKER', 'redis://localhost:6379/0')

    if broker_url.startswith('redis'):
        broker = RedisBroker(url=broker_url)
        broker.add_middleware(dramatiq.results.Results(backend=RedisBackend(client=broker.client)))
    else:
        # fall back to an in-memory stub broker (used in tests)
        broker = StubBroker()
        broker.add_middleware(dramatiq.results.Results(backend=StubBackend()))

    broker.add_middleware(dramatiq.middleware.AgeLimit())
    broker.add_middleware(dramatiq.middleware.Callbacks())
    broker.add_middleware(dramatiq.middleware.Retries())
    broker.add_middleware(dramatiq.middleware.Pipelines())

    dramatiq.set_broker(broker)
    # set_encoder expects an encoder instance, not the class
    dramatiq.set_encoder(dramatiq.PickleEncoder())


setbroker()


@dramatiq.actor(store_results=True)
def ping():
    return 'pong'
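
Note: importing failmap.dramatiq configures the global broker as a side effect (the module-level setbroker() call), so a complete round trip through the ping actor looks roughly like this (a sketch, not part of this commit; assumes a worker is consuming the default queue):

    import failmap.dramatiq  # configures broker, result backend and encoder on import

    # enqueue a ping and wait up to 10 seconds for a worker to answer
    message = failmap.dramatiq.ping.send()
    assert message.get_result(block=True, timeout=10000) == 'pong'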
@@ -17,7 +17,8 @@ import failmap.scanners.scanner_http as scanner_http
 from failmap import types
 from failmap.map import rating
 from failmap.map.rating import OrganizationRating, UrlRating
-from failmap.scanners import scanner_plain_http, scanner_security_headers, scanner_tls_qualys
+from failmap.scanners import (scanner_dummy, scanner_plain_http, scanner_security_headers,
+                              scanner_tls_qualys)
 from failmap.scanners.admin import UrlIpInline
 from failmap.scanners.models import Endpoint
 from failmap.scanners.onboard import onboard_urls

@@ -119,6 +120,11 @@ class ActionMixin:
     scan_tls_qualys.short_description = '🔬 Scan TLS Qualys'
     actions.append(scan_tls_qualys)

+    def scan_dummy(self, *args, **kwargs):
+        return self.generic_action(scanner_dummy.compose_task, 'Dummy/test scanner', *args, **kwargs)
+    scan_dummy.short_description = '💥 Dummy/test scanner'
+    actions.append(scan_dummy)
+
     def rebuild_ratings(self, *args, **kwargs):
         return self.generic_action(rating.compose_task, 'Rebuild rating', *args, **kwargs)
     rebuild_ratings.short_description = '✅ Rebuild rating'
......
 import logging

-from failmap.app.management.commands._private import ScannerTaskCommand
+from failmap.app.management.commands._private import DramaScannerTaskCommand
 from failmap.scanners import scanner_dummy

 log = logging.getLogger(__name__)


-class Command(ScannerTaskCommand):
+class Command(DramaScannerTaskCommand):
     """Demonstrative NOOP scanner for example purposes."""

     help = __doc__
......
@@ -8,10 +8,10 @@ import logging
 import random
 import time

-from celery import Task, group
 from django.conf import settings

-from failmap.celery import ParentFailed, app
+from failmap.celery import ParentFailed
+from failmap.dramatiq import dramatiq
 from failmap.organizations.models import Organization, Url
 from failmap.scanners.endpoint_scan_manager import EndpointScanManager

@@ -30,7 +30,7 @@ def compose_task(
     organizations_filter: dict = dict(),
     urls_filter: dict = dict(),
     endpoints_filter: dict = dict(),
-) -> Task:
+) -> dramatiq.group:
     """Compose taskset to scan specified endpoints.

     *This is an implementation of `compose_task`. For more documentation about this concept, arguments and concrete

@@ -72,21 +72,22 @@ def compose_task(
     # create tasks for scanning all selected endpoints as a single manageable group
     # Sending entire objects is possible. How signatures (.s and .si) work is documented:
     # http://docs.celeryproject.org/en/latest/reference/celery.html#celery.signature
-    task = group(
-        scan_dummy.s(endpoint.uri_url()) | store_dummy.s(endpoint) for endpoint in endpoints
+    task = dramatiq.group(
+        scan_dummy.message(endpoint.uri_url()) | store_dummy.message(endpoint) for endpoint in endpoints
     )

     return task


-@app.task(queue='storage')
-def store_dummy(result, endpoint):
+@dramatiq.actor(queue_name='storage', store_results=True)
+def store_dummy(endpoint, result):
     """
-    :param result: param endpoint:
     :param endpoint:
+    :param result:
     """

     # if scan task failed, ignore the result (exception) and report failed status
     if isinstance(result, Exception):
         return ParentFailed('skipping result parsing because scan failed.', cause=result)
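
Note: the swap from store_dummy(result, endpoint) to store_dummy(endpoint, result) follows from how dramatiq pipelines pass data: the Pipelines middleware appends the previous actor's return value as the last positional argument of the next message, whereas a celery chain prepends it as the first. A minimal sketch of the mechanism (not part of this commit):

    import dramatiq

    @dramatiq.actor(store_results=True)
    def scan(uri_url):
        return True

    @dramatiq.actor(queue_name='storage', store_results=True)
    def store(endpoint, result):
        # `result` is appended by the Pipelines middleware when `scan` finishes
        return {endpoint: result}

    # same shape as the group members built in compose_task() above
    pipe = scan.message('https://example.com') | store.message('endpoint-1')
    pipe.run()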
@@ -115,12 +116,8 @@ class SomeError(Exception):
     """Just some expectable error."""


-@app.task(queue='scanners',
-          bind=True,
-          default_retry_delay=RETRY_DELAY,
-          retry_kwargs={'max_retries': MAX_RETRIES},
-          expires=EXPIRES)
-def scan_dummy(self, uri_url):
+@dramatiq.actor(queue_name='scanners', store_results=True)
+def scan_dummy(uri_url):
     """

     Before committing your scanner, verify the following:

@@ -137,45 +134,32 @@ def scan_dummy(self, uri_url):
     :param uri_url:
     """

-    try:
-        log.info('Start scanning %s', uri_url)
-
-        # Tools and output for this scan are registered in /failmap/settings.py
-        # We prefer tools written in python, limiting the amount of dependencies used in the project.
-        # Another tool is fine too, but please announce so in chat etc.
-        # Example:
-        # TOOLS = {
-        #     'yourtool': {
-        #         'executable': VENDOR_DIR + os.environ.get('YOURTOOL_EXECUTABLE', "yourtool/yourtool.py"),
-        #         'output_dir': OUTPUT_DIR + os.environ.get('YOURTOOL_OUTPUT_DIR',
-        #                                                   "scanners/resources/output/yourtool/"),
-        #     },
-        # mytool = settings.TOOLS['yourtool']['executable']
-
-        # Below demonstrates the usage of settings.
-        sample_settings_usage = len(settings.TOOLS)
-        log.debug("%s are registered." % sample_settings_usage)
-
-        # simulation: sometimes a task fails, for example with network errors etcetera. The task will be retried.
-        if not random.randint(0, 5):
-            raise SomeError('some error occurred')
-
-        # simulation: often tasks take different times to execute
-        time.sleep(random.randint(1, 10) / 10)
-
-        # simulation: the result can be different
-        result = bool(random.randint(0, 1))
-        log.info('Done scanning: %s, result: %s', uri_url, result)
-        return result
-    except SomeError as e:
-        # If an expected error is encountered put this task back on the queue to be retried.
-        # This will keep the chained logic in place (saving result after successful scan).
-        # Retry delay and total number of attempts is configured in the task decorator.
-        try:
-            # Since this action raises an exception itself, any code after this won't be executed.
-            raise self.retry(exc=e)
-        except BaseException:
-            # If this task still fails after maximum retries the last
-            # error will be passed as result to the next task.
-            log.exception('Retried %s times and it still failed', MAX_RETRIES)
-            return e
+    log.info('Start scanning %s', uri_url)
+
+    # Tools and output for this scan are registered in /failmap/settings.py
+    # We prefer tools written in python, limiting the amount of dependencies used in the project.
+    # Another tool is fine too, but please announce so in chat etc.
+    # Example:
+    # TOOLS = {
+    #     'yourtool': {
+    #         'executable': VENDOR_DIR + os.environ.get('YOURTOOL_EXECUTABLE', "yourtool/yourtool.py"),
+    #         'output_dir': OUTPUT_DIR + os.environ.get('YOURTOOL_OUTPUT_DIR',
+    #                                                   "scanners/resources/output/yourtool/"),
+    #     },
+    # mytool = settings.TOOLS['yourtool']['executable']
+
+    # Below demonstrates the usage of settings.
+    sample_settings_usage = len(settings.TOOLS)
+    log.debug("%s are registered." % sample_settings_usage)
+
+    # simulation: sometimes a task fails, for example with network errors etcetera. The task will be retried.
+    if not random.randint(0, 5):
+        raise SomeError('some error occurred')
+
+    # simulation: often tasks take different times to execute
+    time.sleep(random.randint(1, 10) / 10)
+
+    # simulation: the result can be different
+    result = bool(random.randint(0, 1))
+    log.info('Done scanning: %s, result: %s', uri_url, result)
+    return result
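
Note: the celery retry boilerplate (bind=True, self.retry() and the surrounding try/except) has no counterpart in the new body because the Retries middleware registered in failmap/dramatiq.py retries failing actors automatically. If the previous limits should be preserved, dramatiq accepts them as actor options, e.g. (a sketch, not part of this commit; backoff is in milliseconds):

    @dramatiq.actor(queue_name='scanners', store_results=True,
                    max_retries=MAX_RETRIES, min_backoff=RETRY_DELAY * 1000)
    def scan_dummy(uri_url):
        ...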
@@ -23,6 +23,10 @@ celery[redis,eventlet]
 django-celery-beat
 flower  # used for queue statistics

+# dramatiq task handling
+dramatiq[redis,watch]
+django_dramatiq
+
 # https://github.com/pi-bjl/celery-statsd/commit/5d61d7756f115dbf05a7eeb8314495b53ee1955e
 django-statsd-mozilla
 django_uwsgi
......
@@ -8,8 +8,9 @@ import time

 import pytest

 from failmap.celery import waitsome
+from failmap.dramatiq import ping

-TIMEOUT = 30
+TIMEOUT = 10


 @pytest.fixture(scope="session")

@@ -21,7 +22,7 @@ def faalonië():

 @pytest.fixture(scope='session', params=['prefork', 'eventlet'])
-def worker(request):
+def celery_worker(request):
     """Run a task worker instance."""

     pool = request.param

@@ -41,3 +42,31 @@ def worker(request):
     finally:
         # stop worker and all child threads
         os.killpg(os.getpgid(worker_process.pid), signal.SIGKILL)
+
+
+@pytest.fixture(scope='session', params=['threads', 'gevent'])
+def dramatiq_worker(request):
+    """Run a task worker instance."""
+
+    if request.param == 'gevent':
+        worker_command = ['failmap', 'worker', '--use-gevent']
+    else:
+        worker_command = ['failmap', 'worker']
+
+    worker_process = subprocess.Popen(worker_command,
+                                      stdout=sys.stdout.buffer, stderr=sys.stderr.buffer,
+                                      preexec_fn=os.setsid)
+
+    # wrap assert in try/finally to kill worker on failing assert, wrap yield as well for cleaner code
+    try:
+        # wait for the worker to start accepting tasks before handing control to the test function
+        message = ping.send()
+        message.get_result(block=True, timeout=TIMEOUT * 1000)
+
+        # give worker stderr time to flush into 'Captured stderr setup' instead of spilling into 'Captured stderr call'
+        time.sleep(0.1)
+
+        yield worker_process
+    finally:
+        # stop worker and all child threads
+        os.killpg(os.getpgid(worker_process.pid), signal.SIGKILL)
+
+
+worker = dramatiq_worker
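
Note: the final assignment aliases the fixture, so existing tests that request worker transparently get the dramatiq worker while the celery fixture stays available under its new celery_worker name. A test using it could look like this (a sketch, not part of this commit):

    def test_ping_roundtrip(worker):
        """With a live worker, a ping message should round trip within the timeout."""
        message = ping.send()
        assert message.get_result(block=True, timeout=TIMEOUT * 1000) == 'pong'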
@@ -10,12 +10,17 @@ import pytest
 def test_scan_method(method, worker, faalonië):
     """Runs the scanner using each of the three methods."""

+    if method == 'direct':
+        pytest.skip("direct execution method currently not supported with dramatiq")
+
     output_json = check_output(
         'failmap scan_dummy -m {method} -o faalonië'.format(method=method).split(' '), encoding='utf8')
     output = json.loads(output_json)

     # async requires an extra command to wait for and retrieve the result
     if method == 'async':
+        pytest.skip("async result fetching currently not supported with dramatiq")
+
         task_id = output[0]
         output_json = check_output(
......
@@ -2,12 +2,14 @@
 import json

 import pytest
 from django.core.management import call_command

 TEST_ORGANIZATION = 'faalonië'
 NON_EXISTING_ORGANIZATION = 'faaloniet'


+@pytest.mark.skip("Dramatiq currently doesn't support direct execution.")
 def test_dummy(responses, db, faalonië):
     """Test running dummy scan."""
......