icons on buttons, improved listing to determine actions

parent 093de098
......@@ -773,11 +773,9 @@ def significant_times(organization=None, url=None):
# make sure the URL ratings are up to date, they will check endpoints and such.
# probably not used anymore
# def rate_organizations(organizations, when=""):
# # since a url can now have multiple organizations, you should rate each one separately
# for organization in organizations.all():
# rate_organization(organization, when)
def rate_selected_organizations(organizations, when=""):
for organization in organizations:
rate_organization(organization, when)
def rate_organization(organization, when=""):
......@@ -790,8 +788,8 @@ def rate_organization(organization, when=""):
total_rating = 0
# todo: closing off urls, after no relevant endpoints, but still resolvable.
urls = get_relevant_urls_at_timepoint(organization=organization,
when=when)
urls = get_relevant_urls_at_timepoint(organization=organization, when=when)
all_url_ratings = []
calculation_json = []
for url in urls:
......
from datetime import datetime # admin functions
from datetime import datetime
import pytz # admin functions
import pytz
import logging
from django.contrib import admin
from jet.admin import CompactInline
from ..app.models import Job
from ..map.determineratings import OrganizationRating, UrlRating, rate_organization, rate_urls
from ..scanners.models import Endpoint
from ..scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from ..scanners.scanner_http import scan_urls_on_standard_ports
from ..scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from ..scanners.scanner_screenshot import screenshot_urls
from ..scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from ..scanners.scanner_tls_qualys import ScannerTlsQualys
from failmap_admin.map.determineratings import (OrganizationRating, UrlRating, rate_organization,
rate_selected_organizations, rate_urls)
from failmap_admin.scanners.models import Endpoint
from failmap_admin.scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from failmap_admin.scanners.scanner_http import scan_urls_on_standard_ports
from failmap_admin.scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from failmap_admin.scanners.scanner_screenshot import screenshot_urls
from failmap_admin.scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from failmap_admin.scanners.scanner_tls_qualys import ScannerTlsQualys
from .models import Coordinate, Organization, OrganizationType, Url
# Solved: http://stackoverflow.com/questions/11754877/
# troubleshooting-related-field-has-invalid-lookup-icontains
# while correct, error should point to ModelAdmin.search fields documentation
logger = logging.getLogger(__name__)
class UrlAdminInline(CompactInline):
......@@ -97,14 +99,14 @@ class UrlAdmin(admin.ModelAdmin):
class Media:
js = ('js/action_buttons.js', )
list_display = ('url', 'is_dead_reason', 'not_resolvable', 'created_on')
list_display = ('url', 'endpoints', 'onboarded', 'uses_dns_wildcard', 'is_dead', 'not_resolvable')
search_fields = ('url', )
list_filter = ('url', 'is_dead', 'is_dead_since', 'is_dead_reason',
'not_resolvable', 'uses_dns_wildcard', 'organization')
fieldsets = (
(None, {
'fields': ('url', 'organization', 'created_on')
'fields': ('url', 'organization', 'created_on', 'onboarded')
}),
('DNS', {
'fields': ('uses_dns_wildcard', ),
......@@ -116,42 +118,56 @@ class UrlAdmin(admin.ModelAdmin):
'fields': ('is_dead', 'is_dead_since', 'is_dead_reason'),
}),
)
readonly_fields = ['created_on']
def is_dead(self):
if self.something == '1':
return True
return False
readonly_fields = ['created_on', 'onboarded']
is_dead.boolean = True
is_dead = property(is_dead)
def endpoints(self, obj: Url):
return obj.endpoint_set.count()
inlines = [EndpointAdminInline, UrlRatingAdminInline]
actions = []
def onboard(self, request, queryset):
# todo, sequentially doesn't matter if you only use tasks :)
# currently it might crash given there are no endpoints yet to process...
for url in queryset:
if url.is_top_level():
brute_known_subdomains([url])
certificate_transparency([url])
scan_urls_on_standard_ports([url]) # discover endpoints
plain_http_scan_urls([url]) # see if there is missing https
security_headers_scan_urls([url])
screenshot_urls([url])
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
self.message_user(request, "Onboard: Done")
actions.append('onboard')
onboard.short_description = "🔮 Onboard (dns, endpoints, scans, screenshot)"
def dns_certificate_transparency(self, request, queryset):
certificate_transparency([url for url in queryset])
self.message_user(request, "URL(s) have been scanned on known subdomains: Done")
actions.append('dns_certificate_transparency')
dns_certificate_transparency.short_description = "Discover subdomains (using certificate transparency)"
dns_certificate_transparency.short_description = "🗺 Discover subdomains (using certificate transparency)"
def dns_known_subdomains(self, request, queryset):
brute_known_subdomains([url for url in queryset])
self.message_user(request, "Discover subdomains (using known subdomains): Done")
dns_known_subdomains.short_description = "Discover subdomains (using known subdomains)"
dns_known_subdomains.short_description = "🗺 Discover subdomains (using known subdomains)"
actions.append('dns_known_subdomains')
def discover_http_endpoints(self, request, queryset):
scan_urls_on_standard_ports([url for url in queryset])
self.message_user(request, "Discover http(s) endpoints: Done")
discover_http_endpoints.short_description = "Discover http(s) endpoints"
discover_http_endpoints.short_description = "🗺 Discover http(s) endpoints"
actions.append('discover_http_endpoints')
def scan_tls_qualys(self, request, queryset):
ScannerTlsQualys().scan([url.url for url in queryset])
self.message_user(request, "Scan TLS (qualys, slow): Scheduled with Priority")
scan_tls_qualys.short_description = "Scan TLS (qualys, slow)"
scan_tls_qualys.short_description = "🔬 Scan TLS (qualys, slow)"
actions.append('scan_tls_qualys')
def security_headers(self, request, queryset):
......@@ -161,31 +177,32 @@ class UrlAdmin(admin.ModelAdmin):
name = "Scan Security Headers (%s) " % str(urls)
job = Job.create(task, name, request)
self.message_user(request, "%s: job created, id:%s" % (name, str(job)))
security_headers.short_description = "Scan Security Headers"
security_headers.short_description = "🔬 Scan Security Headers"
actions.append('security_headers')
def plain_http_scan(self, request, queryset):
plain_http_scan_urls([url for url in queryset])
self.message_user(request, "Scan Plain Http: done")
plain_http_scan.short_description = "Scan Plain Http"
plain_http_scan.short_description = "🔬 Scan Plain Http"
actions.append('plain_http_scan')
def screenshots(self, request, queryset):
screenshot_urls([url for url in queryset])
self.message_user(request, "Create screenshot: Done")
screenshots.short_description = "Create screenshot"
screenshots.short_description = "📷 Create screenshot"
actions.append('screenshots')
def rate_url(self, request, queryset):
rate_urls([url for url in queryset])
self.message_user(request, "Rate Url: done")
rate_url.short_description = "Rate Url"
rate_url.short_description = "Rate Url"
actions.append('rate_url')
def rate_organization_(self, request, queryset):
rate_organization([url.organization for url in queryset])
print(list(url.organization.all()) for url in queryset)
rate_selected_organizations(list(url.organization.all()) for url in queryset)
self.message_user(request, "Rate Organization: done")
rate_organization_.short_description = "Rate Organization"
rate_organization_.short_description = "Rate Organization"
actions.append('rate_organization_')
def declare_dead(self, request, queryset):
......@@ -195,7 +212,7 @@ class UrlAdmin(admin.ModelAdmin):
url.is_dead_since = datetime.now(pytz.utc)
url.save()
self.message_user(request, "Declare dead: Done")
declare_dead.short_description = "Declare dead"
declare_dead.short_description = "🔪 Declare dead"
actions.append('declare_dead')
......
......@@ -53,14 +53,18 @@ def scan(organization_names: List[str], execute=True):
@app.task
def scan_urls(urls: List[Url], execute=True):
"""Compose and execute taskset to scan specified urls."""
task = compose(urls=urls)
return task.apply_async() if execute else task
try:
task = compose(urls=urls)
return task.apply_async() if execute else task
except (ValueError, Endpoint.DoesNotExist):
logger.error('Could not schedule scans, due to error reported above.')
def compose(organizations: List[Organization]=None, urls: List[Url]=None):
"""Compose taskset to scan specified organizations or urls (not both)."""
if not any([organizations, urls]):
logger.error('No organizations or urls supplied.')
raise ValueError("No organizations or urls supplied.")
# collect all scannable urls for provided organizations
......@@ -74,6 +78,7 @@ def compose(organizations: List[Organization]=None, urls: List[Url]=None):
endpoints = Endpoint.objects.all().filter(url__in=urls, is_dead=False, protocol__in=['http', 'https'])
if not endpoints:
logger.error('No endpoints exist for the selected urls.')
raise Endpoint.DoesNotExist("No endpoints exist for the selected urls.")
logger.debug('scanning %s endpoints for %s urls', len(endpoints), len(urls))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment