Commit fb2ffeb6 authored by Elger Jonker's avatar Elger Jonker

icons on buttons, improved listing to determine actions

parent caa97fc1
...@@ -773,11 +773,9 @@ def significant_times(organization=None, url=None): ...@@ -773,11 +773,9 @@ def significant_times(organization=None, url=None):
# make sure the URL ratings are up to date, they will check endpoints and such. # make sure the URL ratings are up to date, they will check endpoints and such.
def rate_selected_organizations(organizations, when=""):
    """Recalculate the rating for every organization in *organizations*.

    A url can belong to multiple organizations, therefore each organization
    is rated on its own.

    :param organizations: iterable of organizations to rate.
    :param when: moment in time to rate at; empty string means "now"
        (interpretation is up to rate_organization).
    """
    for single_organization in organizations:
        rate_organization(single_organization, when)
def rate_organization(organization, when=""): def rate_organization(organization, when=""):
...@@ -790,8 +788,8 @@ def rate_organization(organization, when=""): ...@@ -790,8 +788,8 @@ def rate_organization(organization, when=""):
total_rating = 0 total_rating = 0
# todo: closing off urls, after no relevant endpoints, but still resolvable. # todo: closing off urls, after no relevant endpoints, but still resolvable.
urls = get_relevant_urls_at_timepoint(organization=organization, urls = get_relevant_urls_at_timepoint(organization=organization, when=when)
when=when)
all_url_ratings = [] all_url_ratings = []
calculation_json = [] calculation_json = []
for url in urls: for url in urls:
......
from datetime import datetime # admin functions from datetime import datetime
import pytz # admin functions import pytz
import logging
from django.contrib import admin from django.contrib import admin
from jet.admin import CompactInline from jet.admin import CompactInline
from ..app.models import Job from ..app.models import Job
from ..map.determineratings import OrganizationRating, UrlRating, rate_organization, rate_urls
from ..scanners.models import Endpoint from failmap_admin.map.determineratings import (OrganizationRating, UrlRating, rate_organization,
from ..scanners.scanner_dns import brute_known_subdomains, certificate_transparency rate_selected_organizations, rate_urls)
from ..scanners.scanner_http import scan_urls_on_standard_ports from failmap_admin.scanners.models import Endpoint
from ..scanners.scanner_plain_http import scan_urls as plain_http_scan_urls from failmap_admin.scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from ..scanners.scanner_screenshot import screenshot_urls from failmap_admin.scanners.scanner_http import scan_urls_on_standard_ports
from ..scanners.scanner_security_headers import scan_urls as security_headers_scan_urls from failmap_admin.scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from ..scanners.scanner_tls_qualys import ScannerTlsQualys from failmap_admin.scanners.scanner_screenshot import screenshot_urls
from failmap_admin.scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from failmap_admin.scanners.scanner_tls_qualys import ScannerTlsQualys
from .models import Coordinate, Organization, OrganizationType, Url from .models import Coordinate, Organization, OrganizationType, Url
# Solved: http://stackoverflow.com/questions/11754877/
# troubleshooting-related-field-has-invalid-lookup-icontains logger = logging.getLogger(__name__)
# while correct, error should point to ModelAdmin.search fields documentation
class UrlAdminInline(CompactInline): class UrlAdminInline(CompactInline):
...@@ -97,14 +99,14 @@ class UrlAdmin(admin.ModelAdmin): ...@@ -97,14 +99,14 @@ class UrlAdmin(admin.ModelAdmin):
class Media: class Media:
js = ('js/action_buttons.js', ) js = ('js/action_buttons.js', )
list_display = ('url', 'is_dead_reason', 'not_resolvable', 'created_on') list_display = ('url', 'endpoints', 'onboarded', 'uses_dns_wildcard', 'is_dead', 'not_resolvable')
search_fields = ('url', ) search_fields = ('url', )
list_filter = ('url', 'is_dead', 'is_dead_since', 'is_dead_reason', list_filter = ('url', 'is_dead', 'is_dead_since', 'is_dead_reason',
'not_resolvable', 'uses_dns_wildcard', 'organization') 'not_resolvable', 'uses_dns_wildcard', 'organization')
fieldsets = ( fieldsets = (
(None, { (None, {
'fields': ('url', 'organization', 'created_on') 'fields': ('url', 'organization', 'created_on', 'onboarded')
}), }),
('DNS', { ('DNS', {
'fields': ('uses_dns_wildcard', ), 'fields': ('uses_dns_wildcard', ),
...@@ -116,42 +118,56 @@ class UrlAdmin(admin.ModelAdmin): ...@@ -116,42 +118,56 @@ class UrlAdmin(admin.ModelAdmin):
'fields': ('is_dead', 'is_dead_since', 'is_dead_reason'), 'fields': ('is_dead', 'is_dead_since', 'is_dead_reason'),
}), }),
) )
readonly_fields = ['created_on'] readonly_fields = ['created_on', 'onboarded']
def is_dead(self):
if self.something == '1':
return True
return False
is_dead.boolean = True def endpoints(self, obj: Url):
is_dead = property(is_dead) return obj.endpoint_set.count()
inlines = [EndpointAdminInline, UrlRatingAdminInline] inlines = [EndpointAdminInline, UrlRatingAdminInline]
actions = [] actions = []
def onboard(self, request, queryset):
# todo, sequentially doesn't matter if you only use tasks :)
# currently it might crash given there are no endpoints yet to process...
for url in queryset:
if url.is_top_level():
brute_known_subdomains([url])
certificate_transparency([url])
scan_urls_on_standard_ports([url]) # discover endpoints
plain_http_scan_urls([url]) # see if there is missing https
security_headers_scan_urls([url])
screenshot_urls([url])
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
self.message_user(request, "Onboard: Done")
actions.append('onboard')
onboard.short_description = "🔮 Onboard (dns, endpoints, scans, screenshot)"
def dns_certificate_transparency(self, request, queryset): def dns_certificate_transparency(self, request, queryset):
certificate_transparency([url for url in queryset]) certificate_transparency([url for url in queryset])
self.message_user(request, "URL(s) have been scanned on known subdomains: Done") self.message_user(request, "URL(s) have been scanned on known subdomains: Done")
actions.append('dns_certificate_transparency') actions.append('dns_certificate_transparency')
dns_certificate_transparency.short_description = "Discover subdomains (using certificate transparency)" dns_certificate_transparency.short_description = "🗺 Discover subdomains (using certificate transparency)"
def dns_known_subdomains(self, request, queryset): def dns_known_subdomains(self, request, queryset):
brute_known_subdomains([url for url in queryset]) brute_known_subdomains([url for url in queryset])
self.message_user(request, "Discover subdomains (using known subdomains): Done") self.message_user(request, "Discover subdomains (using known subdomains): Done")
dns_known_subdomains.short_description = "Discover subdomains (using known subdomains)" dns_known_subdomains.short_description = "🗺 Discover subdomains (using known subdomains)"
actions.append('dns_known_subdomains') actions.append('dns_known_subdomains')
def discover_http_endpoints(self, request, queryset): def discover_http_endpoints(self, request, queryset):
scan_urls_on_standard_ports([url for url in queryset]) scan_urls_on_standard_ports([url for url in queryset])
self.message_user(request, "Discover http(s) endpoints: Done") self.message_user(request, "Discover http(s) endpoints: Done")
discover_http_endpoints.short_description = "Discover http(s) endpoints" discover_http_endpoints.short_description = "🗺 Discover http(s) endpoints"
actions.append('discover_http_endpoints') actions.append('discover_http_endpoints')
def scan_tls_qualys(self, request, queryset): def scan_tls_qualys(self, request, queryset):
ScannerTlsQualys().scan([url.url for url in queryset]) ScannerTlsQualys().scan([url.url for url in queryset])
self.message_user(request, "Scan TLS (qualys, slow): Scheduled with Priority") self.message_user(request, "Scan TLS (qualys, slow): Scheduled with Priority")
scan_tls_qualys.short_description = "Scan TLS (qualys, slow)" scan_tls_qualys.short_description = "🔬 Scan TLS (qualys, slow)"
actions.append('scan_tls_qualys') actions.append('scan_tls_qualys')
def security_headers(self, request, queryset): def security_headers(self, request, queryset):
...@@ -161,31 +177,32 @@ class UrlAdmin(admin.ModelAdmin): ...@@ -161,31 +177,32 @@ class UrlAdmin(admin.ModelAdmin):
name = "Scan Security Headers (%s) " % str(urls) name = "Scan Security Headers (%s) " % str(urls)
job = Job.create(task, name, request) job = Job.create(task, name, request)
self.message_user(request, "%s: job created, id:%s" % (name, str(job))) self.message_user(request, "%s: job created, id:%s" % (name, str(job)))
security_headers.short_description = "Scan Security Headers" security_headers.short_description = "🔬 Scan Security Headers"
actions.append('security_headers') actions.append('security_headers')
def plain_http_scan(self, request, queryset): def plain_http_scan(self, request, queryset):
plain_http_scan_urls([url for url in queryset]) plain_http_scan_urls([url for url in queryset])
self.message_user(request, "Scan Plain Http: done") self.message_user(request, "Scan Plain Http: done")
plain_http_scan.short_description = "Scan Plain Http" plain_http_scan.short_description = "🔬 Scan Plain Http"
actions.append('plain_http_scan') actions.append('plain_http_scan')
def screenshots(self, request, queryset): def screenshots(self, request, queryset):
screenshot_urls([url for url in queryset]) screenshot_urls([url for url in queryset])
self.message_user(request, "Create screenshot: Done") self.message_user(request, "Create screenshot: Done")
screenshots.short_description = "Create screenshot" screenshots.short_description = "📷 Create screenshot"
actions.append('screenshots') actions.append('screenshots')
def rate_url(self, request, queryset): def rate_url(self, request, queryset):
rate_urls([url for url in queryset]) rate_urls([url for url in queryset])
self.message_user(request, "Rate Url: done") self.message_user(request, "Rate Url: done")
rate_url.short_description = "Rate Url" rate_url.short_description = "Rate Url"
actions.append('rate_url') actions.append('rate_url')
def rate_organization_(self, request, queryset): def rate_organization_(self, request, queryset):
rate_organization([url.organization for url in queryset]) print(list(url.organization.all()) for url in queryset)
rate_selected_organizations(list(url.organization.all()) for url in queryset)
self.message_user(request, "Rate Organization: done") self.message_user(request, "Rate Organization: done")
rate_organization_.short_description = "Rate Organization" rate_organization_.short_description = "Rate Organization"
actions.append('rate_organization_') actions.append('rate_organization_')
def declare_dead(self, request, queryset): def declare_dead(self, request, queryset):
...@@ -195,7 +212,7 @@ class UrlAdmin(admin.ModelAdmin): ...@@ -195,7 +212,7 @@ class UrlAdmin(admin.ModelAdmin):
url.is_dead_since = datetime.now(pytz.utc) url.is_dead_since = datetime.now(pytz.utc)
url.save() url.save()
self.message_user(request, "Declare dead: Done") self.message_user(request, "Declare dead: Done")
declare_dead.short_description = "Declare dead" declare_dead.short_description = "🔪 Declare dead"
actions.append('declare_dead') actions.append('declare_dead')
......
...@@ -53,14 +53,18 @@ def scan(organization_names: List[str], execute=True): ...@@ -53,14 +53,18 @@ def scan(organization_names: List[str], execute=True):
@app.task
def scan_urls(urls: List[Url], execute=True):
    """Compose and execute taskset to scan specified urls.

    :param urls: urls to scan.
    :param execute: when True the taskset is scheduled asynchronously;
        otherwise the composed (unscheduled) task is returned.
    :return: the async result when executed, the composed task when not
        executing, or None when composing failed.
    """
    try:
        task = compose(urls=urls)
        return task.apply_async() if execute else task
    except (ValueError, Endpoint.DoesNotExist):
        # Fixed: logger.error dropped the traceback; logger.exception keeps it
        # so the failure can be traced from the worker log. compose() already
        # logged the specific cause before raising.
        logger.exception('Could not schedule scans, due to error reported above.')
