Commit 9bb84328 authored by Pascal Cadotte Michaud's avatar Pascal Cadotte Michaud Committed by Etienne Lessard

ctidirectories: replace the uri with directory_id

the uri field is a duplicate of the directory uri or the name of a
ldapfilter prepended with a ldapfilter://.

Directories can now be of type ldapfilter so I'm merging the two kind of
cti directories togeter with a foreign key to the directories id.
parent 0a7d4b51
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2015 Avencall
# Copyright (C) 2012-2016 Avencall
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
from sqlalchemy import ForeignKey
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, Text, String
......@@ -27,9 +28,9 @@ class CtiDirectories(Base):
id = Column(Integer, primary_key=True)
name = Column(String(255))
uri = Column(String(255))
delimiter = Column(String(20))
match_direct = Column(Text, nullable=False)
match_reverse = Column(Text, nullable=False, default='')
description = Column(String(255))
deletable = Column(Integer)
directory_id = Column(Integer, ForeignKey('directories.id', ondelete='CASCADE'))
# -*- coding: utf-8 -*-
# Copyright (C) 2012-2015 Avencall
# Copyright (C) 2012-2016 Avencall
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -56,7 +56,6 @@ def get_profile_configuration(session):
Directories.dirtype,
).join(
Directories,
Directories.uri == CtiDirectories.uri
).filter(CtiDirectories.name.in_(sources))
name_to_type = {row.name: row.dirtype for row in rows.all()}
else:
......
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Avencall
# Copyright (C) 2015-2016 Avencall
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -33,7 +33,6 @@ def get_config(session):
Directories.dirtype,
).join(
Directories,
Directories.uri == CtiDirectories.uri
).filter(CtiDirectories.name.in_(sources))
name_to_type = {row.name: row.dirtype for row in rows.all()}
else:
......
......@@ -50,29 +50,30 @@ def get_all_sources(session):
def _get_ldap_sources(session):
ldap_cti_directories = session.query(
Directories.ldapfilter_id,
CtiDirectories.name,
CtiDirectories.uri,
CtiDirectories.match_direct,
CtiDirectories.match_reverse,
func.array_agg(CtiDirectoryFields.fieldname).label('fields'),
func.array_agg(CtiDirectoryFields.value).label('values'),
).outerjoin(
).join(
CtiDirectories,
).join(
CtiDirectoryFields,
CtiDirectoryFields.dir_id == CtiDirectories.id
).filter(
CtiDirectories.uri.like('ldapfilter://%%')
Directories.dirtype == 'ldapfilter',
).group_by(
Directories.ldapfilter_id,
CtiDirectories.name,
CtiDirectories.uri,
CtiDirectories.match_direct,
CtiDirectories.match_reverse,
)
source_configs = []
for dir in ldap_cti_directories.all():
_, _, name = dir.uri.partition('ldapfilter://')
try:
ldap_config = ldap_dao.build_ldapinfo_from_ldapfilter(name)
ldap_config = ldap_dao.build_ldapinfo_from_ldapfilter(dir.ldapfilter_id)
except LookupError:
logger.warning('Skipping LDAP source %s', dir.name)
continue
......@@ -98,7 +99,6 @@ def _get_ldap_sources(session):
def _get_nonldap_sources(session):
sources = session.query(
CtiDirectories.name,
CtiDirectories.uri,
Directories.dirtype,
Directories.xivo_username,
Directories.xivo_password,
......@@ -106,6 +106,7 @@ def _get_nonldap_sources(session):
Directories.xivo_custom_ca_path,
Directories.dird_tenant,
Directories.dird_phonebook,
Directories.uri,
CtiDirectories.delimiter,
CtiDirectories.match_direct,
CtiDirectories.match_reverse,
......@@ -113,13 +114,13 @@ def _get_nonldap_sources(session):
func.array_agg(CtiDirectoryFields.value).label('values'),
).join(
Directories,
Directories.uri == CtiDirectories.uri
).outerjoin(
CtiDirectoryFields,
CtiDirectoryFields.dir_id == CtiDirectories.id
).filter(
Directories.dirtype != 'ldapfilter',
).group_by(
CtiDirectories.name,
CtiDirectories.uri,
Directories.dirtype,
Directories.xivo_username,
Directories.xivo_password,
......@@ -127,6 +128,7 @@ def _get_nonldap_sources(session):
Directories.xivo_custom_ca_path,
Directories.dird_tenant,
Directories.dird_phonebook,
Directories.uri,
CtiDirectories.delimiter,
CtiDirectories.match_direct,
CtiDirectories.match_reverse,
......
......@@ -21,7 +21,7 @@ from xivo_dao.alchemy.ldapserver import LdapServer
@daosession
def build_ldapinfo_from_ldapfilter(session, ldap_filter_name):
def build_ldapinfo_from_ldapfilter(session, ldapfilter_id):
ldap_config = session.query(
LdapFilter.name,
LdapFilter.user,
......@@ -34,13 +34,13 @@ def build_ldapinfo_from_ldapfilter(session, ldap_filter_name):
LdapServer,
LdapServer.id == LdapFilter.ldapserverid
).filter(
LdapFilter.name == ldap_filter_name,
LdapFilter.id == ldapfilter_id,
LdapFilter.commented == 0,
LdapServer.disable == 0,
).first()
if not ldap_config:
raise LookupError('No ldap config matching filter %s', ldap_filter_name)
raise LookupError('No ldap config matching filter %s', ldapfilter_id)
ssl = ldap_config.securitylayer == 'ssl'
host = ldap_config.host or 'localhost'
......
......@@ -609,8 +609,8 @@ class DAOTestCase(unittest.TestCase):
self.add_me(directory)
cti_directory = CtiDirectories(name=directory_args['name'],
uri=directory_args['uri'],
match_direct='')
match_direct='',
directory_id=directory.id)
self.add_me(cti_directory)
def add_trunk(self, **kwargs):
......
......@@ -56,18 +56,30 @@ class TestDirectoryLdapSources(DAOTestCase):
filter='l=Québec',
)
self.add_me(ldap_filter_2)
directory_1 = Directories(
name='ldap_1',
dirtype='ldapfilter',
ldapfilter_id=ldap_filter_1.id,
)
self.add_me(directory_1)
directory_2 = Directories(
name='ldap_2',
dirtype='ldapfilter',
ldapfilter_id=ldap_filter_2.id,
)
self.add_me(directory_2)
self.cti_directory_1 = CtiDirectories(
name='ldapdirectory_1',
uri='ldapfilter://{}'.format(ldap_filter_1.name),
match_direct='["cn"]',
match_reverse='["telephoneNumber"]',
directory_id=directory_1.id,
)
self.add_me(self.cti_directory_1)
self.cti_directory_2 = CtiDirectories(
name='ldapdirectory_2',
uri='ldapfilter://{}'.format(ldap_filter_2.name),
match_direct='["cn"]',
match_reverse='["telephoneNumber"]',
directory_id=directory_2.id,
)
self.add_me(self.cti_directory_2)
fields = {'firstname': '{givenName}',
......@@ -122,9 +134,9 @@ class TestDirectoryLdapSources(DAOTestCase):
def test_that_a_missing_ldap_config_does_not_break_get_all_sources(self):
directory_with_no_matching_config = CtiDirectories(
name='brokenldap',
uri='ldapfilter://missingldapconfig',
match_direct='["cn"]',
match_reverse='["telephoneNumber"]',
directory_id=None,
)
self.add_me(directory_with_no_matching_config)
......@@ -168,26 +180,28 @@ class TestDirectoryNonLdapSources(DAOTestCase):
{'uri': 'file:///tmp/test.csv', 'dirtype': 'file', 'name': 'my_csv'},
{'uri': 'postgresql://', 'dirtype': 'dird_phonebook', 'name': 'dird', 'dird_tenant': 'tenant', 'dird_phonebook': 'thephonebook'},
]
d1, d2, _, d4, d5 = directories = [Directories(**config) for config in self.directory_configs]
self.add_me_all(directories)
self.cti_directory_configs = [
{'id': 1,
'name': 'Internal',
'uri': 'http://localhost:9487',
'directory_id': d1.id,
'match_direct': '["firstname", "lastname"]',
'match_reverse': '["exten"]'},
{'id': 2,
'name': 'mtl',
'uri': 'http://mtl.lan.example.com:9487',
'directory_id': d2.id,
'match_direct': '',
'match_reverse': '[]'},
{'id': 3,
'name': 'acsvfile',
'uri': 'file:///tmp/test.csv',
'directory_id': d4.id,
'match_direct': '["firstname", "lastname"]',
'match_reverse': '["exten"]',
'delimiter': '|'},
{'id': 4,
'name': 'mydirdphonebook',
'uri': 'postgresql://',
'directory_id': d5.id,
'match_direct': '',
'match_reverse': '[]'},
]
......@@ -278,10 +292,9 @@ class TestDirectoryNonLdapSources(DAOTestCase):
}
def test_get_all_sources(self):
directories = [Directories(**config) for config in self.directory_configs]
cti_directories = [CtiDirectories(**config) for config in self.cti_directory_configs]
cti_directory_fields = [CtiDirectoryFields(**config) for config in self.cti_directory_fields_configs]
self.add_me_all(chain(directories, cti_directories, cti_directory_fields))
self.add_me_all(chain(cti_directories, cti_directory_fields))
result = directory_dao.get_all_sources()
......@@ -291,10 +304,9 @@ class TestDirectoryNonLdapSources(DAOTestCase):
self.expected_result_4))
def test_get_all_sources_no_fields(self):
directories = [Directories(**config) for config in self.directory_configs]
cti_directories = [CtiDirectories(**config) for config in self.cti_directory_configs[:-1]]
cti_directory_fields = [CtiDirectoryFields(**config) for config in self.cti_directory_fields_configs]
self.add_me_all(chain(directories, cti_directories, cti_directory_fields[2:]))
self.add_me_all(chain(cti_directories, cti_directory_fields[2:]))
result = directory_dao.get_all_sources()
......@@ -304,6 +316,9 @@ class TestDirectoryNonLdapSources(DAOTestCase):
assert_that(result, contains_inanyorder(*expected))
class TestDirectoryNoSources(DAOTestCase):
def test_get_all_sources_no_directories(self):
results = directory_dao.get_all_sources()
......
......@@ -27,28 +27,28 @@ from xivo_dao.tests.test_dao import DAOTestCase
class TestLdapDAO(DAOTestCase):
def test_build_ldapinfo_from_ldapfilter_not_found(self):
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, 'unknown')
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, 42)
def test_build_ldapinfo_from_ldapfilter_disabled_filter(self):
filter_name = 'filtername'
ldap_server = self._insert_ldapserver(name='ldapserver_test')
ldap_filter = self._insert_ldapfilter(ldap_server.id, name=filter_name, commented=1)
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, ldap_filter.name)
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, ldap_filter.id)
def test_build_ldapinfo_from_ldapfilter_disabled_server(self):
filter_name = 'filtername'
ldap_server = self._insert_ldapserver(name='ldapserver_test', disable=1)
ldap_filter = self._insert_ldapfilter(ldap_server.id, name=filter_name)
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, ldap_filter.name)
self.assertRaises(LookupError, ldap_dao.build_ldapinfo_from_ldapfilter, ldap_filter.id)
def test_build_ldapinfo_from_ldapfilter_minimum_fields(self):
filter_name = 'filtername'
ldap_server = self._insert_ldapserver(name='ldapserver_test')
ldap_filter = self._insert_ldapfilter(ldap_server.id, name=filter_name)
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.name)
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.id)
assert_that(result, has_entries({
'username': '',
......@@ -66,7 +66,7 @@ class TestLdapDAO(DAOTestCase):
ldap_server = self._insert_ldapserver(name='ldapserver_test', securitylayer='ssl', port=636)
ldap_filter = self._insert_ldapfilter(ldap_server.id, name=filter_name)
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.name)
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.id)
assert_that(result, has_entries({
'username': '',
......@@ -90,7 +90,7 @@ class TestLdapDAO(DAOTestCase):
basedn='cn=User,dc=company,dc=com',
filter='sn=*')
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.name)
result = ldap_dao.build_ldapinfo_from_ldapfilter(ldap_filter.id)
assert_that(result, has_entries({
'username': ldap_filter.user,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment