Commit e79fb391 authored by Szilárd Pfeiffer's avatar Szilárd Pfeiffer
Browse files

Merge branch '23-ldap-related-messages'

Closes: #23
parents 7699ab19 4796242a
Loading
Loading
Loading
Loading
Loading
+206 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

import abc
import enum
import re
import six

import attr

import asn1crypto.core

from cryptoparser.common.exception import NotEnoughData, InvalidValue
from cryptoparser.common.parse import ParsableBase


class LDAPClass(enum.IntEnum):
    UNIVERSAL = 0
    APPLICATION = 1
    CONTEXT = 2


class LDAPResultCode(enum.IntEnum):
    SUCCESS = 0
    OPERATIONS_ERROR = 1
    PROTOCOL_ERROR = 2
    TIME_LIMIT_EXCEEDED = 3
    SIZE_LIMIT_EXCEEDED = 4
    COMPARE_FALSE = 5
    COMPARE_TRUE = 6
    AUTH_METHOD_NOT_SUPPORTED = 7
    STRONGER_AUTH_REQUIRED = 8
    REFERRAL = 10
    ADMIN_LIMIT_EXCEEDED = 11
    UNAVAILABLE_CRITICAL_EXTENSION = 12
    CONFIDENTIALITY_REQUIRED = 13
    SASL_BIND_IN_PROGRESS = 14
    NO_SUCH_ATTRIBUTE = 16
    UNDEFINED_ATTRIBUTE_TYPE = 17
    INAPPROPRIATE_MATCHING = 18
    CONSTRAINT_VIOLATION = 19
    ATTRIBUTE_OR_VALUE_EXISTS = 20
    INVALID_ATTRIBUTE_SYNTAX = 21
    NO_SUCH_OBJECT = 32
    ALIAS_PROBLEM = 33
    INVALID_DN_SYNTAX = 34
    ALIAS_DEREFERENCING_PROBLEM = 36
    INAPPROPRIATE_AUTHENTICATION = 48
    INVALID_CREDENTIALS = 49
    INSUFFICIENT_ACCESS_RIGHTS = 50
    BUSY = 51
    UNAVAILABLE = 52
    UNWILLING_TO_PERFORM = 53
    LOOP_DETECT = 54
    NAMING_VIOLATION = 64
    OBJECT_CLASS_VIOLATION = 65
    NOT_ALLOWED_ON_NON_LEAF = 66
    NOT_ALLOWED_ON_RDN = 67
    ENTRY_ALREADY_EXISTS = 68
    OBJECT_CLASS_MODS_PROHIBITED = 69
    AFFECTS_MULTIPLE_DSAS = 71
    OTHER = 80


class LDAPResultCodeEnum(asn1crypto.core.Enumerated):
    _map = {result_code.value: result_code for result_code in list(LDAPResultCode)}


class LDAPOID(asn1crypto.core.OctetString):
    pass


class LDAPControl(asn1crypto.core.Sequence):
    _fields = [
        ('controlType', LDAPOID),
        ('criticality', asn1crypto.core.Boolean, {'default': False}),
        ('controlValue', asn1crypto.core.OctetString, {'optional': True}),
    ]


class LDAPControls(asn1crypto.core.SequenceOf):
    _child_spec = LDAPControl


class LDAPExtendedRequest(asn1crypto.core.Sequence):
    _fields = [
        ('requestName', LDAPOID, {'implicit': (LDAPClass.CONTEXT.value, 0)}),
        ('requestValue', asn1crypto.core.OctetString, {'implicit': (LDAPClass.CONTEXT.value, 1), 'optional': True}),
    ]


class LDAPDN(asn1crypto.core.OctetString):
    pass


class LDAPString(asn1crypto.core.OctetString):
    pass


class LDAPURI(LDAPString):
    pass


class LDAPReferral(asn1crypto.core.SequenceOf):
    _child_spec = LDAPURI


class LDAPExtendedResponse(asn1crypto.core.Sequence):
    _fields = [
        ('resultCode', LDAPResultCodeEnum),
        ('matchedDN', LDAPDN),
        ('diagnosticMessage', LDAPString),
        ('referral', LDAPReferral,  {'implicit': (LDAPClass.CONTEXT.value, 3), 'optional': True}),
        ('responseName', LDAPOID, {'implicit': (LDAPClass.CONTEXT.value, 10), 'optional': True}),
        ('responseValue', asn1crypto.core.OctetString, {'implicit': (LDAPClass.CONTEXT.value, 11), 'optional': True}),
    ]


class LDAPProtocolOp(asn1crypto.core.Choice):
    _alternatives = [
        ('extendedReq', LDAPExtendedRequest, {'implicit': (LDAPClass.APPLICATION.value, 23)}),
        ('extendedResp', LDAPExtendedResponse, {'implicit': (LDAPClass.APPLICATION.value, 24)}),
    ]


class LDAPMessage(asn1crypto.core.Sequence):
    _fields = [
        ('messageID', asn1crypto.core.Integer),
        ('protocolOp', LDAPProtocolOp),
        ('controls', LDAPControls, {'implicit': (LDAPClass.CONTEXT.value, 0), 'optional': True}),
    ]


class LDAPMessageParsableBase(ParsableBase):
    HEADER_SIZE = 6

    _NOT_ENOUGH_DATA_REGEX = re.compile(
        r'Insufficient data - ([0-9]+) bytes requested but only ([0-9]+) available'
    )

    @classmethod
    @abc.abstractmethod
    def _parse(cls, parsable):
        raise NotImplementedError()

    @abc.abstractmethod
    def compose(self):
        raise NotImplementedError()

    @classmethod
    def _parse_asn1(cls, parsable):
        try:
            message = LDAPMessage.load(bytes(parsable))
            # ensure recursive parsing
            message.native  # pylint: disable=pointless-statement
            return message
        except ValueError as e:
            match = cls._NOT_ENOUGH_DATA_REGEX.match(e.args[0])
            if match:
                bytes_requested = int(match.group(1))
                bytes_available = int(match.group(2))
                six.raise_from(NotEnoughData(bytes_requested - bytes_available), e)
            else:
                six.raise_from(InvalidValue(parsable, cls), e)


class LDAPExtendedRequestStartTLS(LDAPMessageParsableBase):
    @classmethod
    def _parse(cls, parsable):
        asn1_message = cls._parse_asn1(parsable)

        return LDAPExtendedRequestStartTLS(), len(asn1_message.dump())

    def compose(self):
        return LDAPMessage({
            'messageID': 1,
            'protocolOp': {
                'extendedReq': {
                    'requestName': b'1.3.6.1.4.1.1466.20037'
                }
            }
        }).dump()


@attr.s
class LDAPExtendedResponseStartTLS(LDAPMessageParsableBase):
    result_code = attr.ib(validator=attr.validators.in_(LDAPResultCode))

    @classmethod
    def _parse(cls, parsable):
        asn1_message = cls._parse_asn1(parsable)

        return LDAPExtendedResponseStartTLS(
            asn1_message['protocolOp'].chosen['resultCode'].native
        ), len(asn1_message.dump())

    def compose(self):
        return LDAPMessage({
            'messageID': 1,
            'protocolOp': {
                'extendedResp': {
                    'resultCode': self.result_code.value,
                    'matchedDN': b'',
                    'diagnosticMessage': b''
                }
            }
        }).dump()
+1 −0
Original line number Diff line number Diff line
asn1crypto
attrs>=19.1
enum34==1.1.6;python_version<"3.4"
six

test/tls/test_ldap.py

0 → 100644
+105 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

import copy

import unittest
import collections

from cryptoparser.common.exception import InvalidValue, NotEnoughData

from cryptoparser.tls.ldap import (
    LDAPExtendedRequestStartTLS,
    LDAPExtendedResponseStartTLS,
    LDAPResultCode,
)


class TestLDAPExtendedRequest(unittest.TestCase):
    def setUp(self):
        self.ldap_extended_request_dict = collections.OrderedDict([
            ('message_sequence', b'\x30\x1d'),
            ('message_id', b'\x02\x01\x01'),
            ('protocol_op', b'\x77\x18'),
            ('extended_request', b'\x80'),
            ('request_name', (
                b'\x16\x31\x2e\x33\x2e\x36\x2e\x31\x2e\x34\x2e\x31\x2e\x31\x34\x36' +
                b'\x36\x2e\x32\x30\x30\x33\x37'
            )),
        ])
        self.ldap_extended_request_bytes = b''.join(self.ldap_extended_request_dict.values())

        self.ldap_extended_request = LDAPExtendedRequestStartTLS()

    def test_parse(self):
        LDAPExtendedRequestStartTLS.parse_exact_size(self.ldap_extended_request_bytes)

    def test_compose(self):
        self.assertEqual(self.ldap_extended_request.compose(), self.ldap_extended_request_bytes)


class TestLDAPExtendedResponseMinimal(unittest.TestCase):
    def setUp(self):
        self.ldap_extended_response_dict = collections.OrderedDict([
            ('message_sequence', b'\x30\x0c'),
            ('message_id', b'\x02\x01\x01'),
            ('protocol_op', b'\x78\x07'),
            ('extended_response', b''),
            ('result_code', b'\x0a\x01\x07'),
            ('matched_dn', b'\x04\x00'),
            ('diagnostic_message', b'\x04\x00'),
            ('referral', b''),
        ])
        self.ldap_extended_response_bytes = b''.join(self.ldap_extended_response_dict.values())

        self.ldap_extended_response = LDAPExtendedResponseStartTLS(LDAPResultCode.AUTH_METHOD_NOT_SUPPORTED)

    def test_error(self):
        with self.assertRaises(NotEnoughData) as context_manager:
            # pylint: disable=expression-not-assigned
            LDAPExtendedResponseStartTLS.parse_exact_size(
                    self.ldap_extended_response_bytes[:LDAPExtendedResponseStartTLS.HEADER_SIZE]
            )
        self.assertEqual(
            context_manager.exception.bytes_needed,
            len(self.ldap_extended_response_bytes) - LDAPExtendedResponseStartTLS.HEADER_SIZE
        )

        ldap_extended_response_dict = copy.copy(self.ldap_extended_response_dict)
        ldap_extended_response_dict['protocol_op'] = b'\xff\xff'
        ldap_extended_response_bytes = b''.join(ldap_extended_response_dict.values())

        with self.assertRaises(InvalidValue) as context_manager:
            # pylint: disable=expression-not-assigned
            LDAPExtendedResponseStartTLS.parse_exact_size(ldap_extended_response_bytes)

    def test_parse(self):
        ldap_extended_response = LDAPExtendedResponseStartTLS.parse_exact_size(self.ldap_extended_response_bytes)
        self.assertEqual(ldap_extended_response.result_code, self.ldap_extended_response.result_code)

    def test_compose(self):
        self.assertEqual(self.ldap_extended_response.compose(), self.ldap_extended_response_bytes)


class TestLDAPExtendedResponseFull(unittest.TestCase):
    def setUp(self):
        self.ldap_extended_response_dict = collections.OrderedDict([
            ('message_sequence', b'\x30\x34'),
            ('message_id', b'\x02\x01\x01'),
            ('protocol_op', b'\x78\x2f'),
            ('extended_response', b''),
            ('result_code', b'\x0a\x01\x07'),
            ('matched_dn', b'\x04\x08\x00\x01\x02\x03\x04\x05\x06\x07'),
            ('diagnostic_message', b'\x04\x08\x00\x01\x02\x03\x04\x05\x06\x07'),
            ('referral', b''),
            ('response_name', (
                b'\x8a\x16\x31\x2e\x33\x2e\x36\x2e\x31\x2e\x34\x2e\x31\x2e\x31\x34' +
                b'\x36\x36\x2e\x32\x30\x30\x33\x37'
            )),
        ])
        self.ldap_extended_response_bytes = b''.join(self.ldap_extended_response_dict.values())

        self.ldap_extended_response = LDAPExtendedResponseStartTLS(LDAPResultCode.AUTH_METHOD_NOT_SUPPORTED)

    def test_parse(self):
        ldap_extended_response = LDAPExtendedResponseStartTLS.parse_exact_size(self.ldap_extended_response_bytes)
        self.assertEqual(ldap_extended_response.result_code, self.ldap_extended_response.result_code)