def compose(organizations: List[Organization]=None, urls: List[Url]=None): def compose(organizations: List[Organization]=None, urls: List[Url]=None):
"""Compose taskset to scan specified organizations or urls (not both).""" """Compose taskset to scan specified organizations or urls (not both)."""
if not any([organizations, urls]): if not any([organizations, urls]):
logger.error('No organizations or urls supplied.')
raise ValueError("No organizations or urls supplied.") raise ValueError("No organizations or urls supplied.")
# collect all scannable urls for provided organizations # collect all scannable urls for provided organizations
...@@ -74,6 +78,7 @@ def compose(organizations: List[Organization]=None, urls: List[Url]=None): ...@@ -74,6 +78,7 @@ def compose(organizations: List[Organization]=None, urls: List[Url]=None):
endpoints = Endpoint.objects.all().filter(url__in=urls, is_dead=False, protocol__in=['http', 'https']) endpoints = Endpoint.objects.all().filter(url__in=urls, is_dead=False, protocol__in=['http', 'https'])
if not endpoints: if not endpoints:
logger.error('No endpoints exist for the selected urls.')
raise Endpoint.DoesNotExist("No endpoints exist for the selected urls.") raise Endpoint.DoesNotExist("No endpoints exist for the selected urls.")
logger.debug('scanning %s endpoints for %s urls', len(endpoints), len(urls)) logger.debug('scanning %s endpoints for %s urls', len(endpoints), len(urls))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment