Commit 67528a9d authored by Pascal Cadotte Michaud's avatar Pascal Cadotte Michaud

Merge remote-tracking branch 'origin/6405-multi-lines-hints'

parents 4bfa3efa 6a566be4
......@@ -85,13 +85,44 @@ def calluser_extension(session):
@daosession
def user_hints(session, context):
user_extensions = _list_user_extensions(session, context)
if not user_extensions:
return tuple()
user_arguments = _list_user_arguments(session, set(item.user_id for item in user_extensions))
hints = []
for user_id, extension in user_extensions:
argument = user_arguments.get(user_id)
if argument:
hints.append(Hint(user_id=user_id, extension=extension, argument=argument))
return tuple(hints)
def _list_user_extensions(session, context):
query = (session.query(UserFeatures.id.label('user_id'),
Extension.exten.label('extension'),
sql.case([
Extension.exten.label('extension'))
.distinct()
.join(UserLine.userfeatures)
.join(LineExtension,
UserLine.line_id == LineExtension.line_id)
.join(Extension,
LineExtension.extension_id == Extension.id)
.join(FuncKeyDestUser,
FuncKeyDestUser.user_id == UserFeatures.id)
.filter(UserLine.main_user == True)
.filter(LineExtension.main_extension == True)
.filter(Extension.context == context)
.filter(UserFeatures.enablehint == 1))
return query.all()
def _list_user_arguments(session, user_ids):
query = (session.query(UserFeatures.id.label('user_id'),
sql.func.string_agg(sql.case([
(LineFeatures.protocol == 'sip', literal_column("'SIP/'") + UserSIP.name),
(LineFeatures.protocol == 'sccp', literal_column("'SCCP/'") + SCCPLine.name),
(LineFeatures.protocol == 'custom', UserCustom.interface)
]).label('argument'))
]), literal_column("'&'")).label('argument'))
.join(UserLine.userfeatures)
.join(UserLine.linefeatures)
.outerjoin(UserSIP,
......@@ -106,23 +137,11 @@ def user_hints(session, context):
sql.and_(
LineFeatures.protocol == 'custom',
LineFeatures.protocolid == UserCustom.id))
.join(LineExtension,
UserLine.line_id == LineExtension.line_id)
.join(Extension,
LineExtension.extension_id == Extension.id)
.join(FuncKeyDestUser,
FuncKeyDestUser.user_id == UserFeatures.id)
.filter(UserFeatures.id.in_(user_ids))
.filter(UserLine.main_user == True)
.filter(UserLine.main_line == True)
.filter(LineExtension.main_extension == True)
.filter(LineFeatures.commented == 0)
.filter(Extension.context == context)
.filter(UserFeatures.enablehint == 1))
return tuple(Hint(user_id=row.user_id,
extension=row.extension,
argument=row.argument)
for row in query)
.group_by(UserFeatures.id))
return {row.user_id: row.argument for row in query}
@daosession
......
......@@ -15,7 +15,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
from hamcrest import assert_that, equal_to, contains
from hamcrest import assert_that, equal_to, contains, contains_inanyorder, any_of
from hamcrest.core.base_matcher import BaseMatcher
from hamcrest.core.helpers.wrap_matcher import wrap_matcher
from xivo_dao.tests.test_dao import DAOTestCase
from xivo_dao.alchemy.callfilter import Callfilter
......@@ -26,6 +28,31 @@ from xivo_dao.resources.func_key import hint_dao
from xivo_dao.resources.func_key.model import Hint
class HintMatcher(BaseMatcher):
def __init__(self, user_id, extension, argument_matcher):
self._user_id = user_id
self._extension = extension
self._argument_matcher = argument_matcher
def _matches(self, item):
return (item.user_id == self._user_id and
item.extension == self._extension and
self._argument_matcher.matches(item.argument))
def describe_to(self, description):
(description.append_text('hint with user_id ')
.append_description_of(self._user_id)
.append_text(' extension ')
.append_description_of(self._extension)
.append_text(' argument ')
.append_description_of(self._argument_matcher))
def a_hint(user_id, extension, argument):
return HintMatcher(user_id, extension, wrap_matcher(argument))
class TestProgfunckeyExtension(DAOTestCase):
def test_given_progfunc_key_extension_then_returns_cleaned_progfunckey(self):
......@@ -58,6 +85,7 @@ class TestHints(DAOTestCase, FuncKeyHelper):
super(TestHints, self).setUp()
self.setup_funckeys()
self.context = 'mycontext'
self.context2 = 'mycontext2'
def add_user_and_func_key(self, protocol='sip', protocol_id=None, exten='1000', commented=0, enablehint=1):
if not protocol_id:
......@@ -84,6 +112,12 @@ class TestHints(DAOTestCase, FuncKeyHelper):
main_extension=True)
return user_row
def add_sip_line_to_extension_and_user(self, name, user_id, extension_id, main_line=True):
endpoint = self.add_usersip(name=name, context=self.context)
line = self.add_line(context=self.context, protocol='sip', protocolid=endpoint.id)
self.add_user_line(user_id=user_id, line_id=line.id, main_user=True, main_line=main_line)
self.add_line_extension(line_id=line.id, extension_id=extension_id, main_extension=True)
class TestUserHints(TestHints):
......@@ -91,9 +125,7 @@ class TestUserHints(TestHints):
usersip_row = self.add_usersip(name='abcdef')
user_row = self.add_user_and_func_key('sip', usersip_row.id)
expected = Hint(user_id=user_row.id,
extension='1000',
argument='SIP/abcdef')
expected = a_hint(user_id=user_row.id, extension='1000', argument='SIP/abcdef')
assert_that(hint_dao.user_hints(self.context), contains(expected))
......@@ -101,9 +133,7 @@ class TestUserHints(TestHints):
sccpline_row = self.add_sccpline(name='1001', context=self.context)
user_row = self.add_user_and_func_key('sccp', sccpline_row.id, '1001')
expected = Hint(user_id=user_row.id,
extension='1001',
argument='SCCP/1001')
expected = a_hint(user_id=user_row.id, extension='1001', argument='SCCP/1001')
assert_that(hint_dao.user_hints(self.context), contains(expected))
......@@ -111,9 +141,7 @@ class TestUserHints(TestHints):
custom_row = self.add_usercustom(interface='ghijkl', context=self.context)
user_row = self.add_user_and_func_key('custom', custom_row.id, '1002')
expected = Hint(user_id=user_row.id,
extension='1002',
argument='ghijkl')
expected = a_hint(user_id=user_row.id, extension='1002', argument='ghijkl')
assert_that(hint_dao.user_hints(self.context), contains(expected))
......@@ -136,11 +164,69 @@ class TestUserHints(TestHints):
user1 = self.add_user_and_func_key('sip', self.add_usersip(name='user1').id, '1001')
user2 = self.add_user_and_func_key('sip', self.add_usersip(name='user2').id, '1002')
expected = (Hint(user_id=user1.id, extension='1001', argument='SIP/user1'),
Hint(user_id=user2.id, extension='1002', argument='SIP/user2'))
expected = [a_hint(user_id=user1.id, extension='1001', argument='SIP/user1'),
a_hint(user_id=user2.id, extension='1002', argument='SIP/user2')]
assert_that(hint_dao.user_hints(self.context), equal_to(expected))
assert_that(hint_dao.user_hints(self.context), contains_inanyorder(*expected))
def test_given_one_user_two_lines_one_extension_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension = self.add_extension(exten='1001', context=self.context)
self.add_user_destination(user.id)
self.add_sip_line_to_extension_and_user('line1', user.id, extension.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension.id, main_line=False)
expected = a_hint(user_id=user.id, extension='1001',
argument=any_of('SIP/line1&SIP/line2', 'SIP/line2&SIP/line1'))
assert_that(hint_dao.user_hints(self.context), contains(expected))
def test_given_one_user_two_lines_two_extensions_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension1 = self.add_extension(exten='1001', context=self.context)
extension2 = self.add_extension(exten='1002', context=self.context)
self.add_user_destination(user.id)
self.add_sip_line_to_extension_and_user('line1', user.id, extension1.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
expected_argument = any_of('SIP/line1&SIP/line2', 'SIP/line2&SIP/line1')
expected = [a_hint(user_id=user.id, extension='1001', argument=expected_argument),
a_hint(user_id=user.id, extension='1002', argument=expected_argument)]
assert_that(hint_dao.user_hints(self.context), contains_inanyorder(*expected))
def test_given_one_user_two_lines_two_extensions_two_contexts_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension1 = self.add_extension(exten='1001', context=self.context)
extension2 = self.add_extension(exten='1002', context=self.context2)
self.add_user_destination(user.id)
self.add_sip_line_to_extension_and_user('line1', user.id, extension1.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
expected = a_hint(user_id=user.id, extension='1001',
argument=any_of('SIP/line1&SIP/line2', 'SIP/line2&SIP/line1'))
assert_that(hint_dao.user_hints(self.context), contains(expected))
def test_given_one_user_three_lines_two_extensions_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension1 = self.add_extension(exten='1001', context=self.context)
extension2 = self.add_extension(exten='1002', context=self.context)
self.add_user_destination(user.id)
self.add_sip_line_to_extension_and_user('line1', user.id, extension1.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
self.add_sip_line_to_extension_and_user('line3', user.id, extension2.id, main_line=False)
expected_argument = any_of('SIP/line1&SIP/line2&SIP/line3',
'SIP/line1&SIP/line3&SIP/line2',
'SIP/line2&SIP/line1&SIP/line3',
'SIP/line2&SIP/line3&SIP/line1',
'SIP/line3&SIP/line1&SIP/line2',
'SIP/line3&SIP/line2&SIP/line1')
expected = [a_hint(user_id=user.id, extension='1001', argument=expected_argument),
a_hint(user_id=user.id, extension='1002', argument=expected_argument)]
assert_that(hint_dao.user_hints(self.context), contains_inanyorder(*expected))
class TestConferenceHints(TestHints):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment