Initial implementation of Job concept for user triggered tasks.

parent 628901b8
from django.contrib import admin
from .models import Job
class JobAdmin(admin.ModelAdmin):
list_display = ('name', 'result_id', 'created_on', 'finished_on')
list_filter = ('result', )
readonly_fields = ('name', 'task', 'result_id', 'result', 'status', 'created_on', 'finished_on')
admin.site.register(Job, JobAdmin)
# -*- coding: utf-8 -*-
# Generated by Django 1.11.7 on 2017-11-09 10:48
from __future__ import unicode_literals
from django.db import migrations, models
import jsonfield.fields
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Job',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('task', models.TextField(help_text='celery task signature in string form')),
('result_id', models.CharField(help_text='celery asyncresult ID for tracing task', max_length=255)),
('status', models.CharField(help_text='status of the job', max_length=255)),
('result', jsonfield.fields.JSONField(default=dict, help_text='output of the task as JSON')),
('created_on', models.DateTimeField(auto_now_add=True, help_text='when task was created', null=True)),
('finished_on', models.DateTimeField(blank=True, help_text='when task ended', null=True)),
],
),
]
# -*- coding: utf-8 -*-
# Generated by Django 1.11.7 on 2017-11-09 11:31
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('app', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='job',
name='name',
field=models.CharField(default='N/A', help_text='name of the job', max_length=255),
preserve_default=False,
),
]
# coding=UTF-8
# from __future__ import unicode_literals
import datetime
from django.db import models
from jsonfield import JSONField
from ..celery import app
class Job(models.Model):
"""An object to provide tracking of tasks."""
name = models.CharField(max_length=255, help_text="name of the job")
task = models.TextField(help_text="celery task signature in string form")
result_id = models.CharField(max_length=255, help_text="celery asyncresult ID for tracing task")
status = models.CharField(max_length=255, help_text="status of the job")
result = JSONField(help_text="output of the task as JSON")
created_on = models.DateTimeField(auto_now_add=True, blank=True, null=True, help_text="when task was created")
finished_on = models.DateTimeField(blank=True, null=True, help_text="when task ended")
@classmethod
def create(cls, task, name, *args, **kwargs):
"""Create job object and publish task on celery queue."""
# create database object
job = cls(task=str(task))
job.save()
# publish original task which stores the result in this Job object
result_id = (task | cls.store_result.s(job_id=job.id)).apply_async(*args, **kwargs)
job.name = name
job.result_id = result_id.id
job.status = 'created'
job.save()
return job
@staticmethod
@app.task
def store_result(result, job_id=None):
"""Celery task to store result of task after it has completed."""
print(result, job_id)
job = Job.objects.get(id=job_id)
job.result = result
job.status = 'completed'
job.finished_on = datetime.datetime.now()
job.save()
def __str__(self):
return self.result_id
......@@ -4,16 +4,15 @@ import pytz # admin functions
from django.contrib import admin
from jet.admin import CompactInline
from failmap_admin.map.determineratings import (OrganizationRating, UrlRating, rate_organization,
rate_urls)
from failmap_admin.scanners.models import Endpoint
from failmap_admin.scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from failmap_admin.scanners.scanner_http import scan_urls_on_standard_ports
from failmap_admin.scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from failmap_admin.scanners.scanner_screenshot import screenshot_urls
from failmap_admin.scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from failmap_admin.scanners.scanner_tls_qualys import ScannerTlsQualys
from ..app.models import Job
from ..map.determineratings import OrganizationRating, UrlRating, rate_organization, rate_urls
from ..scanners.models import Endpoint
from ..scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from ..scanners.scanner_http import scan_urls_on_standard_ports
from ..scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from ..scanners.scanner_screenshot import screenshot_urls
from ..scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from ..scanners.scanner_tls_qualys import ScannerTlsQualys
from .models import Coordinate, Organization, OrganizationType, Url
# Solved: http://stackoverflow.com/questions/11754877/
......@@ -156,8 +155,12 @@ class UrlAdmin(admin.ModelAdmin):
actions.append('scan_tls_qualys')
def security_headers(self, request, queryset):
security_headers_scan_urls(urls=([url for url in queryset]))
self.message_user(request, "Scan Security Headers: done")
# create a celery task and use Job object to keep track of the status
urls = list(queryset)
task = security_headers_scan_urls(urls=urls, execute=False)
name = "Scan Security Headers (%s) " % str(urls)
job = Job.create(task, name)
self.message_user(request, "%s: job created, id:%s" % (name, job.id))
security_headers.short_description = "Scan Security Headers"
actions.append('security_headers')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment