...
 
Commits (8)
---
# Configuration file for the Bandit python security scanner
# https://bandit.readthedocs.io/en/latest/config.html
# If `tests` is empty, all tests are are considered included.
tests:
# - B101
# - B102
skips:
- B101 # skip "assert used" check since assertions are required in pytests
......@@ -42,7 +42,7 @@ include:
template: Dependency-Scanning.gitlab-ci.yml
variables:
DS_PYTHON_VERSION: 2
DS_PYTHON_VERSION: 3
stages:
- lint
......
......@@ -12,6 +12,9 @@ max-line-length = 119
[pycodestyle]
max-line-length = 119
[yapf]
BLANK_LINE_BEFORE_NESTED_CLASS_OR_DEF=true
COLUMN_LIMIT = 119
[metadata]
description-file = README.md
from setuptools import setup, find_packages
setup(name='sl1ther',
version='0.8.1',
version='0.8.3',
description='ScienceLogic Python wrapper, extending the EM7/SL1 API.',
url='https://gitlab.com/saxmanmike/slither',
author='Michael Gray',
......
This diff is collapsed.
......@@ -2,16 +2,14 @@
import json
import os
from sys import stderr
from flask import Flask, render_template, request
import slither.functions as functions
import redis
import requests
from flask import Flask, render_template, request
from slither.functions import Functions, get_org
app = Flask(__name__)
__version__ = "0.8.1"
__version__ = "0.8.3"
if os.getenv("BASE"):
app.config["BASE"] = str(os.getenv("BASE")) + "/api"
......@@ -26,30 +24,26 @@ if os.getenv("INSECURE_SSL") == "True":
app.config["INSECURE_SSL"] = True
if app.config["BASE"] is None:
print(
"Base URL is required. Please re-run setting BASE=<your ScienceLogic URL>.",
file=stderr,
)
app.logger.error("Base URL is required. Please re-run setting BASE=<your ScienceLogic URL>.")
exit()
if app.config["USER"] is None or app.config["PASS"] is None:
print(
"Authentication information is required for most functions.\
Please re-run and set USER and PASS environment variables.",
file=stderr,
)
app.logger.error("Authentication information is required for most functions.\
Please re-run and set USER and PASS environment variables.")
exit()
try:
r = app.config["r"] = redis.Redis(host=app.config["REDIS_HOST"])
# TODO: Add possibility of Redis authentication
orgs = functions.get_org(app.config["USER"], app.config["PASS"], app.config["BASE"])
orgs = get_org(app.config["USER"], app.config["PASS"], app.config["BASE"])
r.set("orgs", json.dumps(orgs))
except Exception as e: # TODO: Specific exception
print("No Redis connection available. Re-run setting REDIS_HOST appropriately.",
file=stderr)
print("Exception: {}".format(e), file=stderr)
app.logger.error("No Redis connection available. Re-run setting REDIS_HOST appropriately.")
app.logger.error(f"Exception: {e}")
exit()
functions = Functions(app.logger, app.config["BASE"], app.config["PASS"], app.config["base"], r,
app.config["INSECURE_SSL"])
@app.route("/")
def index():
......@@ -63,45 +57,35 @@ def assign_sku():
if request.method == "POST":
sku = request.form.get("sku")
dids = request.form["dids"].split(",")
username = request.form.get("username")
password = request.form.get("password")
status = functions.set_sku(sku, dids, username, password, app.config["BASE"])
try:
status = functions.set_sku(sku, dids)
except:
app.status_code = 500
data = {}
for did in dids:
item = 0
data[did] = {"sku": sku, "status": status[item], "username": username}
item += 1
for item, did in enumerate(dids):
data[did] = {"sku": sku, "status": status[item], "username": functions.USER}
app.logger.info(data)
return render_template("sku_results.html", data=data)
return render_template("sku.html")
@app.route("/devgroup_report", methods=["GET", "POST"])
def devgroup_report(USER=app.config["USER"],
PASS=app.config["PASS"],
base=app.config["BASE"]):
def devgroup_report(USER=app.config["USER"], PASS=app.config["PASS"], base=app.config["BASE"]):
"""Generate a device group report."""
if request.method == "POST":
try:
username = request.form.get("username")
password = request.form.get("password")
uri = request.form.get("uri")
report = functions.devicegroup_report(
uri,
username,
password,
app.config["BASE"],
app.config["r"]
)
report = functions.devicegroup_report(uri)
if report is None:
app.status_code = 500
return render_template("devgroup_results.html", report=report)
except requests.exceptions.RequestException as e:
return e
r = requests.get(base + "/device_group?limit=1000", auth=(USER, PASS), verify=app.config["INSECURE_SSL"])
r = requests.get(f"{base}/device_group?limit=1000", auth=(USER, PASS), verify=app.config["INSECURE_SSL"])
data = []
for dev in r.json()["result_set"]:
data.append({dev["URI"].strip("/api/device_group/"): dev["description"]})
data.append({dev["URI"].replace("/api/device_group/", ""): dev["description"]})
return render_template("devgroup_report.html", data=data)
......@@ -109,34 +93,18 @@ def devgroup_report(USER=app.config["USER"],
@app.route("/empty_skus", methods=["GET", "POST"])
def empty_sku_page():
"""Get a list of devices that don't have a custom attribute called SKU."""
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
report = functions.empty_skus(
username,
password,
app.config["BASE"],
app.config["r"]
)
if request.method in ("GET", "POST"):
report = functions.empty_skus()
return render_template("empty_sku_results.html", report=report)
return render_template("empty_sku.html")
@app.route("/event_id", methods=["GET", "POST"])
def event_page():
"""Get information on an event based on its event ID."""
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
event_id = request.form.get("event_id")
active = request.form.get("active")
report = functions.get_event(
event_id,
active,
username,
password,
app.config["BASE"]
)
report = functions.get_event(event_id, active)
return render_template("event_result.html", report=report)
return render_template("event.html")
......@@ -145,16 +113,16 @@ def event_page():
def add_notes_page():
"""Add a note to one or more devices by DID."""
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
note = request.form.get("note")
dids = request.form.get("dids").split(",")
status = functions.add_notes(note, dids, username, password, app.config["BASE"])
try:
status = functions.add_notes(note, dids)
except:
app.status_code = 500
data = {}
for did in dids:
item = 0
data[did] = {"note": note, "status": status[item], "username": username}
item += 1
for item, did in enumerate(dids):
data[did] = {"note": note, "status": status[item], "username": functions.USER}
app.logger.info(data)
return render_template("add_notes_results.html", data=data)
return render_template("add_notes.html")
......@@ -163,15 +131,11 @@ def add_notes_page():
def see_notes_page():
"""See all notes on one or more devices."""
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
dids = request.form.get("dids").split(",")
data = functions.see_notes(dids, username, password, app.config["BASE"])
data = functions.see_notes(dids)
report = {}
for did in dids:
item = 0
for item, did in enumerate(dids):
report[did] = {"note": data[item]}
item += 1
return render_template("see_notes_results.html", report=report)
return render_template("see_notes.html")
......@@ -187,9 +151,7 @@ def view_cidr():
stripped = functions.cidr(ips, subnet, octet)
results = "\r\n".join(stripped)
original = "\r\n".join(ips)
return render_template(
"iptocidr_results.html", original=original, results=results
)
return render_template("iptocidr_results.html", original=original, results=results)
return render_template("iptocidr.html")
......@@ -208,14 +170,7 @@ def serial_report():
"""Generate a list of devices and their serial numbers, if they have them."""
if request.method == "POST":
company = request.form.get("company")
username = request.form.get("username")
password = request.form.get("password")
report = functions.serial_report(
company,
username,
password,
app.config["BASE"], app.config["r"]
)
report = functions.serial_report(company)
return render_template("serial_results.html", report=report)
r = app.config["r"]
data = json.loads(r.get("orgs"))
......@@ -225,18 +180,17 @@ def serial_report():
@app.errorhandler(500)
def server_error(e):
"""Handle HTTP 500 errors."""
print("An error occurred during a request.", file=stderr)
return (
"""
An internal error occurred: <pre>{}</pre>
app.logger.error(f"An error occurred during a request: {e}")
return (f"""
An internal error ocurred: <pre>{e}</pre>
See logs for full stacktrace.
""".format(e)), 500
""", 500)
@app.errorhandler(404)
def not_found(e):
"""It's me, 404."""
print("Page not found.", file=stderr)
app.logger.error("Page not found.")
return render_template("404.html"), 404
......
......@@ -8,10 +8,6 @@
<textarea rows="4" cols="50" name="note" form="add_notes">Note To Add:</textarea><br>
<label for="dids"> DIDs (Comma-separated):</label>
<input type="text" name="dids"><br>
<label for="username">EM7 Username:</label>
<input type="text" name="username"><br>
<label for="password">Password:</label>
<input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
......@@ -12,8 +12,6 @@
{% endfor %}
{% endfor %}
</select><br>
<div align="left">EM7 Username: </div> <input type="text" name="username"><br>
<div align="left">Password: </div> <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
{% extends "header.html" %}
{% block content %}
<div style="height: 100vh">
<div class="flex-center flex-column">
<h1 class="animated fadeIn mb-4">Please authenticate to see a list of devices with empty SKUs:</h1>
<form method="POST">
<div align="left">EM7 Username: </div> <input type="text" name="username"><br>
<div align="left">Password: </div> <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
</div>
{% endblock %}
\ No newline at end of file
......@@ -7,8 +7,6 @@
<form method="POST">
Event ID: <input type="text" name="event_id"><br>
Active Event <input type="checkbox" name="active"><br>
EM7 Username: <input type="text" name="username"><br>
Password: <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
......@@ -6,8 +6,6 @@
<form method="POST">
DIDs (comma-separated): <input type="text" name="dids"><br>
EM7 Username: <input type="text" name="username"><br>
Password: <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
......@@ -10,8 +10,6 @@
<option value={{ key }}>{{ value }}</option>
{% endfor %}
</select><br>
<div align="left">EM7 Username: </div> <input type="text" name="username"><br>
<div align="left">Password: </div> <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
......@@ -7,8 +7,6 @@
<form method="POST">
SKU: <input type="text" name="sku"><br>
DIDs (Comma-separated): <input type="text" name="dids"><br>
EM7 Username: <input type="text" name="username"><br>
Password: <input type="password" name="password"><br>
<input type="submit" value="Submit"><br>
</form>
</div>
......
import slither.functions as functions
"""Test functions for SL1ther."""
import logging
from slither.functions import get_org, Functions
logging.basicConfig(format='%(asctime)s %(message)s')
functions = Functions(logging, None, None, "", None, True)
class TestIPExtractor:
"""Test IP Extractor function."""
def test_ip_extractor(self):
"""Test for a positive."""
text = open("tests/ip_text.txt").read()
assert functions.ip_extractor(text) == ["10.0.0.1"]
def test_negative(self):
"""Test for a negative."""
text = open("tests/ip_text.txt").read()
assert functions.ip_extractor(text[9:]) == []
class TestCIDR:
"""Test CIDR notation function."""
def test1(self):
assert functions.cidr(
["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "24", "one"
) == ["10.0.0.0/24", "192.168.1.0/24", "8.8.8.0/24", "172.17.17.0/24"]
"""Test for positive /24."""
assert functions.cidr(["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "24",
"one") == ["10.0.0.0/24", "192.168.1.0/24", "8.8.8.0/24", "172.17.17.0/24"]
def test2(self):
result = functions.cidr(["0.0.0.0"], "64", "one")
"""Test for negative, subnet mask is too large."""
result = functions.cidr(["10.0.0.0"], "64", "one")
assert result == "Invalid subnet mask! Please type a number between 1 and 32."
def test3(self):
result = functions.cidr(["0.0.0.0"], "-3", "one")
"""Test for negative, subnet mask is a negative number."""
result = functions.cidr(["10.0.0.0"], "-3", "one")
assert result == "Invalid subnet mask! Please type a number between 1 and 32."
def test4(self):
assert functions.cidr(
["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "16", "two"
) == ["10.0.0.0/16", "192.168.0.0/16", "8.8.0.0/16", "172.17.0.0/16"]
"""Test for positive /16."""
assert functions.cidr(["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "16",
"two") == ["10.0.0.0/16", "192.168.0.0/16", "8.8.0.0/16", "172.17.0.0/16"]
def test5(self):
assert functions.cidr(
["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "8", "three"
) == ["10.0.0.0/8", "192.0.0.0/8", "8.0.0.0/8", "172.0.0.0/8"]
"""Test for positive /8."""
assert functions.cidr(["10.0.0.1", "192.168.1.1", "8.8.8.8", "172.17.17.3"], "8",
"three") == ["10.0.0.0/8", "192.0.0.0/8", "8.0.0.0/8", "172.0.0.0/8"]
class TestSerialReport:
"""Test for serial number report."""
def test1(self):
assert functions.serial_report(
"/api/organization/0", None, None, None, None, test=True
) == {
"""Test for positive."""
assert functions.serial_report("/api/organization/0") == {
"3": {
"Name": "TestDevice",
"DID": "3",
......@@ -54,26 +69,38 @@ class TestSerialReport:
class TestOrg:
"""Test to get organization mapping."""
def test1(self):
assert functions.get_org(None, None, None, test=True) == {
"""Test for positive."""
assert get_org(None, None, None, test=True) == {
"/api/organization/0": "System",
"/api/organization/1": "Test1",
}
class TestNotes:
"""Test for viewing notes on a device."""
def test1(self):
assert functions.see_notes(None, None, None, None, test=True) == ["Test note"]
"""Test for positive."""
assert functions.see_notes(None) == ["Test note"]
class TestMerge:
"""Test for the ability to merge two dictionaries."""
def test1(self):
"""Test for positive."""
assert functions.merge_two_dicts({"foo": 1}, {"bar": 2}) == {"foo": 1, "bar": 2}
class TestEvent:
"""Test for viewing events on a device."""
def test1(self):
assert functions.get_event(None, None, None, None, "", test=True) == {
"""Test for positive."""
assert functions.get_event("", "") == {
"uri": "/api/event_policy/3694",
"number": "3694",
"name": "Trap: Received trap from unknown device once",
......@@ -84,8 +111,11 @@ class TestEvent:
class TestSKU:
"""Test for generating a list of devices with empty custom attribute SKU."""
def test1(self):
assert functions.empty_skus(None, None, None, "", test=True) == {
"""Test for positive."""
assert functions.empty_skus() == {
"2": {
"DID": "2",
"Name": "Tester",
......@@ -108,8 +138,11 @@ class TestSKU:
class TestDeviceGroup:
"""Test for device group report."""
def test1(self):
assert functions.devicegroup_report(None, None, None, None, "", test=True) == {
"""Test for positive."""
assert functions.devicegroup_report("") == {
"2": {
"Name": "Tester",
"IP": "8.8.8.8",
......