Commit ded3a832 authored by Szilárd Pfeiffer's avatar Szilárd Pfeiffer
Browse files

Merge branch '94-tls-1-3-certificate-messages'

Closes: #94
parents 748f89ad b5bebfc1
Loading
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -2,6 +2,19 @@
Changelog
=========

-------------------
1.3.0 - Unreleased
-------------------

Features
========

-  TLS (``tls``)

   -  Generic

      -  protocol item classes for certofocate messages in version 1.3 (#94)

------------------
1.2.1 - 2026-06-02
------------------
+123 −4
Original line number Diff line number Diff line
@@ -209,6 +209,7 @@ class TlsHandshakeType(enum.IntEnum):
    HELLO_VERIFY_REQUEST = 0x03
    NEW_SESSION_TICKET = 0x04
    HELLO_RETRY_REQUEST = 0x06
    ENCRYPTED_EXTENSIONS = 0x08
    CERTIFICATE = 0x0b
    SERVER_KEY_EXCHANGE = 0x0c
    CERTIFICATE_REQUEST = 0x0d
@@ -608,6 +609,11 @@ class TlsHandshakeServerHello(TlsHandshakeHello):
        return header_bytes + payload_composer.composed_bytes + extension_bytes


class TlsCertificateType(enum.IntEnum):
    X509 = 0
    RAW_PUBLIC_KEY = 2


@attr.s
class TlsCertificate(ParsableBase):
    certificate = attr.ib(validator=attr.validators.instance_of(bytes))
@@ -639,7 +645,7 @@ class TlsCertificates(VectorParsable):


@attr.s
class TlsHandshakeCertificate(TlsHandshakeMessage):
class TlsHandshakeServerCertificate(TlsHandshakeMessage):
    certificate_chain = attr.ib(validator=attr.validators.instance_of(TlsCertificates))

    @classmethod
@@ -654,7 +660,7 @@ class TlsHandshakeCertificate(TlsHandshakeMessage):

        parser.parse_parsable('certificates', TlsCertificates)

        return TlsHandshakeCertificate(
        return TlsHandshakeServerCertificate(
            parser['certificates']
        ), handshake_header_parser.parsed_length

@@ -667,6 +673,94 @@ class TlsHandshakeCertificate(TlsHandshakeMessage):
        return header_bytes + payload_composer.composed_bytes


@attr.s
class TlsCertificateEntry(ParsableBase):
    certificate = attr.ib(validator=attr.validators.instance_of(TlsCertificate))
    extensions = attr.ib(validator=attr.validators.instance_of((bytes, bytearray)))

    def __attrs_post_init__(self):
        if len(self.extensions) > 2 ** 16 - 1:
            raise InvalidValue(len(self.extensions), self.__class__, 'extensions')

    @classmethod
    def _parse(cls, parsable):
        parser = ParserBinary(parsable)

        parser.parse_parsable('certificate', TlsCertificate)
        parser.parse_bytes('extensions', 2)

        return TlsCertificateEntry(
            parser['certificate'],
            bytes(parser['extensions']),
        ), parser.parsed_length

    def compose(self):
        composer = ComposerBinary()

        composer.compose_parsable(self.certificate)
        composer.compose_bytes(self.extensions, 2)

        return composer.composed_bytes


class TlsCertificateEntryVector(VectorParsable):
    @classmethod
    def get_param(cls):
        return VectorParamParsable(
            item_class=TlsCertificateEntry,
            fallback_class=None,
            min_byte_num=0,
            max_byte_num=2 ** 24 - 1,
        )


@attr.s
class TlsHandshakeCertificate(TlsHandshakeMessage):

    certificate_request_context = attr.ib(validator=attr.validators.instance_of((bytes, bytearray)))
    certificate_entries = attr.ib(validator=attr.validators.instance_of(TlsCertificateEntryVector))

    def __attrs_post_init__(self):
        if len(self.certificate_request_context) > 255:
            raise InvalidValue(len(self.certificate_request_context), self.__class__, 'certificate_request_context')

    @classmethod
    def get_handshake_type(cls):
        return TlsHandshakeType.CERTIFICATE

    @classmethod
    def _parse(cls, parsable):
        handshake_header_parser = cls._parse_handshake_header(parsable)

        parser = ParserBinary(handshake_header_parser['payload'])

        parser.parse_numeric('certificate_request_context_length', 1)
        parser.parse_raw(
            'certificate_request_context',
            parser['certificate_request_context_length'],
        )
        try:
            parser.parse_parsable('certificate_entries', TlsCertificateEntryVector)
        except (NotEnoughData, InvalidValue) as e:
            raise InvalidType() from e

        return TlsHandshakeCertificate(
            bytes(parser['certificate_request_context']),
            parser['certificate_entries'],
        ), handshake_header_parser.parsed_length

    def compose(self):
        payload_composer = ComposerBinary()

        payload_composer.compose_numeric(len(self.certificate_request_context), 1)
        payload_composer.compose_raw(self.certificate_request_context)
        payload_composer.compose_parsable(self.certificate_entries)

        header_bytes = self._compose_header(payload_composer.composed_length)

        return header_bytes + payload_composer.composed_bytes


class TlsHandshakeCertificateStatus(TlsHandshakeMessage):
    def __init__(self, status_type, status):
        super().__init__()
@@ -924,6 +1018,31 @@ class TlsHandshakeHelloRetryRequest(TlsHandshakeHello):
        return header_bytes + payload_composer.composed_bytes + extension_bytes


@attr.s
class TlsHandshakeEncryptedExtensions(TlsHandshakeMessage):
    extension_data = attr.ib(validator=attr.validators.instance_of((bytes, bytearray)))

    @classmethod
    def get_handshake_type(cls):
        return TlsHandshakeType.ENCRYPTED_EXTENSIONS

    @classmethod
    def _parse(cls, parsable):
        parser = cls._parse_handshake_header(parsable)

        return TlsHandshakeEncryptedExtensions(
            parser['payload'],
        ), parser.parsed_length

    def compose(self):
        payload_composer = ComposerBinary()
        payload_composer.compose_raw(self.extension_data)

        header_bytes = self._compose_header(payload_composer.composed_length)

        return header_bytes + payload_composer.composed_bytes


class SslMessageBase(ParsableBase):
    @classmethod
    def get_message_type(cls):
@@ -1099,7 +1218,8 @@ class TlsHandshakeMessageVariant(VariantParsable):
    _VARIANTS = collections.OrderedDict([
        (TlsHandshakeType.CLIENT_HELLO, [TlsHandshakeClientHello, ]),
        (TlsHandshakeType.SERVER_HELLO, [TlsHandshakeServerHello, ]),
        (TlsHandshakeType.CERTIFICATE, [TlsHandshakeCertificate, ]),
        (TlsHandshakeType.ENCRYPTED_EXTENSIONS, [TlsHandshakeEncryptedExtensions, ]),
        (TlsHandshakeType.CERTIFICATE, [TlsHandshakeCertificate, TlsHandshakeServerCertificate, ]),
        (TlsHandshakeType.SERVER_KEY_EXCHANGE, [TlsHandshakeServerKeyExchange, ]),
        (TlsHandshakeType.CERTIFICATE_REQUEST, [TlsHandshakeCertificateRequest, ]),
        (TlsHandshakeType.CERTIFICATE_STATUS, [TlsHandshakeCertificateStatus, ]),
@@ -1117,7 +1237,6 @@ class TlsSubprotocolMessageParser(SubprotocolParser):
        TlsContentType.CHANGE_CIPHER_SPEC: TlsChangeCipherSpecMessage,
        TlsContentType.ALERT: TlsAlertMessage,
        TlsContentType.HANDSHAKE: TlsHandshakeMessageVariant,
        TlsContentType.APPLICATION_DATA: TlsApplicationDataMessage,
    }

    @classmethod
Compare a294e644 to 1a57b0ed
Original line number Diff line number Diff line
Subproject commit a294e644377dd9378c274991834cf4ebf0f4a828
Subproject commit 1a57b0ed6a09e35f1c05a6c01fab0b7f14164112
+102 −3
Original line number Diff line number Diff line
@@ -38,6 +38,8 @@ from cryptoparser.tls.subprotocol import (
    SslMessageType,
    TlsAlertMessage,
    TlsCertificate,
    TlsCertificateEntry,
    TlsCertificateEntryVector,
    TlsCertificateStatusType,
    TlsCertificates,
    TlsClientCertificateType,
@@ -50,9 +52,11 @@ from cryptoparser.tls.subprotocol import (
    TlsExtensionsClient,
    TlsExtensionsServer,
    TlsHandshakeCertificate,
    TlsHandshakeServerCertificate,
    TlsHandshakeCertificateRequest,
    TlsHandshakeCertificateStatus,
    TlsHandshakeClientHello,
    TlsHandshakeEncryptedExtensions,
    TlsHandshakeHelloRandom,
    TlsHandshakeHelloRandomBytes,
    TlsHandshakeHelloRetryRequest,
@@ -535,7 +539,7 @@ class TestTlsHandshakeHelloRetryRequest(unittest.TestCase):
        )


class TestTlsHandshakeCertificate(unittest.TestCase):
class TestTlsHandshakeServerCertificate(unittest.TestCase):
    def setUp(self):
        self.certificate_minimal_dict = collections.OrderedDict([
            ('handshake_type', b'\x0b'),                  # CERTIFICATE
@@ -548,7 +552,7 @@ class TestTlsHandshakeCertificate(unittest.TestCase):
        ])
        self.certificate_minimal_bytes = b''.join(self.certificate_minimal_dict.values())

        self.certificate_minimal = TlsHandshakeCertificate(
        self.certificate_minimal = TlsHandshakeServerCertificate(
            TlsCertificates([
                TlsCertificate(b'peer certificate'),
                TlsCertificate(b'intermediate certificate'),
@@ -556,7 +560,7 @@ class TestTlsHandshakeCertificate(unittest.TestCase):
        )

    def test_parse(self):
        certificate_minimal = TlsHandshakeCertificate.parse_exact_size(self.certificate_minimal_bytes)
        certificate_minimal = TlsHandshakeServerCertificate.parse_exact_size(self.certificate_minimal_bytes)

        self.assertEqual(
            certificate_minimal.certificate_chain,
@@ -570,6 +574,76 @@ class TestTlsHandshakeCertificate(unittest.TestCase):
        )


class TestTlsHandshakeCertificate(unittest.TestCase):
    def setUp(self):
        self.certificate_minimal_dict = collections.OrderedDict([
            ('handshake_type', b'\x0b'),                  # CERTIFICATE
            ('length', b'\x00\x00\x1a'),
            ('certificate_request_context_length', b'\x01'),
            ('certificate_request_context', b'\xaa'),
            ('certificate_entries_length', b'\x00\x00\x15'),
            ('entry_certificate_length', b'\x00\x00\x10'),
            ('entry_certificate', b'first-cert-bytes'),
            ('entry_extensions_length', b'\x00\x00'),
        ])
        self.certificate_minimal_bytes = b''.join(self.certificate_minimal_dict.values())

        self.certificate_minimal = TlsHandshakeCertificate(
            certificate_request_context=b'\xaa',
            certificate_entries=TlsCertificateEntryVector([
                TlsCertificateEntry(
                    TlsCertificate(b'first-cert-bytes'),
                    b'',
                )
            ]),
        )

    def test_parse(self):
        certificate = TlsHandshakeCertificate.parse_exact_size(self.certificate_minimal_bytes)
        self.assertEqual(
            certificate.certificate_request_context,
            self.certificate_minimal.certificate_request_context
        )
        self.assertEqual(
            len(certificate.certificate_entries),
            len(self.certificate_minimal.certificate_entries)
        )

    def test_compose(self):
        self.assertEqual(
            self.certificate_minimal.compose(),
            self.certificate_minimal_bytes
        )

    def test_error_certificate_request_context_too_long(self):
        with self.assertRaises(InvalidValue) as context_manager:
            TlsHandshakeCertificate(
                certificate_request_context=b'\x00' * 256,
                certificate_entries=TlsCertificateEntryVector([]),
            )
        self.assertEqual(context_manager.exception.value, 256)

    def test_error_entry_extensions_too_long(self):
        with self.assertRaises(InvalidValue) as context_manager:
            TlsCertificateEntry(
                TlsCertificate(b'cert'),
                b'\x00' * (2 ** 16),
            )
        self.assertEqual(context_manager.exception.value, 2 ** 16)

    def test_error_parse_invalid_entries(self):
        bad_dict = collections.OrderedDict([
            ('handshake_type', b'\x0b'),                  # CERTIFICATE
            ('length', b'\x00\x00\x05'),
            ('certificate_request_context_length', b'\x01'),
            ('certificate_request_context', b'\xaa'),
            ('certificate_entries_length', b'\x00\x00\x0f'),  # claims more data than available
        ])
        bad_bytes = b''.join(bad_dict.values())
        with self.assertRaises(InvalidType):
            TlsHandshakeCertificate.parse_exact_size(bad_bytes)


class TestTlsHandshakeCertificateRequestTls10(unittest.TestCase):
    def setUp(self):
        self.certificate_request_dict = collections.OrderedDict([
@@ -759,6 +833,31 @@ class TestTlsHandshakeCertificateStatus(unittest.TestCase):
        self.assertEqual(self.certificate_status.compose(), self.certificate_status_bytes)


class TestTlsHandshakeEncryptedExtensions(unittest.TestCase):
    def setUp(self):
        self.extension_data = b'\x00\x10\x00\x0b\x00\x0c\x00\x00'
        self.encrypted_extensions_dict = collections.OrderedDict([
            ('handshake_type', b'\x08'),                  # ENCRYPTED_EXTENSIONS
            ('length', b'\x00\x00\x08'),
            ('extension_data', self.extension_data),
        ])
        self.encrypted_extensions_bytes = b''.join(self.encrypted_extensions_dict.values())

        self.encrypted_extensions = TlsHandshakeEncryptedExtensions(self.extension_data)

    def test_parse(self):
        encrypted_extensions = TlsHandshakeEncryptedExtensions.parse_exact_size(self.encrypted_extensions_bytes)

        self.assertEqual(encrypted_extensions.get_handshake_type(), TlsHandshakeType.ENCRYPTED_EXTENSIONS)
        self.assertEqual(encrypted_extensions.extension_data, self.extension_data)

    def test_compose(self):
        self.assertEqual(
            self.encrypted_extensions.compose(),
            self.encrypted_extensions_bytes
        )


class TestSslHandshakeClientHello(unittest.TestCase):
    def setUp(self):
        self.client_hello_dict = collections.OrderedDict([