Loading test/ike/test_dh.py +38 −29 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ Common test base for DH group analyzers. import typing import unittest import unittest.mock from test.common.classes import TestLoggerBase Loading Loading @@ -96,27 +97,35 @@ class TestAnalyzerDHBase(TestLoggerBase): self.assertNotIn('No response from server', log_output) threaded_server.join() def _run_test_no_proposal_chosen(self, protocol_version): threaded_server = create_ike_server( L7ServerIke, configuration=self.get_no_proposal_config(), max_handshake_count=self.get_max_handshakes(), timeout=self.get_server_timeout(), @staticmethod def _make_mock_client(): l7_client = L7ClientIPsecBase.from_scheme( 'ipsec', 'localhost', 65535, L4TransferSocketParams(timeout=0.1) ) l4_transfer = threaded_server.l7_server.l4_transfer assert l4_transfer is not None l7_client.l4_socket_params.throttle_delay = 0 l7_client.do_ikev1_handshake = unittest.mock.MagicMock() l7_client.do_ikev2_handshake = unittest.mock.MagicMock() return l7_client result = self._get_result( 'localhost', l4_transfer.bind_port, protocol_version, ip=l4_transfer.bind_address, ) def _run_test_no_proposal_chosen(self, protocol_version): l7_client = self._make_mock_client() if protocol_version == IsakmpVersion.V1: l7_client.do_ikev1_handshake.side_effect = IsakmpNotify(Ikev1NotifyType.NO_PROPOSAL_CHOSEN) else: l7_client.do_ikev2_handshake.side_effect = IsakmpNotify(Ikev2NotifyType.NO_PROPOSAL_CHOSEN) analyzer = self.get_analyzer_class()() result = analyzer.analyze(l7_client, protocol_version) self.assertEqual(len(result.groups), 0) self.assertGreater( l7_client.do_ikev1_handshake.call_count if protocol_version == IsakmpVersion.V1 else l7_client.do_ikev2_handshake.call_count, 0, 'Analyzer must probe at least one Diffie-Hellman group', ) log_output = '\n'.join(self.get_log_lines()) self.assertIn('No proposal chosen', log_output) threaded_server.join() def test_ikev1(self): self._run_test_success(IsakmpVersion.V1, self.get_expected_groups_ikev1()) Loading @@ -131,25 +140,25 @@ class TestAnalyzerDHBase(TestLoggerBase): self._run_test_no_proposal_chosen(IsakmpVersion.V2) def _run_test_no_response(self, protocol_version): threaded_server = create_ike_server( L7ServerIke, configuration=IkeServerConfiguration(response_mode=ServerResponseMode.NONE), ) l4_transfer = threaded_server.l7_server.l4_transfer assert l4_transfer is not None l7_client = self._make_mock_client() no_response = NetworkError(NetworkErrorType.NO_RESPONSE) if protocol_version == IsakmpVersion.V1: l7_client.do_ikev1_handshake.side_effect = no_response else: l7_client.do_ikev2_handshake.side_effect = no_response result = self._get_result( 'localhost', l4_transfer.bind_port, protocol_version, l4_socket_params=L4TransferSocketParams(timeout=0.5), ip=l4_transfer.bind_address, ) analyzer = self.get_analyzer_class()() result = analyzer.analyze(l7_client, protocol_version) self.assertEqual(len(result.groups), 0) self.assertGreater( l7_client.do_ikev1_handshake.call_count if protocol_version == IsakmpVersion.V1 else l7_client.do_ikev2_handshake.call_count, 0, 'Analyzer must probe at least one Diffie-Hellman group', ) log_output = '\n'.join(self.get_log_lines()) self.assertIn('No response', log_output) threaded_server.join() def test_ikev1_no_response(self): self._run_test_no_response(IsakmpVersion.V1) Loading Loading
test/ike/test_dh.py +38 −29 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ Common test base for DH group analyzers. import typing import unittest import unittest.mock from test.common.classes import TestLoggerBase Loading Loading @@ -96,27 +97,35 @@ class TestAnalyzerDHBase(TestLoggerBase): self.assertNotIn('No response from server', log_output) threaded_server.join() def _run_test_no_proposal_chosen(self, protocol_version): threaded_server = create_ike_server( L7ServerIke, configuration=self.get_no_proposal_config(), max_handshake_count=self.get_max_handshakes(), timeout=self.get_server_timeout(), @staticmethod def _make_mock_client(): l7_client = L7ClientIPsecBase.from_scheme( 'ipsec', 'localhost', 65535, L4TransferSocketParams(timeout=0.1) ) l4_transfer = threaded_server.l7_server.l4_transfer assert l4_transfer is not None l7_client.l4_socket_params.throttle_delay = 0 l7_client.do_ikev1_handshake = unittest.mock.MagicMock() l7_client.do_ikev2_handshake = unittest.mock.MagicMock() return l7_client result = self._get_result( 'localhost', l4_transfer.bind_port, protocol_version, ip=l4_transfer.bind_address, ) def _run_test_no_proposal_chosen(self, protocol_version): l7_client = self._make_mock_client() if protocol_version == IsakmpVersion.V1: l7_client.do_ikev1_handshake.side_effect = IsakmpNotify(Ikev1NotifyType.NO_PROPOSAL_CHOSEN) else: l7_client.do_ikev2_handshake.side_effect = IsakmpNotify(Ikev2NotifyType.NO_PROPOSAL_CHOSEN) analyzer = self.get_analyzer_class()() result = analyzer.analyze(l7_client, protocol_version) self.assertEqual(len(result.groups), 0) self.assertGreater( l7_client.do_ikev1_handshake.call_count if protocol_version == IsakmpVersion.V1 else l7_client.do_ikev2_handshake.call_count, 0, 'Analyzer must probe at least one Diffie-Hellman group', ) log_output = '\n'.join(self.get_log_lines()) self.assertIn('No proposal chosen', log_output) threaded_server.join() def test_ikev1(self): self._run_test_success(IsakmpVersion.V1, self.get_expected_groups_ikev1()) Loading @@ -131,25 +140,25 @@ class TestAnalyzerDHBase(TestLoggerBase): self._run_test_no_proposal_chosen(IsakmpVersion.V2) def _run_test_no_response(self, protocol_version): threaded_server = create_ike_server( L7ServerIke, configuration=IkeServerConfiguration(response_mode=ServerResponseMode.NONE), ) l4_transfer = threaded_server.l7_server.l4_transfer assert l4_transfer is not None l7_client = self._make_mock_client() no_response = NetworkError(NetworkErrorType.NO_RESPONSE) if protocol_version == IsakmpVersion.V1: l7_client.do_ikev1_handshake.side_effect = no_response else: l7_client.do_ikev2_handshake.side_effect = no_response result = self._get_result( 'localhost', l4_transfer.bind_port, protocol_version, l4_socket_params=L4TransferSocketParams(timeout=0.5), ip=l4_transfer.bind_address, ) analyzer = self.get_analyzer_class()() result = analyzer.analyze(l7_client, protocol_version) self.assertEqual(len(result.groups), 0) self.assertGreater( l7_client.do_ikev1_handshake.call_count if protocol_version == IsakmpVersion.V1 else l7_client.do_ikev2_handshake.call_count, 0, 'Analyzer must probe at least one Diffie-Hellman group', ) log_output = '\n'.join(self.get_log_lines()) self.assertIn('No response', log_output) threaded_server.join() def test_ikev1_no_response(self): self._run_test_no_response(IsakmpVersion.V1) Loading