Commit 5f55a189 authored by buttle's avatar buttle

Added utils directory

parent b14ce108
"""
“Copyright 2019 La Coordinadora d’Entitats per la Lleialtat Santsenca”
This file is part of GNGforms.
GNGforms is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from flask import g, flash, request
from flask_babel import gettext
from GNGforms import app
from GNGforms.models import Site
import smtplib, ssl, socket
from threading import Thread
def createSmtpObj():
config=g.site.smtpConfig
try:
if config["encryption"] == "SSL":
server = smtplib.SMTP_SSL(config["host"], port=config["port"], timeout=2)
server.login(config["user"], config["password"])
elif config["encryption"] == "STARTTLS":
server = smtplib.SMTP_SSL(config["host"], port=config["port"], timeout=2)
context = ssl.create_default_context()
server.starttls(context=context)
server.login(config["user"], config["password"])
else:
server = smtplib.SMTP(config["host"], port=config["port"])
if config["user"] and config["password"]:
server.login(config["user"], config["password"])
return server
except socket.error as e:
if g.isAdmin:
flash(str(e), 'error')
return False
def sendMail(email, message):
server = createSmtpObj()
if server:
try:
header='To: ' + email + '\n' + 'From: ' + g.site.smtpConfig["noreplyAddress"] + '\n'
message=header + message
server.sendmail(g.site.smtpConfig["noreplyAddress"], email, message.encode('utf-8'))
return True
except Exception as e:
if g.isAdmin:
flash(str(e) , 'error')
return False
def sendConfirmEmail(user, newEmail=None):
link="%suser/validate-email/%s" % (g.site.host_url, user.token['token'])
message=gettext("Hello %s\n\nPlease confirm your email\n\n%s") % (user.username, link)
message = 'Subject: {}\n\n{}'.format(gettext("GNGforms. Confirm email"), message)
if newEmail:
return sendMail(newEmail, message)
else:
return sendMail(user.email, message)
def sendInvite(invite):
site=Site.find(hostname=invite.hostname)
link="%suser/new/%s" % (site.host_url, invite.token['token'])
message="%s\n\n%s" % (invite.message, link)
message='Subject: {}\n\n{}'.format(gettext("GNGforms. Invitation to %s" % site.hostname), message)
return sendMail(invite.email, message)
def sendRecoverPassword(user):
link="%ssite/recover-password/%s" % (g.site.host_url, user.token['token'])
message=gettext("Please use this link to recover your password")
message="%s\n\n%s" % (message, link)
message='Subject: {}\n\n{}'.format(gettext("GNGforms. Recover password"), message)
return sendMail(user.email, message)
def sendNewFormEntryNotification(emails, entry, slug):
message=gettext("New form entry in %s at %s\n" % (slug, g.site.hostname))
for data in entry:
message="%s\n%s: %s" % (message, data[0], data[1])
message="%s\n" % message
message='Subject: {}\n\n{}'.format(gettext("GNGforms. New form entry"), message)
for email in emails:
sendMail(email, message)
def sendExpiredFormNotification(editorEmails, form):
message=gettext("The form '%s' has expired at %s" % (form.slug, g.site.hostname))
message='Subject: {}\n\n{}'.format(gettext("GNGforms. A form has expired"), message)
for email in editorEmails:
sendMail(email, message)
def sendNewFormNotification(adminEmails, form):
message=gettext("New form '%s' created at %s" % (form.slug, g.site.hostname))
message='Subject: {}\n\n{}'.format(gettext("GNGforms. New form notification"), message)
for email in adminEmails:
sendMail(email, message)
def sendNewUserNotification(adminEmails, username):
message=gettext("New user '%s' created at %s" % (username, g.site.hostname))
message='Subject: {}\n\n{}'.format(gettext("GNGforms. New user notification"), message)
for email in adminEmails:
sendMail(email, message)
def sendTestEmail(email):
message=gettext("Congratulations!")
message='Subject: {}\n\n{}'.format(gettext("SMTP test"), message)
return sendMail(email, message)
"""
“Copyright 2019 La Coordinadora d’Entitats per la Lleialtat Santsenca”
This file is part of GNGforms.
GNGforms is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from GNGforms.models import *
#from pprint import pprint as pp
"""
Mirgations previous to schemaVersion 13 used the flask_pymongo library.
schemaVersion >= 13 uses flask_mongoengine.
"""
def migrateMongoSchema(schemaVersion):
#if schemaVersion == 13:
# ....
# schemaVersion = 14
return schemaVersion
"""
“Copyright 2019 La Coordinadora d’Entitats per la Lleialtat Santsenca”
This file is part of GNGforms.
GNGforms is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from flask import session
import json
def ensureSessionFormKeys():
if not 'slug' in session:
session['slug'] = ""
if not 'formFieldIndex' in session:
session['formFieldIndex'] = []
if not 'formStructure' in session:
session['formStructure'] = json.dumps([])
if not 'afterSubmitTextMD' in session:
session['afterSubmitTextMD'] = ''
def populateSessionFormData(form):
#session['form_id'] = str(form._id)
session['slug'] = form.slug
session['formFieldIndex'] = form.fieldIndex
session['formStructure'] = form.structure
session['afterSubmitTextMD'] = form.afterSubmitText['markdown']
def clearSessionFormData():
session['slug'] = ""
session['form_id']=None
session['formFieldIndex'] = []
session['formStructure'] = json.dumps([])
session['afterSubmitTextMD'] = ''
"""
“Copyright 2019 La Coordinadora d’Entitats per la Lleialtat Santsenca”
This file is part of GNGforms.
GNGforms is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from GNGforms import app, babel
from GNGforms import models
from flask import Response, redirect, request, url_for
from flask import g, flash
from flask_babel import gettext
from unidecode import unidecode
import json, time, re, string, random, datetime, csv
from passlib.hash import pbkdf2_sha256
from password_strength import PasswordPolicy
import markdown, html.parser
from pprint import pformat
def get_obj_values_as_dict(obj):
values = {}
fields = type(obj).__dict__['_fields']
for key, _ in fields.items():
value = getattr(obj, key, None)
values[key] = value
return values
def make_url_for(function, **kwargs):
kwargs["_external"]=True
kwargs["_scheme"]=g.site.scheme
return url_for(function, **kwargs)
@babel.localeselector
def get_locale():
if g.current_user:
return g.current_user.language
else:
return request.accept_languages.best_match(app.config['LANGUAGES'].keys())
"""
Used to respond to Ajax requests
"""
def JsonResponse(json_response="1", status_code=200):
response = Response(json_response, 'application/json; charset=utf-8')
response.headers.add('content-length', len(json_response))
response.status_code=status_code
return response
""" ######## Sanitizers ######## """
def sanitizeString(string):
string = unidecode(string)
string = string.replace(" ", "")
return re.sub('[^A-Za-z0-9\-]', '', string)
def sanitizeSlug(slug):
slug = slug.lower()
slug = slug.replace(" ", "-")
return sanitizeString(slug)
def sanitizeHexidecimal(string):
return re.sub('[^A-Fa-f0-9]', '', string)
def isSaneSlug(slug):
if slug and slug == sanitizeSlug(slug):
return True
return False
def sanitizeUsername(username):
return sanitizeString(username)
def isSaneUsername(username):
if username and username == sanitizeUsername(username):
return True
return False
def sanitizeTokenString(string):
return re.sub('[^a-z0-9]', '', string)
def stripHTMLTags(text):
h = html.parser.HTMLParser()
text=h.unescape(text)
return re.sub('<[^<]+?>', '', text)
# remember to remove this from the code because now tags are stripped from Labels at view/preview
def stripHTMLTagsForLabel(text):
h = html.parser.HTMLParser()
text=h.unescape(text)
text = text.replace("<br>","-") # formbuilder generates "<br>"s
return re.sub('<[^<]+?>', '', text)
def escapeMarkdown(MDtext):
return re.sub(r'<[^>]*?>', '', MDtext)
def markdown2HTML(MDtext):
MDtext=escapeMarkdown(MDtext)
return markdown.markdown(MDtext, extensions=['nl2br'])
""" ######## Password ######## """
pwd_policy = PasswordPolicy.from_names(
length=8, # min length: 8
uppercase=0, # need min. 2 uppercase letters
numbers=0, # need min. 2 digits
special=0, # need min. 2 special characters
nonletters=1, # need min. 2 non-letter characters (digits, specials, anything)
)
def hashPassword(password):
return pbkdf2_sha256.hash(password, rounds=200000, salt_size=16)
def verifyPassword(password, hash):
return pbkdf2_sha256.verify(password, hash)
""" ######## fieldIndex helpers ######## """
def getFieldByNameInIndex(index, name):
for field in index:
if 'name' in field and field['name'] == name:
return field
return None
""" ######## Tokens ######## """
def getRandomString(length=32):
return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(length))
"""
Create a unique token.
persistentClass may be a User class, or an Invite class, ..
"""
def createToken(persistentClass, **kwargs):
tokenString = getRandomString(length=48)
while persistentClass.find(token=tokenString):
tokenString = getRandomString(length=48)
result={'token': tokenString, 'created': datetime.datetime.now()}
return {**result, **kwargs}
def isValidToken(tokenData):
token_age = datetime.datetime.now() - tokenData['created']
if token_age.total_seconds() > app.config['TOKEN_EXPIRATION']:
return False
return True
""" ######## Dates ######## """
def isValidExpireDate(date):
try:
datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
return True
except:
return False
def isFutureDate(date):
now=time.time()
future=int(datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S").strftime("%s"))
return True if future > now else False
""" ######## Others ######## """
def writeCSV(form):
fieldnames=[]
fieldheaders={}
for field in form.fieldIndex:
fieldnames.append(field['name'])
fieldheaders[field['name']]=field['label']
csv_name='%s/%s.csv' % (app.config['TMP_DIR'], form.slug)
with open(csv_name, mode='w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction='ignore')
writer.writerow(fieldheaders)
for entry in form.entries:
writer.writerow(entry)
return csv_name
"""
“Copyright 2020 La Coordinadora d’Entitats per la Lleialtat Santsenca”
This file is part of GNGforms.
GNGforms is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from GNGforms.utils.utils import *
from functools import wraps
def login_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if g.current_user:
return f(*args, **kwargs)
else:
return redirect(url_for('main_bp.index'))
return wrap
def enabled_user_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if g.current_user and g.current_user.enabled:
return f(*args, **kwargs)
else:
return redirect(url_for('main_bp.index'))
return wrap
def admin_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if g.isAdmin:
return f(*args, **kwargs)
else:
return redirect(url_for('main_bp.index'))
return wrap
def rootuser_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if g.isRootUser:
return f(*args, **kwargs)
else:
return redirect(url_for('main_bp.index'))
return wrap
def anon_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if g.current_user:
return redirect(url_for('main_bp.index'))
else:
return f(*args, **kwargs)
return wrap
def queriedForm_editor_required(f):
@wraps(f)
def wrap(*args, **kwargs):
queriedForm=models.Form.find(id=kwargs['id'], editor_id=str(g.current_user.id))
if not queriedForm:
flash(gettext("Form is not available. 404"), 'warning')
return redirect(make_url_for('forms_bp.my_forms'))
kwargs['queriedForm']=queriedForm
return f(*args, **kwargs)
return wrap
def sanitized_slug_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if not 'slug' in kwargs:
if g.current_user:
flash("No slug found!", 'error')
return render_template('page-not-found.html'), 404
if kwargs['slug'] in app.config['RESERVED_SLUGS']:
if g.current_user:
flash("Reserved slug!", 'warning')
return render_template('page-not-found.html'), 404
if kwargs['slug'] != sanitizeSlug(kwargs['slug']):
if g.current_user:
flash("That's a nasty slug!", 'warning')
return render_template('page-not-found.html'), 404
return f(*args, **kwargs)
return wrap
def sanitized_key_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if not ('key' in kwargs and kwargs['key'] == sanitizeString(kwargs['key'])):
if g.current_user:
flash(gettext("That's a nasty key!"), 'warning')
return render_template('page-not-found.html'), 404
else:
return f(*args, **kwargs)
return wrap
def sanitized_token(f):
@wraps(f)
def wrap(*args, **kwargs):
if 'token' in kwargs and kwargs['token'] != sanitizeTokenString(kwargs['token']):
if g.current_user:
flash(gettext("That's a nasty token!"), 'warning')
return render_template('page_not_found.html'), 404
else:
return f(*args, **kwargs)
return wrap
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, IntegerField, SelectField, PasswordField, BooleanField, RadioField
from wtforms.validators import ValidationError, DataRequired, Email, EqualTo
from flask import g
from flask_babel import lazy_gettext as _
from GNGforms import app
from GNGforms.models import User, Installation
from GNGforms.utils.utils import sanitizeUsername, pwd_policy
class NewUser(FlaskForm):
username = StringField(_("Username"), validators=[DataRequired()])
email = StringField(_("Email"), validators=[DataRequired(), Email()])
password = PasswordField(_("Password"), validators=[DataRequired()])
password2 = PasswordField(_("Password again"), validators=[DataRequired(), EqualTo('password')])
def validate_username(self, username):
if username.data != sanitizeUsername(username.data):
raise ValidationError(_("Username is not valid"))
return False
if username.data in app.config['RESERVED_USERNAMES']:
raise ValidationError(_("Please use a different username"))
return False
if User.find(username=username.data):
raise ValidationError(_("Please use a different username"))
def validate_email(self, email):
if User.find(email=email.data):
raise ValidationError(_("Please use a different email address"))
elif email.data in app.config['ROOT_USERS'] and Installation.isUser(email.data):
# a root_user email can only be used once across all sites.
raise ValidationError(_("Please use a different email address"))
def validate_password(self, password):
if pwd_policy.test(password.data):
raise ValidationError(_("Your password is weak"))
class Login(FlaskForm):
username = StringField(_("Username"), validators=[DataRequired()])
password = PasswordField(_("Password"), validators=[DataRequired()])
def validate_username(self, username):
if username.data != sanitizeUsername(username.data):
return False
class GetEmail(FlaskForm):
email = StringField(_("Email address"), validators=[DataRequired(), Email()])
class ChangeEmail(FlaskForm):
email = StringField(_("New email address"), validators=[DataRequired(), Email()])
def validate_email(self, email):
if User.find(email=email.data):
raise ValidationError(_("Please use a different email address"))
elif email.data in app.config['ROOT_USERS'] and Installation.isUser(email.data):
# a root_user email can only be used once across all sites.
raise ValidationError(_("Please use a different email address"))
class ResetPassword(FlaskForm):
password = PasswordField(_("Password"), validators=[DataRequired()])
password2 = PasswordField(_("Password again"), validators=[DataRequired(), EqualTo('password')])
def validate_password(self, password):
if pwd_policy.test(password.data):
raise ValidationError(_("Your password is weak"))
class smtpConfig(FlaskForm):
host = StringField(_("Email server"), validators=[DataRequired()])
port = IntegerField(_("Port"))
encryption = SelectField(_("Encryption"), choices=[ ('None', 'None'),
('SSL', 'SSL'),
('STARTTLS', 'STARTTLS (maybe)')])
user = StringField(_("User"))
password = StringField(_("Password"))
noreplyAddress = StringField(_("Sender address"), validators=[DataRequired(), Email()])
class NewInvite(FlaskForm):
email = StringField(_("New user's email"), validators=[DataRequired(), Email()])
message = TextAreaField(_("Include message"))
admin = BooleanField(_("Make the new user an Admin"))
hostname = StringField(_("hostname"), validators=[DataRequired()])
def validate_email(self, email):
if User.find(email=email.data, hostname=self.hostname.data):
raise ValidationError(_("Please use a different email address"))
elif email.data in app.config['ROOT_USERS'] and Installation.isUser(email.data):
# a root_user email can only be used once across all sites.
raise ValidationError(_("Please use a different email address"))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment