...
 
Commits (5)
Subproject commit 2d906f5b2f95071b876202e1fbc219b6136eb753
Subproject commit 3d428649514bb858a7434087b2bb185b534b94fa
......@@ -25,12 +25,12 @@ bp = Blueprint(__name__)
@admin_route
async def add_domain(request, admin_id: int):
"""Add a domain."""
domain_name = str(request.json['domain'])
is_adminonly = bool(request.json['admin_only'])
is_official = bool(request.json['official'])
domain_name = str(request.body['domain'])
is_adminonly = bool(request.body['admin_only'])
is_official = bool(request.body['official'])
# default 3
permissions = int(request.json.get('permissions', 3))
permissions = int(request.body.get('permissions', 3))
db = request.app.db
......@@ -50,14 +50,14 @@ async def add_domain(request, admin_id: int):
async with DomainAddAction(request) as action:
action.update(domain_id=domain_id)
if 'owner_id' in request.json:
owner_id = int(request.json['owner_id'])
if 'owner_id' in request.body:
owner_id = int(request.body['owner_id'])
action.update(owner_id=owner_id)
await db.execute("""
INSERT INTO domain_owners (domain_id, user_id)
VALUES ($1, $2)
""", domain_id, int(request.json['owner_id']))
""", domain_id, int(request.body['owner_id']))
keys = solve_domain(domain_name)
await request.app.storage.raw_invalidate(*keys)
......@@ -87,7 +87,7 @@ async def _dp_check(db, domain_id: int, payload: dict,
@admin_route
async def patch_domain(request, admin_id: int, domain_id: int):
"""Patch a domain's information"""
payload = validate(request.json, ADMIN_MODIFY_DOMAIN)
payload = validate(request.body, ADMIN_MODIFY_DOMAIN)
updated_fields = []
db = request.app.db
......@@ -117,7 +117,7 @@ async def patch_domain(request, admin_id: int, domain_id: int):
@bp.post('/api/admin/email_domain/<domain_id:int>')
@admin_route
async def email_domain(request, admin_id: int, domain_id: int):
payload = validate(request.json, ADMIN_SEND_DOMAIN_EMAIL)
payload = validate(request.body, ADMIN_SEND_DOMAIN_EMAIL)
subject, body = payload['subject'], payload['body']
owner_id = await request.app.db.fetchval("""
......@@ -150,7 +150,7 @@ async def email_domain(request, admin_id: int, domain_id: int):
async def add_owner(request, admin_id: int, domain_id: int):
"""Add an owner to a single domain."""
try:
owner_id = int(request.json['owner_id'])
owner_id = int(request.body['owner_id'])
except (ValueError, KeyError):
raise BadInput('Invalid number for owner ID')
......
......@@ -67,7 +67,7 @@ async def _do_broadcast(request, subject, body):
@admin_route
async def email_broadcast(request, admin_id):
app = request.app
payload = validate(request.json, ADMIN_SEND_BROADCAST)
payload = validate(request.body, ADMIN_SEND_BROADCAST)
subject, body = payload['subject'], payload['body']
......
......@@ -59,7 +59,7 @@ async def handle_modify(obj_type: str, request,
table = 'files' if obj_type == 'file' else 'shortens'
field = 'file_id' if obj_type == 'file' else 'shorten_id'
payload = validate(request.json, ADMIN_MODIFY_FILE)
payload = validate(request.body, ADMIN_MODIFY_FILE)
new_domain = payload.get('domain_id')
new_shortname = payload.get('shortname')
......
......@@ -43,7 +43,7 @@ async def _admin_settings(request, admin_id):
async def change_admin_settings(request, admin_id):
"""Change own admin settings."""
try:
audit_emails = bool(request.json['audit_log_emails'])
audit_emails = bool(request.body['audit_log_emails'])
except (KeyError, ValueError, TypeError):
raise BadInput('bad/nonexistant value for audit_log_emails')
......
......@@ -277,7 +277,7 @@ async def inactive_users_handler(request, admin_id, page: int):
async def search_user(request, user_id: int, page: int):
"""Search a user by pattern matching the username."""
try:
pattern = str(request.json['search_term'])
pattern = str(request.body['search_term'])
except (KeyError, TypeError, ValueError):
raise BadInput('Invalid search_term')
......@@ -358,7 +358,7 @@ async def _pu_check(db, db_name,
@admin_route
async def modify_user(request, admin_id, user_id):
"""Modify a user's information."""
payload = validate(request.json, ADMIN_MODIFY_USER)
payload = validate(request.body, ADMIN_MODIFY_USER)
updated = []
......
......@@ -48,7 +48,7 @@ async def revoke_handler(request):
This applies to timed and non-timed tokens.
"""
payload = validate(request.json, REVOKE_SCHEMA)
payload = validate(request.body, REVOKE_SCHEMA)
user = await login_user(request)
# by rehashing the password we change the
......
......@@ -21,7 +21,7 @@ async def d1_check(request):
is a part of it.
"""
try:
ciphertext = request.json['data']
ciphertext = request.body['data']
except (TypeError, KeyError):
raise BadInput('Invalid json')
......
......@@ -116,7 +116,7 @@ async def list_handler(request):
async def delete_handler(request):
"""Invalidate a file."""
user_id = await token_check(request)
file_name = str(request.json['filename'])
file_name = str(request.body['filename'])
await delete_file(request.app, file_name, user_id)
......@@ -132,7 +132,7 @@ async def delete_all(request, user_id):
app = request.app
try:
password = request.json['password']
password = request.body['password']
except KeyError:
raise BadInput('password not provided')
......@@ -162,7 +162,7 @@ async def delete_single(request, user_id, shortname):
async def shortendelete_handler(request):
"""Invalidate a shorten."""
user_id = await token_check(request)
file_name = str(request.json['filename'])
file_name = str(request.body['filename'])
await delete_shorten(request.app, file_name, user_id)
......
......@@ -8,6 +8,7 @@ elixire - misc routes
import datetime
from sanic import Blueprint, response
from ..version import VERSION, API_VERSION
import urllib.parse
bp = Blueprint('misc')
......@@ -15,6 +16,28 @@ bp = Blueprint('misc')
def _owo(string: str) -> str:
return string.replace('0', '0w0').replace('r', 'w')
@bp.middleware('request')
def bodyparser(request):
"""Make body available at request.body"""
if request.body:
# This _probably_ won't become an issue but no sense in tempting fate
try:
curly = bytes('{', request.headers.get('content-encoding', 'utf8'))
except(LookupError):
curly = bytes('{', 'utf8')
if request.headers.get('content-type', 'application/json').startswith('application/x-www-form-urlencoded'):
form = request.form
# Cerberus doesn't like the old dict
new_form = dict()
for key in form.keys():
new_form[key] = str(urllib.parse.unquote(form[key][0]))
request.body = new_form
elif request.headers.get('content-type', '').startswith('application/json') or (request.headers.get('content-type', None) == None and request.body.startswith(curly)):
# Shaky, but it should be alright since we
# need to support people who for whatever reason can't set it or something
request.body = request.json
return
@bp.get('/api/hello')
async def hello_route(request):
......
......@@ -105,7 +105,7 @@ async def change_profile(request):
raise FeatureDisabled('changes on profile are currently disabled')
user_id = await token_check(request)
payload = validate(request.json, PROFILE_SCHEMA)
payload = validate(request.body, PROFILE_SCHEMA)
updated = []
......@@ -264,7 +264,7 @@ async def deactive_own_user(request):
Sends an email to them asking for actual confirmation.
"""
user_id = await token_check(request)
payload = validate(request.json, DEACTIVATE_USER_SCHEMA)
payload = validate(request.body, DEACTIVATE_USER_SCHEMA)
await password_check(request, user_id, payload['password'])
user_email = await request.app.db.fetchval("""
......@@ -449,7 +449,7 @@ async def deactivate_user_from_email(request):
@bp.post('/api/reset_password')
async def reset_password_req(request):
"""Send a password reset request."""
payload = validate(request.json, PASSWORD_RESET_SCHEMA)
payload = validate(request.body, PASSWORD_RESET_SCHEMA)
username = payload['username'].lower()
udata = await request.app.db.fetchrow("""
......@@ -506,7 +506,7 @@ Do not reply to this email specifically, it will not work.
@bp.post('/api/reset_password_confirm')
async def password_reset_confirmation(request):
"""Handle the confirmation of a password reset."""
payload = validate(request.json, PASSWORD_RESET_CONFIRM_SCHEMA)
payload = validate(request.body, PASSWORD_RESET_CONFIRM_SCHEMA)
token = payload['token']
new_pwd = payload['new_password']
......
......@@ -81,7 +81,7 @@ async def register_user(request):
if not request.app.econfig.REGISTRATIONS_ENABLED:
raise FeatureDisabled('Registrations are currently disabled')
payload = validate(request.json, REGISTRATION_SCHEMA)
payload = validate(request.body, REGISTRATION_SCHEMA)
username = payload['username'].lower()
password = payload['password']
......@@ -141,7 +141,7 @@ Your username is {uname}.
@bp.post('/api/recover_username')
async def recover_username(request):
payload = validate(request.json, RECOVER_USERNAME)
payload = validate(request.body, RECOVER_USERNAME)
app = request.app
email = payload['email']
......
......@@ -39,7 +39,7 @@ async def shorten_handler(request):
app = request.app
try:
url_toredir = str(request.json['url'])
url_toredir = str(request.body['url'])
url_parsed = urllib.parse.urlparse(url_toredir)
except (TypeError, ValueError):
raise BadInput('Invalid URL')
......
......@@ -167,7 +167,7 @@ async def login_user(request):
Returns a partial user row.
"""
payload = validate(request.json, LOGIN_SCHEMA)
payload = validate(request.body, LOGIN_SCHEMA)
# always treat usernames as all-lowercase
username = payload['user'].lower()
......
......@@ -64,22 +64,21 @@ def validate(document, schema):
if not validator.validate(document):
raise BadInput('Bad payload', validator.errors)
return document
return validator.document
PROFILE_SCHEMA = {
'username': {'type': 'string', 'required': False},
'password': {'type': 'password', 'required': False},
'domain': {'type': 'integer', 'nullable': True},
'domain': {'type': 'integer', 'nullable': True, 'coerce': int},
'subdomain': {'type': 'subdomain', 'nullable': True},
'shorten_domain': {'type': 'integer', 'nullable': True},
'shorten_domain': {'type': 'integer', 'nullable': True, 'coerce': int},
'shorten_subdomain': {'type': 'subdomain', 'nullable': True},
'new_password': {'type': 'password', 'nullable': True},
'email': {'type': 'email', 'nullable': True},
'consented': {'type': 'boolean', 'nullable': True, 'required': False},
'paranoid': {'type': 'boolean', 'nullable': True},
'consented': {'type': 'boolean', 'nullable': True, 'required': False, 'coerce': bool},
'paranoid': {'type': 'boolean', 'nullable': True, 'coerce': bool},
}
REGISTRATION_SCHEMA = {
......
Subproject commit 939127c95910c7f816bab29362f92433dff67f5f
Subproject commit 2a7228d0c40248714153f756075c3987a6e17e43
# elixire: Image Host software
# Copyright 2018-2019, elixi.re Team and the elixire contributors
# SPDX-License-Identifier: AGPL-3.0-only
from sanic.request import Request
from multidict import CIMultiDict
import pytest
from ..api.bp.misc import bodyparser
from ..api.errors import BadInput
from ..api import schema
equivalent_dict = {
"foo": True,
"bar": "baz",
"foobar": "barfoo"
}
equivalent_urlencoded = b"foo=True&bar=baz&foobar=barfoo"
equivalent_json = b'{"foo":true,"bar":"baz","foobar":"barfoo"}'
# We really don't need to hook much up
def create_urlencoded_request():
request = Request(bytes("/", "utf8"), CIMultiDict([
("content-type", "application/x-www-form-urlencoded")
]), None, "POST", None)
request.body = equivalent_urlencoded
return request
def create_garbage_request(ambiguous=False):
headers = CIMultiDict()
if not ambiguous:
headers["content-type"] = "application/octet-stream"
request = Request(bytes("/", "utf8"), headers, None, "POST", None)
request.body = equivalent_urlencoded
return request
def create_json_request(ambiguous=False):
headers = CIMultiDict()
if not ambiguous:
headers["content-type"] = "application/json"
request = Request(bytes("/", "utf8"), headers, None, "POST", None)
request.body = equivalent_json
return request
# Test the testing environment essentially
def test_validation():
validate_body(equivalent_dict)
with pytest.raises(BadInput) as e_info:
validate_body({"foo":False, "bar":"baz", "foobar":"barfoo"})
def test_formdata_parsing():
request = create_urlencoded_request()
bodyparser(request)
validate_body(request.body)
def test_json_parsing():
request = create_json_request()
bodyparser(request)
validate_body(request.body)
def test_json_ambiguous_parsing():
request = create_json_request(True)
bodyparser(request)
validate_body(request.body)
def test_invalid_ambiguous_parsing():
request = create_garbage_request(True)
bodyparser(request)
assert isinstance(request.body, dict) == False
def test_invalid_parsing():
request = create_garbage_request(False)
bodyparser(request)
assert isinstance(request.body, dict) == False
def validate_body(doc):
schema.validate(doc, {
"foo": {"type": "boolean", "required": True, "coerce": bool, "allowed": [True]},
"bar": {"type": "string", "required": True, "allowed": ["baz"]},
"foobar": {"type": "string", "required": True}
})