...
 
Commits (42)
......@@ -2,6 +2,7 @@ image: $DOCKER_REGISTRY/pdsl/rdiffweb-build:$CI_PIPELINE_IID
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
PIP_INDEX_URL: https://nexus.patrikdufresne.com/repository/pypi-proxy/simple/
stages:
- prebuild
......@@ -33,6 +34,36 @@ prebuild-docker-image:
- nosetests-*.xml
expire_in: 1 day
py27-jinja26:
<<: *tox
py27-jinja27:
<<: *tox
py27-jinja28:
<<: *tox
py27-jinja29:
<<: *tox
py27-jinja210:
<<: *tox
py3-jinja26:
<<: *tox
py3-jinja27:
<<: *tox
py3-jinja28:
<<: *tox
py3-jinja29:
<<: *tox
py3-jinja210:
<<: *tox
py27-cherrypy35:
<<: *tox
......@@ -185,8 +216,9 @@ rdiffweb_deploy_demo:
- mkdir -p ~/.ssh
- chmod 700 ~/.ssh
script:
- export RDIFFWEB_VERSION=$(python ./setup.py --version)
- python ./setup.py --version
- export RDIFFWEB_VERSION=$(python ./setup.py --version)
- echo RDIFFWEB_VERSION=$RDIFFWEB_VERSION
- git clone "http://${GITLAB_USR}:${GITLAB_PWD}@git.patrikdufresne.com/pdsl/ansible-config.git"
- cd ansible-config
- ansible-playbook rdiffweb.yml -i pdsl --extra-vars "ansible_user=root rdiffweb_version=$RDIFFWEB_VERSION rdiffweb_default_repositories=true" --limit arbuc
......@@ -75,6 +75,28 @@ Professional support for Rdiffweb is available by contacting [Patrik Dufresne Se
# Changelog
## 1.1.0 (2019-10-31)
This release focus on improving the admin area and building the fundation for repository access control list (ACL).
* Update documentation from PDSL web site
* Improve the navigation bar layout
* Update the login page headline
* Update jinja2 version to allow 2.10.x
* Show server log in admin area
* Reduce code smell
* Add System information in admin area
* Validate credential using local database before LDAP
* Reffactoring templates macros
* Enhance user's view search bar
* Change repository URL to username/repopath
* Add System information in admin area
* Improve testcases
* Clean-up obsolete code
* Fix issue with captital case encoding name
* Fix compilation of less files
* Fix google font import
## 1.0.3 (2019-10-04)
* Removing the auto update repos
......
......@@ -146,4 +146,24 @@ This is out-of-scope. The following is only provided as a suggestion and is in
no way a complete reference.
See [/extras/nginx](../extras/nginx) folder for example of nginx configuration
to be used with rdiffweb.
\ No newline at end of file
to be used with rdiffweb.
## Other settings
| Parameter | Description | Required | Example |
| --- | --- | --- | --- |
| ServerHost | Define the IP address to listen to. Use 0.0.0.0 to listen on all interfaces. | No | 127.0.0.1 |
| ServerPort | Define the host to listen to. Default to 8080 | No | 80 |
| LogLevel | Define the log level. ERROR, WARN, INFO, DEBUG | No | DEBUG |
| Environment | Define the type of environment: development, production. This is used to limit the information shown to the user when an error occur. | No | production |
| HeaderName | Define the application name displayed in the title bar and header menu. | No | My Backup |
| DefaultTheme | Define the default theme. Either: default or orange | No | orange |
| WelcomeMsg | Replace the headling displayed in the login page | No | - |
| LogFile | Define the location of the log file | No | /var/log/rdiffweb.log |
| LogAccessFile | Define the location of the access log file | No | /var/log/rdiffweb-access.log |
| RemoveOlderTime | Time when to execute the remove older task | No | 22:00 |
| SQLiteDBFile | Location of the SQLite database | No | /etc/rdiffweb/rdw.db |
| AddMissingUser | True to create users from LDAP when the credential are valid. | No | True |
| AdminUser | Define the name of the default admin user to be created | No | admin |
| FavIcon | Define the FavIcon to be displayed in the browser title | No | /etc/rdiffweb/my-fav.ico |
| TempDir | Define an alternate temp directory to be used when restoring files. | No | /retore/ |
......@@ -61,13 +61,6 @@ class Controller(object):
def app(self):
return cherrypy.request.app
@property
def currentuser(self):
"""
Get the current user.
"""
return cherrypy.serving.request.login
def _compile_template(self, template_name, **kwargs):
"""
Used to generate a standard HTML page using the given template.
......@@ -78,11 +71,11 @@ class Controller(object):
parms = {
"lang": loc.language,
}
if self.currentuser:
if self.app.currentuser:
parms.update({
"is_login": False,
'username': self.currentuser.username,
'is_admin': self.currentuser.is_admin,
'username': self.app.currentuser.username,
'is_admin': self.app.currentuser.is_admin,
})
# Append custom branding
......
#!/usr/bin/python
# -*- coding: utf-8 -*-
# rdiffweb, A web interface to rdiff-backup repositories
# Copyright (C) 2019 rdiffweb contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import cherrypy
'''
Created on Oct. 21, 2019
@author: Patrik Dufresne
'''
def is_admin():
# Authentication may have remove the default handle to let the user login.
if cherrypy.serving.request.handler is None:
return True
# Otherwise, validate the permissions.
if not cherrypy.serving.request.login or not cherrypy.serving.request.login.is_admin:
raise cherrypy.HTTPError("403 Forbidden")
# Make sure it's running after authentication (priority = 71)
cherrypy.tools.is_admin = cherrypy.Tool('before_handler', is_admin, priority = 72)
This diff is collapsed.
......@@ -42,22 +42,14 @@ class BrowsePage(Controller):
@cherrypy.expose
def default(self, path=b"", restore="", limit='10'):
validate_isinstance(path, bytes)
validate_isinstance(restore, str)
limit = validate_int(limit)
restore = bool(restore)
logger.debug("browsing [%r]", path)
# Check user access to the given repo & path
(repo_obj, path_obj) = self.app.currentuser.get_repo_path(path)
# Build the parameters
parms = self._get_parms_for_page(repo_obj, path_obj, restore, limit)
return self._compile_template("browse.html", **parms)
def _get_parms_for_page(self, repo_obj, path_obj, restore, limit):
# Build "parent directories" links
# TODO This Should to me elsewhere. It contains logic related to librdiff encoding.
parents = []
......@@ -84,12 +76,12 @@ class BrowsePage(Controller):
# Get list of actual directory entries
dir_entries = path_obj.dir_entries[::-1]
return {"limit": limit,
"repo_name": repo_obj.display_name,
"repo_path": repo_obj.path,
"path": path_obj.path,
"dir_entries": dir_entries,
"isdir": path_obj.isdir,
"parents": parents,
"restore_dates": restore_dates,
"warning": warning}
parms = {
"repo" : repo_obj,
"path" : path_obj,
"limit": limit,
"dir_entries": dir_entries,
"parents": parents,
"restore_dates": restore_dates,
"warning": warning}
return self._compile_template("browse.html", **parms)
......@@ -37,18 +37,7 @@ _logger = logging.getLogger(__name__)
@poppath('graph')
class GraphsPage(Controller):
def _data(self, path, **kwargs):
assert isinstance(path, bytes)
_logger.debug("repo stats [%r]", path)
# Check user permissions
try:
repo_obj = self.app.currentuser.get_repo(path)
except librdiff.FileError as e:
_logger.exception("invalid user path [%r]", path)
return self._compile_error_template(str(e))
def _data(self, repo_obj, **kwargs):
attrs = [
'starttime', 'endtime', 'elapsedtime', 'sourcefiles', 'sourcefilesize',
'mirrorfiles', 'mirrorfilesize', 'newfiles', 'newfilesize', 'deletedfiles',
......@@ -73,42 +62,31 @@ class GraphsPage(Controller):
return func()
def _page(self, path, graph, **kwargs):
def _page(self, repo_obj, graph, **kwargs):
"""
Generic method to show graphs.
"""
_logger.debug("repo graphs [%r][%r]", graph, path)
# Check user permissions
try:
repo_obj = self.app.currentuser.get_repo(path)
except librdiff.FileError as e:
_logger.exception("invalid user path [%r]", path)
return self._compile_error_template(str(e))
# Check if any action to process.
params = {
'repo_name': repo_obj.display_name,
'repo_path': repo_obj.path,
'repo': repo_obj,
'graphs': graph,
}
# Generate page.
return self._compile_template("graphs_%s.html" % graph, **params)
@cherrypy.expose
def index(self, graph, path, **kwargs):
def default(self, graph, path, **kwargs):
"""
Called to show every graphs
"""
validate_isinstance(path, bytes)
validate_isinstance(graph, bytes)
graph = graph.decode('ascii', 'replace')
repo_obj = self.app.currentuser.get_repo(path)
# check if data should be shown.
if graph == 'data':
return self._data(path, **kwargs)
return self._data(repo_obj, **kwargs)
elif graph in ['activities', 'errors', 'files', 'sizes', 'times']:
return self._page(path, graph, **kwargs)
return self._page(repo_obj, graph, **kwargs)
# Raise error.
raise cherrypy.NotFound()
......@@ -23,7 +23,7 @@ import logging
import cherrypy
from rdiffweb.controller import Controller, validate_isinstance, validate_int
from rdiffweb.controller import Controller, validate_int
from rdiffweb.controller.dispatch import poppath
from rdiffweb.core.i18n import ugettext as _
......@@ -37,7 +37,6 @@ class HistoryPage(Controller):
@cherrypy.expose
def default(self, path=b"", limit='10', **kwargs):
validate_isinstance(path, bytes)
limit = validate_int(limit)
repo_obj = self.app.currentuser.get_repo(path)
......@@ -50,8 +49,7 @@ class HistoryPage(Controller):
parms = {
"limit": limit,
"repo_name": repo_obj.display_name,
"repo_path": repo_obj.path,
"repo": repo_obj,
"history_entries": repo_obj.get_history_entries(numLatestEntries=limit, reverse=True),
"warning": warning,
}
......
......@@ -38,12 +38,7 @@ class LocationsPage(Controller):
def index(self):
# Get page params
params = {
"repos": [{
"path": repo_obj.path,
"name_split": repo_obj.display_name.strip('/').split('/'),
"last_backup_date": repo_obj.last_backup_date,
'status': repo_obj.status,
} for repo_obj in self.app.currentuser.repo_objs],
"repos": self.app.currentuser.repo_objs,
"disk_usage": self.app.currentuser.disk_usage,
}
# Render the page.
......
......@@ -20,11 +20,10 @@ from __future__ import absolute_import
from __future__ import unicode_literals
import logging
from rdiffweb.controller import Controller, validate_isinstance, validate
from rdiffweb.controller import Controller, validate, validate_int
from rdiffweb.controller.dispatch import poppath
from rdiffweb.core.i18n import ugettext as _
from builtins import bytes
import cherrypy
# Define the logger
......@@ -35,79 +34,60 @@ _logger = logging.getLogger(__name__)
class SettingsPage(Controller):
@cherrypy.expose
def index(self, path=b""):
validate_isinstance(path, bytes)
def default(self, path=b"", action=None, **kwargs):
repo_obj = self.app.currentuser.get_repo(path)
if action == 'delete':
self._delete(repo_obj, **kwargs)
if kwargs.get('keepdays'):
return self._remove_older(repo_obj, **kwargs)
elif kwargs.get('new_encoding'):
return self._set_encoding(repo_obj, **kwargs)
elif kwargs.get('maxage'):
return self._set_maxage(repo_obj, **kwargs)
# Get page data.
params = {
'repo_name': repo_obj.display_name,
'repo_path': repo_obj.path,
'current_encoding': repo_obj.encoding,
'repo': repo_obj,
'keepdays': repo_obj.keepdays,
}
# Generate page.
return self._compile_template("settings.html", **params)
@poppath()
class SetEncodingPage(Controller):
@cherrypy.expose()
def index(self, path=b'', new_encoding=None):
def _delete(self, repo_obj, confirm=None, redirect='/', **kwargs):
"""
Delete the repository.
"""
# Validate the name
validate(confirm)
if confirm != repo_obj.display_name:
_logger.debug("do not delete repo, bad confirmation %r != %r", confirm, repo_obj.display_name)
raise cherrypy.HTTPError(400)
# Refresh repository list
repo_obj.delete()
raise cherrypy.HTTPRedirect(redirect)
def _set_encoding(self, repo_obj, new_encoding=None, **kwargs):
"""
Update repository encoding via Ajax.
"""
validate_isinstance(path, bytes)
validate(new_encoding)
repo_obj = self.app.currentuser.get_repo(path)
try:
repo_obj.encoding = new_encoding
except ValueError:
raise cherrypy.HTTPError(400, _("invalid encoding value"))
return _("Updated")
@poppath()
class RemoveOlderPage(Controller):
@cherrypy.expose()
def index(self, path=b"", keepdays=None):
validate_isinstance(path, bytes)
validate(keepdays)
# Get repository object from user database.
r = self.app.currentuser.get_repo(path)
# Update the database.
try:
r.keepdays = keepdays
except ValueError:
_logger.warning("invalid keepdays value %r", keepdays)
raise cherrypy.HTTPError(400, _("Invalid value"))
return _("Updated")
@poppath()
class DeleteRepoPage(Controller):
@cherrypy.expose
def index(self, path=b"", **kwargs):
def _set_maxage(self, repo_obj, maxage=None, **kwargs):
"""
Delete the repository.
Update repository maxage via Ajax.
"""
validate_isinstance(path, bytes)
# Check user permissions
repo_obj = self.app.currentuser.get_repo(path)
# Validate the name
confirm_name = kwargs.get('confirm_name', None)
if confirm_name != repo_obj.display_name:
_logger.info("bad confirmation %r != %r", confirm_name, repo_obj.display_name)
raise cherrypy.HTTPError(400)
# Refresh repository list
repo_obj.delete()
validate_int(maxage)
repo_obj.maxage = maxage
return _("Updated")
raise cherrypy.HTTPRedirect("/")
def _remove_older(self, repo_obj, keepdays=None, **kwargs):
validate_int(keepdays)
# Update the database.
repo_obj.keepdays = keepdays
return _("Updated")
......@@ -30,7 +30,6 @@ from rdiffweb.controller.dispatch import poppath
from rdiffweb.core import librdiff
from rdiffweb.core import rdw_helpers
# Define the logger
logger = logging.getLogger(__name__)
......@@ -40,7 +39,6 @@ class StatusPage(Controller):
@cherrypy.expose
def default(self, path=b"", date="", failures=""):
validate_isinstance(path, bytes)
validate_isinstance(date, str)
# Validate date
......@@ -63,7 +61,7 @@ class StatusPage(Controller):
if path:
user_repos = [self.app.currentuser.get_repo(path)]
else:
user_repos = self.app.currentuser.repos
user_repos = self.app.currentuser.repo_objs
failuresOnly = failures != ""
messages = self._getUserMessages(user_repos, not failuresOnly, True, startTime, endTime)
......@@ -80,23 +78,14 @@ class StatusPage(Controller):
earliest_date,
latest_date):
user_root = self.app.currentuser.user_root
repoErrors = []
allBackups = []
for repo in repos:
repo = repo.lstrip("/")
try:
repo_obj = librdiff.RdiffRepo(user_root, repo)
backups = repo_obj.get_history_entries(-1, earliest_date,
latest_date)
allBackups += [{"repo_path": repo_obj.path,
"repo_name": repo_obj.display_name,
"date": backup.date,
"size": backup.size,
"errors": backup.errors} for backup in backups]
except librdiff.FileError:
logger.exception("invalid user path %s" % repo)
for repo_obj in repos:
backups = repo_obj.get_history_entries(-1, earliest_date, latest_date)
allBackups += [{"repo": repo_obj,
"date": backup.date,
"size": backup.size,
"errors": backup.errors} for backup in backups]
allBackups.sort(key=lambda x: x["date"])
failedBackups = [x for x in allBackups if x["errors"]]
......@@ -118,9 +107,7 @@ class StatusPage(Controller):
{"is_success": False,
"date": date,
"repoErrors": [],
"backups": [],
"repo_path": job["repo_path"],
"repo_name": job["repo_name"]})
"backups": []})
userMessages.append(job)
# generate success messages (publish date is most recent backup date)
......
......@@ -22,12 +22,9 @@ User can control the notification period.
from __future__ import unicode_literals
import logging
from rdiffweb.controller import Controller
from rdiffweb.core import RdiffError, RdiffWarning
from rdiffweb.core.i18n import ugettext as _
from builtins import str
import cherrypy
from rdiffweb.controller import Controller, validate_int
from rdiffweb.core.i18n import ugettext as _
_logger = logging.getLogger(__name__)
......@@ -44,39 +41,18 @@ class NotificationPref(Controller):
for repo in self.app.currentuser.repo_objs:
# Get value received for the repo.
value = kwargs.get(repo.name, None)
if value is None:
continue
try:
value = int(value)
except:
continue
# Update the maxage
repo.maxage = value
if value:
# Update the maxage
repo.maxage = validate_int(value)
def render_prefs_panel(self, panelid, **kwargs): # @UnusedVariable
def render_prefs_panel(self, panelid, action=None, **kwargs): # @UnusedVariable
# Process the parameters.
params = dict()
action = kwargs.get('action')
if action:
try:
if action == "set_notification_info":
self._handle_set_notification_info(**kwargs)
else:
_logger.info("unknown action: %s", action)
raise cherrypy.NotFound("Unknown action")
except RdiffWarning as e:
params['warning'] = str(e)
except RdiffError as e:
params['error'] = str(e)
except Exception as e:
_logger.warning("unknown error processing action", exc_info=True)
params['error'] = _("Unknown error")
if action == "set_notification_info":
self._handle_set_notification_info(**kwargs)
params.update({
params = {
'email': self.app.currentuser.email,
'repos': [
{'name': r.name, 'maxage': r.maxage}
for r in self.app.currentuser.repo_objs],
})
'repos': self.app.currentuser.repo_objs,
}
return "prefs_notification.html", params
......@@ -52,7 +52,7 @@ class APITest(WebCase):
self.assertEqual(repo.get('last_backup_date'), '2016-02-02T16:30:40-05:00')
self.assertEqual(repo.get('status'), 'ok')
self.assertEqual(repo.get('display_name'), 'testcases')
self.assertEqual(repo.get('encoding'), 'utf_8')
self.assertEqual(repo.get('encoding'), 'utf-8')
self.assertEqual(repo.get('name'), 'testcases')
self.assertEqual(repo.get('maxage'), 0)
......
......@@ -44,7 +44,7 @@ class CheckLinkTest(WebCase):
"""
Crawl all the pages to find broken links.
"""
ignore = ['/restore/testcases/BrokenSymlink.*', '/browse/testcases/BrokenSymlink.*']
ignore = ['/restore/admin/testcases/BrokenSymlink.*', '/browse/admin/testcases/BrokenSymlink.*']
done = set(['#', '/logout/'])
todo = OrderedDict()
todo["/"] = "/"
......
......@@ -90,7 +90,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self.assertNotInBody("/var/backups/")
self.assertInBody("/tmp/")
# Check with filters
self.getPage("/admin/users/?userfilter=admins")
self.getPage("/admin/users/?criteria=admins")
self.assertInBody("test2")
finally:
self._delete_user("test2")
......@@ -114,7 +114,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self.assertNotInBody("/var/backups/")
self.assertInBody("/tmp/")
# Check with filter
self.getPage("/admin/users/?userfilter=admins")
self.getPage("/admin/users/?criteria=admins")
self.assertInBody("Éric")
finally:
self._delete_user("Éric")
......@@ -196,20 +196,20 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self._edit_user("test4", "[email protected]", "test1", "/var/backups/", False)
self.assertStatus(500)
def test_userfilter(self):
def test_criteria(self):
"""
Check if admin filter is working.
Check if admin criteria is working.
"""
self.getPage("/admin/users/?userfilter=admins")
self.getPage("/admin/users/?criteria=admins")
self.assertNotInBody("test1")
def test_usersearch(self):
def test_search(self):
"""
Check if user search is working.
"""
self.getPage("/admin/users/?usersearch=tes")
self.getPage("/admin/users?search=tes")
self.assertInBody("test1")
self.getPage("/admin/users/?usersearch=coucou")
self.getPage("/admin/users?search=coucou")
self.assertNotInBody("test1")
......@@ -245,11 +245,18 @@ class AdminUsersAsUserTest(AbstractAdminTest):
self._edit_user("test", "[email protected]", "test", "/var/invalid/", False)
self.assertStatus(403)
def test_list(self):
def test_users(self):
"""
Check if listing user is forbidden.
"""
self.getPage("/admin/users/")
self.getPage("/admin/users")
self.assertStatus(403)
def test_repos(self):
"""
Check if listing user is forbidden.
"""
self.getPage("/admin/repos")
self.assertStatus(403)
......@@ -288,6 +295,59 @@ class AdminLogsTest(WebCase):
self.assertStatus(200)
self.assertInBody("rdiffweb.log")
self.assertInBody("Error getting file content")
def test_logs_with_invalid_file(self):
self.app.cfg['logfile'] = './rdiffweb.log'
self.app.cfg['logaccessfile'] = './rdiffweb-access.log'
self.getPage("/admin/logs/invalid")
self.assertStatus(404)
class AdminReposTest(WebCase):
login = True
reset_testcases = True
def test_repos(self):
self.getPage("/admin/repos")
self.assertStatus(200)
def test_repos_with_search(self):
# Search something that exists
self.getPage("/admin/repos?search=test")
self.assertStatus(200)
self.assertInBody(self.REPO)
# Search something that doesn't exists
self.getPage("/admin/repos?search=coucou")
self.assertStatus(200)
self.assertNotInBody(self.REPO)
self.assertInBody("No repository found")
def test_repos_with_criteria(self):
# Search something that exists
self.getPage("/admin/repos?criteria=ok")
self.assertStatus(200)
self.assertInBody(self.REPO)
# Search something that exists
self.getPage("/admin/repos?criteria=failed")
self.assertStatus(200)
self.assertNotInBody(self.REPO)
self.assertInBody("No repository found")
class AdminSysinfoTest(WebCase):
login = True
def test_sysinfo(self):
self.getPage("/admin/sysinfo")
self.assertStatus(200)
self.assertInBody("Operating System Info")
self.assertInBody("Python Info")
if __name__ == "__main__":
......
......@@ -73,19 +73,19 @@ class BrowsePageTest(WebCase):
self.assertInBody("/Fichier%20%40%20%3Croot%3E?date=")
# Répertoire (@vec) {càraçt#èrë} $épêcial
self.assertInBody("Répertoire (@vec) {càraçt#èrë} $épêcial")
self.assertInBody("/R%C3%A9pertoire%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial/")
self.assertInBody("/R%C3%A9pertoire%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial")
# test\test
self.assertInBody("test\\test")
self.assertInBody("/test%5Ctest/")
self.assertInBody("/test%5Ctest")
# <F!chïer> (@vec) {càraçt#èrë} $épêcial
self.assertInBody("&lt;F!chïer&gt; (@vec) {càraçt#èrë} $épêcial")
self.assertInBody("/%3CF%21ch%C3%AFer%3E%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial?date=")
# Répertoire Existant
self.assertInBody("Répertoire Existant")
self.assertInBody("/R%C3%A9pertoire%20Existant/")
self.assertInBody("/R%C3%A9pertoire%20Existant")
# Répertoire Supprimé
self.assertInBody("Répertoire Supprimé")
self.assertInBody("/R%C3%A9pertoire%20Supprim%C3%A9/")
self.assertInBody("/R%C3%A9pertoire%20Supprim%C3%A9")
# Quoted folder
self.assertInBody("Char Z to quote")
self.assertInBody("/Char%20%3B090%20to%20quote")
......@@ -191,6 +191,10 @@ class BrowsePageTest(WebCase):
self._browse("invalid/", "")
self.assertStatus(404)
self.assertInBody("Not Found")
self._browse("admin/invalid/", "")
self.assertStatus(404)
self.assertInBody("Not Found")
def test_invalid_path(self):
"""
......@@ -213,20 +217,47 @@ class BrowsePageTest(WebCase):
"""
# Change the user setting to match single repo.
user = self.app.userdb.get_user(self.USERNAME)
user.user_root = os.path.join(self.app.testcases, self.REPO)
user.user_root = os.path.join(self.app.testcases, 'testcases')
user.repos = ['']
# Check if listing locations is working
self.getPage('/')
self.assertStatus('200 OK')
self.assertInBody(self.REPO)
self.assertInBody('testcases')
# Check if browsing is working.
self.getPage('/browse/')
self.getPage('/browse/admin')
self.assertStatus('200 OK')
self.assertInBody('Files')
# Check sub directory browsing
self.getPage('/browse/Revisions/')
self.getPage('/browse/admin/Revisions/')
self.assertStatus('200 OK')
self.assertInBody('Files')
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage('/browse/admin')
self.assertStatus('404 Not Found')
# Browse admin's repos
self.getPage('/browse/anotheruser')
self.assertStatus('404 Not Found')
self.getPage('/browse/anotheruser/testcases')
self.assertStatus('200 OK')
self.getPage('/browse/anotheruser/testcases/Revisions/')
self.assertStatus('200 OK')
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage('/browse/anotheruser/testcases')
self.assertStatus('403 Forbidden')
self.getPage('/browse/anotheruser/testcases/Revisions/')
self.assertStatus('403 Forbidden')
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
......
......@@ -36,11 +36,28 @@ class SettingsTest(WebCase):
reset_testcases = True
def _stats(self, repo):
return self.getPage("/graphs/data/" + repo + "/")
def test_activities(self):
self.getPage("/graphs/activities/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_errors(self):
self.getPage("/graphs/errors/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_files(self):
self.getPage("/graphs/files/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_stats(self):
self._stats(self.REPO)
def test_sizes(self):
self.getPage("/graphs/sizes/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_times(self):
self.getPage("/graphs/times/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_data(self):
self.getPage("/graphs/data/" + self.REPO + "/")
self.assertStatus('200 OK')
# Check header
expected = b"""date,starttime,endtime,elapsedtime,sourcefiles,sourcefilesize,mirrorfiles,mirrorfilesize,newfiles,newfilesize,deletedfiles,deletedfilesize,changedfiles,changedsourcesize,changedmirrorsize,incrementfiles,incrementfilesize,totaldestinationsizechange,errors
......@@ -69,6 +86,24 @@ class SettingsTest(WebCase):
"""
self.assertEquals(expected, self.body)
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage("/graphs/activities/anotheruser/testcases")
self.assertStatus('200 OK')
self.assertInBody("Activities")
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage("/graphs/activities/anotheruser/testcases")
self.assertStatus('403 Forbidden')
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
......
......@@ -57,6 +57,24 @@ class HistoryPageTest(WebCase):
self._history(self.REPO, 50)
self.assertNotInBody("Show more")
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage("/history/anotheruser/testcases")
self.assertStatus('200 OK')
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage("/history/anotheruser/testcases")
self.assertStatus('403 Forbidden')
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
logging.basicConfig(level=logging.DEBUG)
......
......@@ -38,7 +38,7 @@ class LoginPageTest(WebCase):
"""
self.getPage('/')
self.assertStatus('200 OK')
self.assertInBody('login')
self.assertInBody('Enter your username and password to log in.')
def test_getpage_with_plaintext(self):
"""
......@@ -46,7 +46,7 @@ class LoginPageTest(WebCase):
"""
self.getPage('/', headers=[("Accept", "text/plain")])
self.assertStatus('200 OK')
self.assertInBody('login')
self.assertInBody('Enter your username and password to log in.')
def test_getpage_with_redirect_get(self):
"""
......@@ -119,6 +119,14 @@ class LoginPageTest(WebCase):
self.assertStatus('200 OK')
self.assertInBody('Invalid username or password.')
def test_getpage_admin(self):
"""
Access to admin area without session should redirect to login page.
"""
self.getPage('/admin/')
self.assertStatus('200 OK')
self.assertInBody('Enter your username and password to log in.')
def test_getapi_without_authorization(self):
"""
Check if 401 is return when authorization is not provided.
......
......@@ -36,6 +36,8 @@ class PrefsTest(WebCase):
login = True
reset_app = True
reset_testcases = True
def _set_password(self, current, new_password, confirm):
b = {'action': 'set_password',
......@@ -49,10 +51,6 @@ class PrefsTest(WebCase):
'email': email}
return self.getPage(self.PREFS, method='POST', body=b)
def _update_repos(self):
b = {'action': 'update_repos'}
return self.getPage(self.PREFS, method='POST', body=b)
def test_change_email(self):
self._set_profile_info("[email protected]")
self.assertInBody("Profile updated successfully.")
......@@ -96,8 +94,25 @@ class PrefsTest(WebCase):
self.assertStatus(404)
def test_update_repos(self):
self._update_repos()
user_obj = self.app.userdb.get_user(self.USERNAME)
user_obj.repos = []
self.getPage(self.PREFS, method='POST', body={'action': 'update_repos'})
self.assertStatus(200)
# Check database update.
self.assertInBody("Repositories successfully updated.")
self.assertEqual(2, len(user_obj.repos))
self.assertIn('testcases', user_obj.repos)
self.assertIn('broker-repo', user_obj.repos)
def test_update_notification(self):
self.getPage("/prefs/notification/", method='POST', body={'action':'set_notification_info', 'testcases':'7'})
self.assertStatus(200)
# Check database update
repo_obj = self.app.userdb.get_user(self.USERNAME).get_repo(self.REPO)
self.assertEqual(7, repo_obj.maxage)
if __name__ == "__main__":
......
......@@ -33,7 +33,6 @@ import zipfile
from rdiffweb.controller.page_restore import _content_disposition
from rdiffweb.test import WebCase, AppTestCase
PY3 = sys.version_info[0] == 3
......@@ -330,6 +329,25 @@ class RestoreTest(WebCase):
def test_invalid_date(self):
self._restore(self.REPO, "Revisions/Data/", "1415221a470", True)
self.assertStatus(400)
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self._restore("anotheruser/testcases", "Fichier%20%40%20%3Croot%3E/", "1414921853", True)
self.assertStatus('200 OK')
self.assertInBody("Ajout d'info")
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self._restore("anotheruser/testcases", "Fichier%20%40%20%3Croot%3E/", "1414921853", True)
self.assertStatus('403 Forbidden')
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
......
......@@ -36,13 +36,37 @@ class SettingsTest(WebCase):
reset_testcases = True
def _settings(self, repo):
self.getPage("/settings/" + repo + "/")
def test_page(self):
self._settings(self.REPO)
self.getPage("/settings/" + self.REPO)
self.assertInBody("Character encoding")
self.assertStatus(200)
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage("/settings/anotheruser/testcases")
self.assertInBody("Character encoding")
self.assertStatus('200 OK')
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage("/settings/anotheruser/testcases")
self.assertStatus('403 Forbidden')
def test_set_maxage(self):
self.getPage("/settings/" + self.REPO + "/", method="POST",
body={'maxage': '4'})
self.assertStatus(200)
# Check database update
repo_obj = self.app.userdb.get_user('admin').get_repo(self.REPO)
self.assertEqual(4, repo_obj.maxage)
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
......
......@@ -39,19 +39,28 @@ class DeleteRepoTest(WebCase):
def _settings(self, repo):
self.getPage("/settings/" + repo + "/")
def _delete(self, repo, confirm_name):
def _delete(self, repo, confirm, **kwargs):
body = {}
if confirm_name is not None:
body.update({'confirm_name': confirm_name})
self.getPage("/delete/" + repo + "/", method="POST",
body=body)
body.update({'action': 'delete'})
if confirm is not None:
body.update({'confirm': confirm})
if kwargs:
body.update(kwargs)
self.getPage("/settings/" + repo + "/", method="POST", body=body)
def test_delete(self):
"""
Check to delete a repo.
"""
self._delete(self.REPO, self.REPO)
self._delete(self.REPO, 'testcases')
self.assertStatus(303)
self.assertEqual([], self.app.userdb.get_user('admin').repos)
def test_delete_with_slash(self):
self.app.userdb.get_user('admin').repos = ['/testcases']
self._delete(self.REPO, 'testcases')
self.assertStatus(303)
self.assertEqual([], self.app.userdb.get_user('admin').repos)
def test_delete_wrong_confirm(self):
"""
......@@ -60,6 +69,7 @@ class DeleteRepoTest(WebCase):
self._delete(self.REPO, 'wrong')
# TODO Make sure the repository is not delete
self.assertStatus(400)
self.assertEqual(['testcases'], self.app.userdb.get_user('admin').repos)
def test_delete_without_confirm(self):
"""
......@@ -68,6 +78,24 @@ class DeleteRepoTest(WebCase):
self._delete(self.REPO, None)
# TODO Make sure the repository is not delete
self.assertStatus(400)
self.assertEqual(['testcases'], self.app.userdb.get_user('admin').repos)
def test_as_another_user(self):
"""
From admin user delete anotehr user repo.
"""
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self._delete('anotheruser/testcases', 'testcases', redirect='/admin/repos/')
self.assertStatus(303)
location = self.assertHeader('Location')
self.assertTrue(location.endswith('/admin/repos/'))
# Check database update
self.assertEqual([], user_obj.repos)
if __name__ == "__main__":
......
......@@ -46,8 +46,18 @@ class RemoveOlderTest(WebCase):
self.getPage("/settings/" + repo + "/")
def _remove_older(self, repo, value):
self.getPage("/api/remove-older/" + repo + "/", method="POST",
body={'keepdays': value})
self.getPage("/settings/" + repo + "/", method="POST", body={'keepdays': value})
def test_page_api_set_remove_older(self):
"""
Check if /api/remove-older/ is still working.
"""
self.getPage("/api/remove-older/" + self.REPO + "/", method="POST", body={'keepdays': '4'})
self.assertStatus(200)
# Check results
user = self.app.userdb.get_user(self.USERNAME)
repo = user.get_repo(self.REPO)
self.assertEqual(4, repo.keepdays)
def test_page_set_keepdays(self):
"""
......@@ -92,6 +102,24 @@ class RemoveOlderTest(WebCase):
with self.assertRaises(AssertionError):
RemoveOlder(cherrypy.engine, self.app)._remove_older(user, repo, '30')
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self._remove_older('anotheruser/testcases', '1')
self.assertStatus('200 OK')
self.assertEquals(1, user_obj.get_repo('anotheruser/testcases').keepdays)
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self._remove_older('anotheruser/testcases', '2')
self.assertStatus('403 Forbidden')
class RemoveOlderTestWithMock(WebCase):
......
......@@ -40,13 +40,24 @@ class SetEncodingTest(WebCase):
self.getPage("/settings/" + repo + "/")
def _set_encoding(self, repo, encoding):
self.getPage("/api/set-encoding/" + repo + "/", method="POST",
self.getPage("/settings/" + repo + "/", method="POST",
body={'new_encoding': encoding})
def test_check_encoding(self):
self._settings(self.REPO)
self.assertInBody("Character encoding")
self.assertInBody('selected value="utf_8"')
self.assertInBody('selected value="utf-8"')
def test_api_set_encoding(self):
"""
Check if /api/set-encoding/ is still working.
"""
self.getPage("/api/set-encoding/" + self.REPO + "/", method="POST", body={'new_encoding': 'cp1252'})
self.assertStatus(200)
# Check results
user = self.app.userdb.get_user(self.USERNAME)
repo = user.get_repo(self.REPO)
self.assertEqual('cp1252', repo.encoding)
def test_set_encoding(self):
"""
......@@ -55,9 +66,22 @@ class SetEncodingTest(WebCase):
self._set_encoding(self.REPO, 'cp1252')
self.assertStatus(200)
self.assertInBody("Updated")
self.assertEquals('cp1252', self.app.userdb.get_user(self.USERNAME).get_repo(self.REPO).encoding)
# Get back encoding.
self._settings(self.REPO)
self.assertInBody('selected value="cp1252"')
def test_set_encoding_capital_case(self):
"""
Check to update the encoding with US-ASCII.
"""
self._set_encoding(self.REPO, 'US-ASCII')
self.assertStatus(200)
self.assertInBody("Updated")
self.assertEquals('ascii', self.app.userdb.get_user(self.USERNAME).get_repo(self.REPO).encoding)
# Get back encoding.
self._settings(self.REPO)
self.assertInBody('selected value="ascii"')
def test_set_encoding_invalid(self):
"""
......@@ -78,6 +102,25 @@ class SetEncodingTest(WebCase):
# Get back encoding.
self._settings(self.REPO)
self.assertInBody('selected value="cp1252"')
self.assertEquals('cp1252', self.app.userdb.get_user(self.USERNAME).get_repo(self.REPO).encoding)
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self._set_encoding('anotheruser/testcases', 'cp1252')
self.assertStatus('200 OK')
self.assertEquals('cp1252', user_obj.get_repo('anotheruser/testcases').encoding)
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self._set_encoding('anotheruser/testcases', 'utf-8')
self.assertStatus('403 Forbidden')
if __name__ == "__main__":
......
......@@ -24,9 +24,11 @@ Created on Aug 30, 2019
from __future__ import unicode_literals
import logging
from rdiffweb.test import WebCase
import os
import unittest
from rdiffweb.test import WebCase
class StatusTest(WebCase):
......@@ -51,11 +53,19 @@ class StatusTest(WebCase):
self.assertInBody('Show all')
self.assertInBody('Show errors only')
def test_page_failures(self):
def test_page_no_failures(self):
self._status(failures=True)
self.assertStatus(200)
self.assertInBody('There are no recent backups with errors.')
def test_page_with_failures(self):
# Write fake error in testcases repo
with open(os.path.join(self.app.testcases, 'testcases/rdiff-backup-data/error_log.2014-11-01T15:51:29-04:00.data'), 'w') as f:
f.write('This is a fake error message')
self._status(date=1414814400)
self.assertStatus(200)
self.assertInBody('This is a fake error message')
def test_page_date(self):
self._status(date=1454448640)
self.assertStatus(200)
......
......@@ -946,7 +946,7 @@ class RdiffRepo(object):
# Remove access to rdiff-backup-data directory.
if path.startswith(RDIFF_BACKUP_DATA):
raise AccessDeniedError(