Commit 6a566be4 authored by Etienne Lessard's avatar Etienne Lessard

hint_dao: handle users with multiple lines (2/2)

Now, all the extensions referencing a same user will share a similar
hint definition (hint "argument"), whatever the user's lines share the
same extension or not or whatever the line's extension are defined in
the same context or not. This should make hints for user with multiple
lines more predictable.
parent ca2627d6
......@@ -85,8 +85,39 @@ def calluser_extension(session):
@daosession
def user_hints(session, context):
user_extensions = _list_user_extensions(session, context)
if not user_extensions:
return tuple()
user_arguments = _list_user_arguments(session, set(item.user_id for item in user_extensions))
hints = []
for user_id, extension in user_extensions:
argument = user_arguments.get(user_id)
if argument:
hints.append(Hint(user_id=user_id, extension=extension, argument=argument))
return tuple(hints)
def _list_user_extensions(session, context):
query = (session.query(UserFeatures.id.label('user_id'),
Extension.exten.label('extension'))
.distinct()
.join(UserLine.userfeatures)
.join(LineExtension,
UserLine.line_id == LineExtension.line_id)
.join(Extension,
LineExtension.extension_id == Extension.id)
.join(FuncKeyDestUser,
FuncKeyDestUser.user_id == UserFeatures.id)
.filter(UserLine.main_user == True)
.filter(LineExtension.main_extension == True)
.filter(Extension.context == context)
.filter(UserFeatures.enablehint == 1))
return query.all()
def _list_user_arguments(session, user_ids):
query = (session.query(UserFeatures.id.label('user_id'),
Extension.exten.label('extension'),
sql.func.string_agg(sql.case([
(LineFeatures.protocol == 'sip', literal_column("'SIP/'") + UserSIP.name),
(LineFeatures.protocol == 'sccp', literal_column("'SCCP/'") + SCCPLine.name),
......@@ -106,23 +137,11 @@ def user_hints(session, context):
sql.and_(
LineFeatures.protocol == 'custom',
LineFeatures.protocolid == UserCustom.id))
.join(LineExtension,
UserLine.line_id == LineExtension.line_id)
.join(Extension,
LineExtension.extension_id == Extension.id)
.join(FuncKeyDestUser,
FuncKeyDestUser.user_id == UserFeatures.id)
.filter(UserFeatures.id.in_(user_ids))
.filter(UserLine.main_user == True)
.filter(LineExtension.main_extension == True)
.filter(LineFeatures.commented == 0)
.filter(Extension.context == context)
.filter(UserFeatures.enablehint == 1)
.group_by(UserFeatures.id, Extension.exten))
return tuple(Hint(user_id=row.user_id,
extension=row.extension,
argument=row.argument)
for row in query)
.group_by(UserFeatures.id))
return {row.user_id: row.argument for row in query}
@daosession
......
......@@ -85,6 +85,7 @@ class TestHints(DAOTestCase, FuncKeyHelper):
super(TestHints, self).setUp()
self.setup_funckeys()
self.context = 'mycontext'
self.context2 = 'mycontext2'
def add_user_and_func_key(self, protocol='sip', protocol_id=None, exten='1000', commented=0, enablehint=1):
if not protocol_id:
......@@ -188,12 +189,26 @@ class TestUserHints(TestHints):
self.add_sip_line_to_extension_and_user('line1', user.id, extension1.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
expected = [a_hint(user_id=user.id, extension='1001', argument='SIP/line1'),
a_hint(user_id=user.id, extension='1002', argument='SIP/line2')]
expected_argument = any_of('SIP/line1&SIP/line2', 'SIP/line2&SIP/line1')
expected = [a_hint(user_id=user.id, extension='1001', argument=expected_argument),
a_hint(user_id=user.id, extension='1002', argument=expected_argument)]
assert_that(hint_dao.user_hints(self.context), contains_inanyorder(*expected))
def test_given_one_user_three_lines_two_extension_then_returns_user_hint(self):
def test_given_one_user_two_lines_two_extensions_two_contexts_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension1 = self.add_extension(exten='1001', context=self.context)
extension2 = self.add_extension(exten='1002', context=self.context2)
self.add_user_destination(user.id)
self.add_sip_line_to_extension_and_user('line1', user.id, extension1.id)
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
expected = a_hint(user_id=user.id, extension='1001',
argument=any_of('SIP/line1&SIP/line2', 'SIP/line2&SIP/line1'))
assert_that(hint_dao.user_hints(self.context), contains(expected))
def test_given_one_user_three_lines_two_extensions_then_returns_user_hint(self):
user = self.add_user(enablehint=1)
extension1 = self.add_extension(exten='1001', context=self.context)
extension2 = self.add_extension(exten='1002', context=self.context)
......@@ -202,9 +217,14 @@ class TestUserHints(TestHints):
self.add_sip_line_to_extension_and_user('line2', user.id, extension2.id, main_line=False)
self.add_sip_line_to_extension_and_user('line3', user.id, extension2.id, main_line=False)
expected = [a_hint(user_id=user.id, extension='1001', argument='SIP/line1'),
a_hint(user_id=user.id, extension='1002',
argument=any_of('SIP/line2&SIP/line3', 'SIP/line3&SIP/line2'))]
expected_argument = any_of('SIP/line1&SIP/line2&SIP/line3',
'SIP/line1&SIP/line3&SIP/line2',
'SIP/line2&SIP/line1&SIP/line3',
'SIP/line2&SIP/line3&SIP/line1',
'SIP/line3&SIP/line1&SIP/line2',
'SIP/line3&SIP/line2&SIP/line1')
expected = [a_hint(user_id=user.id, extension='1001', argument=expected_argument),
a_hint(user_id=user.id, extension='1002', argument=expected_argument)]
assert_that(hint_dao.user_hints(self.context), contains_inanyorder(*expected))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment