Commit 77a72f6a authored by Luna's avatar Luna 😻

manage.cmd: add users and tests subparsers

 - litecord.auth: add hash_data, check_username_usage
 - litecord.auth: add create_user
 - tests: add credentials.py
parent 8d64be15
import base64
import binascii
from random import randint
import bcrypt
from asyncpg import UniqueViolationError
from itsdangerous import Signer, BadSignature
from logbook import Logger
from quart import request, current_app as app
from .errors import Forbidden, Unauthorized
from litecord.errors import Forbidden, Unauthorized, BadRequest
from litecord.snowflake import get_snowflake
log = Logger(__name__)
......@@ -75,3 +79,59 @@ async def token_check():
user_id = await raw_token_check(token)
request.user_id = user_id
return user_id
async def hash_data(data: str, loop=None) -> str:
"""Hash information with bcrypt."""
loop = loop or app.loop
buf = data.encode()
hashed = await loop.run_in_executor(
None, bcrypt.hashpw, buf, bcrypt.gensalt(14)
)
return hashed.decode()
async def check_username_usage(username: str, db=None):
"""Raise an error if too many people are with the same username."""
db = db or app.db
same_username = await db.fetchval("""
SELECT COUNT(*)
FROM users
WHERE username = $1
""", username)
if same_username > 8000:
raise BadRequest('Too many people.', {
'username': 'Too many people used the same username. '
'Please choose another'
})
async def create_user(username: str, email: str, password: str,
db=None, loop=None):
"""Create a single user."""
db = db or app.db
loop = loop or app.loop
new_id = get_snowflake()
new_discrim = randint(1, 9999)
new_discrim = '%04d' % new_discrim
pwd_hash = await hash_data(password, loop)
await check_username_usage(username, db)
try:
await db.execute("""
INSERT INTO users (id, email, username,
discriminator, password_hash)
VALUES ($1, $2, $3, $4, $5)
""", new_id, email, username, new_discrim, pwd_hash)
except UniqueViolationError:
raise BadRequest('Email already used.')
return new_id, pwd_hash
......@@ -8,23 +8,12 @@ from quart import Blueprint, jsonify, request, current_app as app
from litecord.snowflake import get_snowflake
from litecord.errors import BadRequest
from litecord.auth import token_check
from litecord.auth import token_check, create_user
bp = Blueprint('auth', __name__)
async def hash_data(data: str) -> str:
"""Hash information with bcrypt."""
buf = data.encode()
hashed = await app.loop.run_in_executor(
None, bcrypt.hashpw, buf, bcrypt.gensalt(14)
)
return hashed.decode()
async def check_password(pwd_hash: str, given_password: str) -> bool:
"""Check if a given password matches the given hash."""
pwd_encoded = pwd_hash.encode()
......@@ -43,43 +32,15 @@ def make_token(user_id, user_pwd_hash) -> str:
return signer.sign(user_id).decode()
async def check_username_usage(username: str):
"""Raise an error if too many people are with the same username."""
same_username = await app.db.fetchval("""
SELECT COUNT(*)
FROM users
WHERE username = $1
""", username)
if same_username > 8000:
raise BadRequest('Too many people.', {
'username': 'Too many people used the same username. '
'Please choose another'
})
@bp.route('/register', methods=['POST'])
async def register():
"""Register a single user."""
j = await request.get_json()
email, password, username = j['email'], j['password'], j['username']
new_id = get_snowflake()
new_discrim = random.randint(1, 9999)
new_discrim = '%04d' % new_discrim
pwd_hash = await hash_data(password)
await check_username_usage(username)
try:
await app.db.execute("""
INSERT INTO users (id, email, username,
discriminator, password_hash)
VALUES ($1, $2, $3, $4, $5)
""", new_id, email, username, new_discrim, pwd_hash)
except asyncpg.UniqueViolationError:
raise BadRequest('Email already used.')
new_id, pwd_hash = await create_user(
username, email, password, app.db
)
return jsonify({
'token': make_token(new_id, pwd_hash)
......
......@@ -9,7 +9,8 @@ from ..schemas import validate, USER_SETTINGS, \
USER_UPDATE, GUILD_SETTINGS
from .guilds import guild_check
from .auth import hash_data, check_password, check_username_usage
from .auth import check_password
from litecord.auth import hash_data, check_username_usage
bp = Blueprint('user', __name__)
......
from tests.credentials import CREDS
from litecord.blueprints.auth import create_user
from manage.cmd.users import set_user_staff
async def setup_tests(ctx, _args):
"""Setup users for the testing environment."""
for name, creds in CREDS.items():
uid, _ = await create_user(
creds['username'],
creds['email'],
creds['password'],
ctx.db,
ctx.loop
)
print(f'created {name} user: {uid}')
if name == 'admin':
await set_user_staff(uid, ctx)
print('OK')
def setup(subparser):
setup_test_parser = subparser.add_parser(
'setup_tests',
help='Create test users',
)
setup_test_parser.set_defaults(func=setup_tests)
from litecord.blueprints.auth import create_user
from litecord.enums import UserFlags
async def find_user(username, discrim, ctx):
return await ctx.db.fetchval("""
SELECT id
FROM users
WHERE username = $1 AND discriminator = $2
""", username, discrim)
async def set_user_staff(user_id, ctx):
"""Give a single user staff status."""
old_flags = await ctx.db.fetchval("""
SELECT flags
FROM users
WHERE id = $1
""", user_id)
new_flags = old_flags | UserFlags.staff
await ctx.db.execute("""
UPDATE users
SET flags=$1
WHERE id = $2
""", new_flags, user_id)
async def adduser(ctx, args):
"""Create a single user."""
uid, _ = await create_user(args.username, args.email,
args.password, ctx.db, ctx.loop)
print('created!')
print(f'\tuid: {uid}')
async def make_staff(ctx, args):
"""Give a single user the staff flag.
This will grant them access to the Admin API.
The flag changes will only apply after a
server restart.
"""
uid = await find_user(args.username, args.discrim, ctx)
if not uid:
return print('user not found')
await set_user_staff(uid, ctx)
print('OK: set staff')
def setup(subparser):
setup_test_parser = subparser.add_parser(
'adduser',
help='create a user',
)
setup_test_parser.add_argument(
'username', help='username of the user')
setup_test_parser.add_argument(
'email', help='email of the user')
setup_test_parser.add_argument(
'password', help='password of the user')
setup_test_parser.set_defaults(func=adduser)
staff_parser = subparser.add_parser(
'make_staff',
help='make a user staff',
description=make_staff.__doc__
)
staff_parser.add_argument(
'username'
)
staff_parser.add_argument(
'discrim', help='the discriminator of the user'
)
staff_parser.set_defaults(func=make_staff)
......@@ -7,6 +7,7 @@ from logbook import Logger
from run import init_app_managers, init_app_db
from manage.cmd.migration import migration
from manage.cmd import users, tests
log = Logger(__name__)
......@@ -29,6 +30,8 @@ def init_parser():
subparser = parser.add_subparsers(help='operations')
migration(subparser)
users.setup(subparser)
tests.setup(subparser)
return parser
......
"""
dummy credentials for litecord tests
"""
CREDS = {
'normal': {
'username': 'testygirl',
'email': 'girls@girls.com',
# 10 char password should work
'password': 'girls,,,,,'
},
'admin': {
'username': 'big_girl',
'email': 'big_girl@energy.com',
'password': 'big_girl_dot_com',
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment