Commit 6044c64a authored by Johan Bloemberg's avatar Johan Bloemberg

Extend scan method integration test to work for all kinds.

parent 51bc9f5d
Pipeline #15959723 failed with stage
in 9 minutes and 54 seconds
......@@ -6,7 +6,7 @@ import time
from collections import Counter
import kombu.exceptions
from celery.result import ResultSet
from celery.result import AsyncResult, ResultSet
from django.conf import settings
from django.core.management.base import BaseCommand
......@@ -80,11 +80,11 @@ class TaskCommand(BaseCommand):
self.interval = options['interval']
if options['task_id']:
# return self.wait_for_result(GroupResult(options['task_id']))
# this currently doesn't work
raise NotImplementedError('needs to be added')
result = self.wait_for_result(ResultSet([AsyncResult(options['task_id'])]))
else:
result = self.run_task(*args, **options)
return json.dumps(self.run_task(*args, **options), cls=ResultEncoder)
return json.dumps(result, cls=ResultEncoder)
def run_task(self, *args, **options):
# try to compose task if not specified
......@@ -113,7 +113,7 @@ class TaskCommand(BaseCommand):
return self.wait_for_result(task_id)
else:
# if async return taskid to allow query for status later on
return [task_id.id]
return [r.id for r in task_id.results]
else:
# By default execute the task directly without involving celery or a broker.
# Return all results without raising exceptions.
......
# Perform different types of dummy scanner runs from a user level (using failmap command)
import json
from subprocess import check_output
import pytest
......@@ -9,6 +10,20 @@ import pytest
def test_scan_method(method, worker, faalonië):
"""Runs the scanner using each of the three methods."""
output = check_output('failmap scan_dummy -m {method} -o faalonië'.format(method=method).split(' '))
output_json = check_output(
'failmap scan_dummy -m {method} -o faalonië'.format(method=method).split(' '), encoding='utf8')
output = json.loads(output_json)
assert output
# async required extra command to wait for and retrieve result
if method == 'async':
task_id = output[0]
output_json = check_output(
'failmap scan_dummy -t {task_id}'.format(task_id=task_id).split(' '), encoding='utf8')
output = json.loads(output_json)
assert len(output) == 1, "Only one result is expected from fixture."
result = output[0]
# test output is a task response and account for the both success or failure responses as the tasks
assert isinstance(result, dict) and ('status' in result or 'error' in result)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment