updated admin actions, equalized method calls

parent a05de415
@@ -851,6 +851,11 @@ def rate_organization(organization, when=""):
"The calculation is still the same, not creating a new OrganizationRating")
def rate_urls(urls, when=""):
for url in urls:
rate_url(url, when)
# also callable as admin action
# this is incomplete: use the timeline variant instead, which handles endpoints over time better.
# but this will give the correct score, possibly on the wrong endpoints.
......
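For reference, the new `rate_urls` is a list-based wrapper around `rate_url`, so admin actions and scripts can rate a whole selection in one call. A minimal usage sketch (the queryset filter is illustrative, not part of this commit):

```python
from failmap_admin.map.determineratings import rate_urls
from failmap_admin.organizations.models import Url

# re-rate all live, resolvable urls; equivalent to calling rate_url(url, "") per url
rate_urls(list(Url.objects.all().filter(is_dead=False, not_resolvable=False)))
```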
@@ -5,11 +5,14 @@ from django.contrib import admin
from jet.admin import CompactInline
from failmap_admin.map.determineratings import (OrganizationRating, UrlRating, rate_organization,
rate_url)
rate_urls)
from failmap_admin.scanners.models import Endpoint
from failmap_admin.scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from failmap_admin.scanners.scanner_http import scan_urls_on_standard_ports
from failmap_admin.scanners.scanner_tls_qualys import ScannerTlsQualys
from failmap_admin.scanners.scanner_screenshot import screenshot_urls
from failmap_admin.scanners.scanner_plain_http import scan_urls as plain_http_scan_urls
from failmap_admin.scanners.scanner_security_headers import scan_urls as security_headers_scan_urls
from .models import Coordinate, Organization, OrganizationType, Url
@@ -120,66 +123,71 @@ class UrlAdmin(admin.ModelAdmin):
inlines = [EndpointAdminInline, UrlRatingAdminInline]
actions = ['rate_url', 'dns_subdomains', 'dns_transparency', 'discover_http_endpoints',
'scan_url', 'declare_dead', 'print_on_commandline']
actions = []
def declare_dead(self, request, queryset):
for url in queryset:
url.is_dead = True
url.is_dead_reason = "Killed via admin interface"
url.is_dead_since = datetime.now(pytz.utc)
url.save()
self.message_user(request, "URL(s) have been declared dead")
def dns_certificate_transparency(self, request, queryset):
certificate_transparency([url for url in queryset])
self.message_user(request, "URL(s) have been scanned on known subdomains: Done")
actions.append('dns_certificate_transparency')
dns_certificate_transparency.short_description = "Discover subdomains (using certificate transparency)"
def rate_url(self, request, queryset):
for url in queryset:
rate_url(url=url)
self.message_user(request, "URL(s) have been rated")
def scan_url(self, request, queryset):
urls_to_scan = []
for url in queryset:
urls_to_scan.append(url.url)
s = ScannerTlsQualys()
s.scan(urls_to_scan)
self.message_user(request, "URL(s) have been scanned on TLS")
def dns_known_subdomains(self, request, queryset):
brute_known_subdomains([url for url in queryset])
self.message_user(request, "Discover subdomains (using known subdomains): Done")
dns_known_subdomains.short_description = "Discover subdomains (using known subdomains)"
actions.append('dns_known_subdomains')
def discover_http_endpoints(self, request, queryset):
urls_to_scan = [url for url in queryset]
scan_urls_on_standard_ports(urls_to_scan)
self.message_user(request, "URL(s) have been scanned for HTTP")
def dns_subdomains(self, request, queryset):
for url in queryset:
brute_known_subdomains(url)
self.message_user(request, "URL(s) have been scanned on known subdomains.")
scan_urls_on_standard_ports([url for url in queryset])
self.message_user(request, "Discover http(s) endpoints: Done")
discover_http_endpoints.short_description = "Discover http(s) endpoints"
actions.append('discover_http_endpoints')
def scan_tls_qualys(self, request, queryset):
ScannerTlsQualys().scan([url.url for url in queryset])
self.message_user(request, "Scan TLS (qualys, slow): Scheduled with Priority")
scan_tls_qualys.short_description = "Scan TLS (qualys, slow)"
actions.append('scan_tls_qualys')
def security_headers(self, request, queryset):
security_headers_scan_urls(urls=[url for url in queryset])
self.message_user(request, "Scan Security Headers: done")
security_headers.short_description = "Scan Security Headers"
actions.append('security_headers')
def plain_http_scan(self, request, queryset):
plain_http_scan_urls([url for url in queryset])
self.message_user(request, "Scan Plain Http: done")
plain_http_scan.short_description = "Scan Plain Http"
actions.append('plain_http_scan')
def screenshots(self, request, queryset):
screenshot_urls([url for url in queryset])
self.message_user(request, "Create screenshot: Done")
screenshots.short_description = "Create screenshot"
actions.append('screenshots')
def dns_transparency(self, request, queryset):
for url in queryset:
certificate_transparency(url)
def rate_url(self, request, queryset):
rate_urls([url for url in queryset])
self.message_user(request, "Rate Url: done")
rate_url.short_description = "Rate Url"
actions.append('rate_url')
self.message_user(request, "URL(s) have been scanned on known subdomains.")
def rate_organization_(self, request, queryset):
rate_organization([url.organization for url in queryset])
self.message_user(request, "Rate Organization: done")
rate_organization_.short_description = "Rate Organization"
actions.append('rate_organization_')
def print_on_commandline(self, request, queryset):
def declare_dead(self, request, queryset):
for url in queryset:
print(url.url)
dns_subdomains.short_description = "Scan DNS (known subdomains)"
dns_transparency.short_description = "Scan DNS (certificate transparency)"
discover_http_endpoints.short_description = "Discover HTTP(S) endpoints"
scan_url.short_description = "Scan (tls qualys)"
rate_url.short_description = "Rate"
declare_dead.short_description = "Declare dead" # can still scan it
print_on_commandline.short_description = "(debug) Print on command line"
url.is_dead = True
url.is_dead_reason = "Killed via admin interface"
url.is_dead_since = datetime.now(pytz.utc)
url.save()
self.message_user(request, "Declare dead: Done")
declare_dead.short_description = "Declare dead"
actions.append('declare_dead')
class OrganizationTypeAdmin(admin.ModelAdmin):
......
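Every action above follows the same Django admin pattern: a method taking `(self, request, queryset)`, a `short_description` for the actions dropdown, and an append to the class-level `actions` list directly after the definition. A stripped-down sketch of that pattern (model and action body are placeholders):

```python
from django.contrib import admin

class ExampleAdmin(admin.ModelAdmin):
    actions = []  # each action registers itself right after its definition

    def example_action(self, request, queryset):
        for obj in queryset:
            pass  # do the per-row work here
        self.message_user(request, "Example action: done")
    example_action.short_description = "Example action"
    actions.append('example_action')
```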
@@ -7,9 +7,9 @@ from django.core.management.base import BaseCommand
from failmap_admin.organizations.models import Url
from failmap_admin.scanners.scanner_dns import brute_known_subdomains, certificate_transparency
from failmap_admin.scanners.scanner_http import scan_url_list_standard_ports
from failmap_admin.scanners.scanner_http import scan_urls_on_standard_ports
from failmap_admin.scanners.scanner_plain_http import scan_url
from failmap_admin.scanners.scanner_screenshot import screenshot_url
from failmap_admin.scanners.scanner_screenshot import screenshot_urls
logger = logging.getLogger(__package__)
@@ -21,7 +21,7 @@ class Command(BaseCommand):
try:
logger.info("Started onboarding.")
while True:
Command.onboard()
onboard()
logger.info("Waiting for more urls to be onboarded. Sleeping for 60 seconds.")
sleep(60)
except KeyboardInterrupt:
@@ -32,44 +32,43 @@ class Command(BaseCommand):
else:
logger.info("Stopped onboarding.")
@staticmethod
# todo: make a scan log, so you can see what has been scanned and what completed.
def onboard():
urls = Command.onboard_gather()
# perform initial tests that can happen asynchronous
for url in urls:
# scan for http/https endpoints
if url.is_top_level():
# some DNS scans, to find more urls to onboard.
brute_known_subdomains([url])
certificate_transparency(url)  # todo: equalize method calls
scan_url_list_standard_ports([url]) # takes about 60 seconds per url
scan_url(url) # plain http, takes about 10 seconds per url, if internet conn.
screenshot_url(url) # takes about 10 seconds per url, can hang.
# tls scans are picked up by scanner_tls_qualys and may take a while.
# other scans the same. They will do the ratings.
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
@staticmethod
def onboard_existing_urls():
"""A quick fix for an existing database."""
urls = Url.objects.all()
for url in urls:
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
@staticmethod
def onboard_gather():
never_onboarded = Url.objects.all().filter(onboarded=False)
if never_onboarded.count() > 0:
cyber = """
# todo: make a scan log, so you can see what has been scanned and what completed.
def onboard():
urls = onboard_gather()
for url in urls:
# scan for http/https endpoints
if url.is_top_level():
# some DNS scans, to find more urls to onboard.
brute_known_subdomains([url])
certificate_transparency([url])
scan_urls_on_standard_ports([url])
scan_url(url)
screenshot_urls([url])
# tls scans are picked up by scanner_tls_qualys and may take a while.
# other scans the same. They will do the ratings.
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
def onboard_existing_urls():
"""A quick fix for an existing database."""
urls = Url.objects.all()
for url in urls:
url.onboarded = True
url.onboarded_on = datetime.now(pytz.utc)
url.save()
def onboard_gather():
never_onboarded = Url.objects.all().filter(onboarded=False)
if never_onboarded.count() > 0:
cyber = """
................................................................................
.......-:////:.....:-.......::...-///////:-......://////////:..../////////:.....
...../mMMMMMMMN...NMM+.....hMMy..+MMMMMMMMMNy-...dMMMMMMMMMMMN..-MMMMMMMMMMNy...
@@ -83,7 +82,7 @@ class Command(BaseCommand):
...../dMMMMMMMN......./MMN.......+MMMMMMMMMNy-...dMMMMMMMMMMMN...NMM+....-mMMs..
.......-::::::.........-:........-::::::::-......::::::::::::.....:-.......::...
................................................................................
"""
logger.info("There are %s new urls to onboard! %s" % (never_onboarded.count(), cyber))
"""
logger.info("There are %s new urls to onboard! %s" % (never_onboarded.count(), cyber))
return never_onboarded
return never_onboarded
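If the `onboarded` flag lands on a database that already contains urls, the quick fix above has to be run once by hand, presumably from a Django shell. The import path below is an assumption; the command's file name does not appear in this diff:

```python
# python manage.py shell  (invocation assumed)
from failmap_admin.scanners.management.commands.onboard import onboard_existing_urls

onboard_existing_urls()  # marks every existing Url as onboarded, skipping the scans
```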
@@ -114,7 +114,7 @@ def organization_certificate_transparency(organization):
addedlist = []
for url in urls:
addedlist = addedlist + certificate_transparency(url)
addedlist = addedlist + certificate_transparency([url])
return addedlist
@@ -175,53 +175,55 @@ def search_engines_scan(url):
# todo: also include censys, google and let's encrypt( if has one )
def certificate_transparency(url):
def certificate_transparency(urls):
"""
Checks the certificate transparency database for subdomains. Using a regex the subdomains
are extracted. This method is extremely fast and reliable: these certificates all exist.
Hooray for transparency :)
:param url:
:param urls: List of Url objects
:return:
"""
import requests
import re
# https://crt.sh/?q=%25.zutphen.nl
crt_sh_url = "https://crt.sh/?q=%25." + str(url.url)
pattern = r"[^\s%>]*\." + str(url.url.replace(".", "\.")) # harder string formatting :)
for url in urls:
response = requests.get(crt_sh_url, timeout=(10, 10), allow_redirects=False)
matches = re.findall(pattern, response.text)
# https://crt.sh/?q=%25.zutphen.nl
crt_sh_url = "https://crt.sh/?q=%25." + str(url.url)
pattern = r"[^\s%>]*\." + str(url.url.replace(".", "\.")) # harder string formatting :)
subdomains = []
for match in matches:
# handle wildcards, sometimes subdomains have nice features.
# examples: *.apps.domain.tld.
# todo: perhaps store that it was a wildcard cert, for further inspection?
match = match.replace("*.", "")
if match != url.url:
subdomains.append(match[0:len(match) - len(url.url) - 1]) # wraps around
subdomains = [x.lower() for x in subdomains] # do lowercase normalization elsewhere
subdomains = set(subdomains)
# 25 and '' are created due to the percentage and empty subdomains. Remove them
# wildcards (*) are also not allowed.
if '' in subdomains:
subdomains.remove('')
if '25' in subdomains:
subdomains.remove('25')
response = requests.get(crt_sh_url, timeout=(10, 10), allow_redirects=False)
matches = re.findall(pattern, response.text)
logger.debug("Found subdomains: %s" % subdomains)
subdomains = []
for match in matches:
# handle wildcards, sometimes subdomains have nice features.
# examples: *.apps.domain.tld.
# todo: perhaps store that it was a wildcard cert, for further inspection?
match = match.replace("*.", "")
if match != url.url:
subdomains.append(match[0:len(match) - len(url.url) - 1]) # wraps around
addedlist = []
for subdomain in subdomains:
added = add_subdomain(subdomain, url)
if added:
addedlist.append(added)
return addedlist
subdomains = [x.lower() for x in subdomains] # do lowercase normalization elsewhere
subdomains = set(subdomains)
# 25 and '' are created due to the percentage and empty subdomains. Remove them
# wildcards (*) are also not allowed.
if '' in subdomains:
subdomains.remove('')
if '25' in subdomains:
subdomains.remove('25')
logger.debug("Found subdomains: %s" % subdomains)
addedlist = []
for subdomain in subdomains:
added = add_subdomain(subdomain, url)
if added:
addedlist.append(added)
return addedlist
def subdomains_harvester(url):
......
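The crt.sh lookup is self-contained enough to try outside the project; a minimal sketch of the same idea, using a plain domain string instead of a `Url` object:

```python
import re
import requests

def crt_sh_subdomains(domain):
    # %25 is a urlencoded '%', the SQL wildcard crt.sh expects
    response = requests.get("https://crt.sh/?q=%25." + domain,
                            timeout=(10, 10), allow_redirects=False)
    pattern = r"[^\s%>]*\." + domain.replace(".", r"\.")
    matches = re.findall(pattern, response.text)
    # drop wildcard prefixes and keep only the subdomain labels
    subdomains = {match.replace("*.", "")[:-len(domain) - 1]
                  for match in matches if match.replace("*.", "") != domain}
    return subdomains - {'', '25'}  # artifacts of '%25' and empty matches

print(crt_sh_subdomains("zutphen.nl"))
```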
@@ -9,7 +9,7 @@ import logging
from failmap_admin.organizations.models import Url
from failmap_admin.scanners.endpoint_scan_manager import EndpointScanManager
from failmap_admin.scanners.scanner_http import scan_url_list
from failmap_admin.scanners.scanner_http import scan_urls
from .models import Endpoint
@@ -36,6 +36,11 @@ def scan_all_urls():
scan_url(url)
def scan_urls(urls):
for url in urls:
scan_url(url)
def scan_url(url):
scan_manager = EndpointScanManager
logger.debug("Checking for http only sites on: %s" % url)
@@ -109,7 +114,7 @@ def scan_url(url):
def verify_is_secure(url):
# I've seen Qualys saying there is no TLS, while there is!
# This _might_ revive an endpoint.
scan_url_list([url], 443, 'https')
scan_urls([url], [443], ['https'])
endpoints = Endpoint.objects.all().filter(url=url, is_dead=False,
protocol="https", port=443)
......
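One catch in this file: it now both imports `scan_urls` from scanner_http and defines its own one-argument `scan_urls(urls)`, so the local definition shadows the import and the three-argument call in `verify_is_secure` would raise a TypeError. An aliased import avoids the collision (a suggested fix, not part of this commit):

```python
# suggested disambiguation; not in the commit itself
from failmap_admin.scanners.scanner_http import scan_urls as http_scan_urls

def verify_is_secure(url):
    # Qualys sometimes reports no TLS where there is some; this may revive an endpoint
    http_scan_urls([url], [443], ['https'])
```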
@@ -39,17 +39,17 @@ working_directory = '../map/static/images/screenshots' # deprecated
script_directory = os.path.join(os.path.abspath(os.path.dirname(__file__))) # deprecated
@timeout(30, 'Took too long to make screenshot')
def screenshot_url(url):
def screenshot_urls(urls):
"""
Contains a pointer to the most accurate and fastest screenshot method.
Will remove the hassle of choosing the right screenshot tool.
:param url:
:param urls: list of url objects
:return:
"""
endpoints = Endpoint.objects.all().filter(url=url)
for endpoint in endpoints:
screenshot_with_chrome(endpoint)
for url in urls:
endpoints = Endpoint.objects.all().filter(url=url)
for endpoint in endpoints:
screenshot_with_chrome(endpoint)
@timeout(30, 'Took too long to make screenshot')
......
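Note that the `@timeout(30, ...)` decorator now guards the whole loop instead of a single url, so a large selection can exceed a limit that was sized for one screenshot ("takes about 10 seconds per url, can hang"). Usage becomes list-based like the other scanners (the queryset is illustrative):

```python
from failmap_admin.organizations.models import Url
from failmap_admin.scanners.scanner_screenshot import screenshot_urls

# old: screenshot_url(url); new: always pass a list
screenshot_urls(list(Url.objects.all().filter(is_dead=False)[:3]))
```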
@@ -43,20 +43,33 @@ def organizations_from_names(organization_names: List[str]) -> List[Organization
@app.task
def scan(organization_names: List[str], execute=True):
"""Compose and execute taskset to scan specified organizations."""
task = compose(organizations_from_names(organization_names))
task = compose(organizations=organizations_from_names(organization_names))
if execute:
return task.apply_async()
else:
return task
def compose(organizations: List[Organization]):
"""Compose taskset to scan specified organizations."""
@app.task
def scan_urls(urls: List[Url], execute=True):
"""Compose and execute taskset to scan specified urls."""
task = compose(urls=urls)
return task.apply_async() if execute else task
def compose(organizations: List[Organization] = None, urls: List[Url] = None):
"""Compose taskset to scan specified organizations or urls (not both)."""
if not any([organizations, urls]):
raise ValueError("No organizations or urls supplied.")
# collect all scannable urls for provided organizations
urls = Url.objects.all().filter(is_dead=False,
not_resolvable=False,
organization__in=organizations)
if organizations:
urls_organizations = Url.objects.all().filter(is_dead=False,
not_resolvable=False,
organization__in=organizations)
urls = list(urls_organizations) + urls if urls else list(urls_organizations)
endpoints = Endpoint.objects.all().filter(url__in=urls, is_dead=False, protocol__in=['http', 'https'])
......
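The reworked `compose` takes keyword arguments so organization-level and url-level scans share one task factory; when both are given, the body merges the organizations' urls into the supplied list, so the docstring's "(not both)" undersells it slightly. A usage sketch of both entry points (module path and organization name are assumptions; the file name is not shown in this diff):

```python
from failmap_admin.scanners.scanner_security_headers import scan, scan_urls  # path assumed
from failmap_admin.organizations.models import Url

# organization-level: resolve names, then collect their scannable urls
scan(['Zutphen'], execute=False)  # returns the composed taskset without running it

# url-level: scan exactly these urls
scan_urls(list(Url.objects.all().filter(is_dead=False, not_resolvable=False)[:5]),
          execute=False)
```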