Commit bae32091 authored by Szilárd Pfeiffer's avatar Szilárd Pfeiffer
Browse files

Merge branch '98-dns-record-sshfp-parsing'

Closes: #98
parents c7df8c45 66b5fe48
Loading
Loading
Loading
Loading
Loading
+51 −1
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ from cryptodatahub.common.key import (
    PublicKeyParamsRsa,
)

from cryptodatahub.dnsrec.algorithm import DnsRrType, DnsSecAlgorithm, DnsSecDigestType
from cryptodatahub.dnsrec.algorithm import DnsRrType, DnsSecAlgorithm, DnsSecDigestType, SshFpAlgorithm, SshFpFingerprintType

from cryptoparser.common.base import NumericRangeParsableBase, OneByteEnumParsable, Serializable, TwoByteEnumParsable
from cryptoparser.common.exception import NotEnoughData
@@ -478,6 +478,56 @@ class DnsRecordMx(ParsableBase):
        return composer.composed_bytes


class SshFpAlgorithmFactory(OneByteEnumParsable):
    @classmethod
    def get_enum_class(cls):
        return SshFpAlgorithm

    @abc.abstractmethod
    def compose(self):
        raise NotImplementedError()


class SshFpFingerprintTypeFactory(OneByteEnumParsable):
    @classmethod
    def get_enum_class(cls):
        return SshFpFingerprintType

    @abc.abstractmethod
    def compose(self):
        raise NotImplementedError()


@attr.s
class DnsRecordSshfp(ParsableBase, Serializable):
    HEADER_SIZE = 2

    algorithm = attr.ib(validator=attr.validators.instance_of(SshFpAlgorithm))
    fingerprint_type = attr.ib(validator=attr.validators.instance_of(SshFpFingerprintType))
    fingerprint = attr.ib(validator=attr.validators.instance_of((bytes, bytearray)))

    @classmethod
    def _parse(cls, parsable):
        if len(parsable) < cls.HEADER_SIZE:
            raise NotEnoughData(cls.HEADER_SIZE - len(parsable))

        parser = ParserBinary(parsable)
        parser.parse_parsable('algorithm', SshFpAlgorithmFactory)
        parser.parse_parsable('fingerprint_type', SshFpFingerprintTypeFactory)
        parser.parse_raw('fingerprint', parser.unparsed_length)

        return cls(**parser), parser.parsed_length

    def compose(self):
        composer = ComposerBinary()

        composer.compose_numeric_enum_coded(self.algorithm)
        composer.compose_numeric_enum_coded(self.fingerprint_type)
        composer.compose_raw(self.fingerprint)

        return composer.composed_bytes


@attr.s
class DnsRecordTxt(ParsableBase):
    HEADER_SIZE = 1
Compare bb7a7a40 to d75f40a7
Original line number Diff line number Diff line
Subproject commit bb7a7a408ed4c0cfb42a5788007bddd601f74341
Subproject commit d75f40a72b81f9acc8589483ee7f3f8968bffc92
+50 −1
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ from cryptodatahub.common.key import (
    PublicKeyParamsRsa,
)

from cryptodatahub.dnsrec.algorithm import DnsSecAlgorithm, DnsSecDigestType, DnsRrType
from cryptodatahub.dnsrec.algorithm import DnsSecAlgorithm, DnsSecDigestType, DnsRrType, SshFpAlgorithm, SshFpFingerprintType

from cryptoparser.common.exception import NotEnoughData
from cryptoparser.dnsrec.record import (
@@ -25,6 +25,7 @@ from cryptoparser.dnsrec.record import (
    DnsRecordDs,
    DnsRecordMx,
    DnsRecordRrsig,
    DnsRecordSshfp,
    DnsRecordTxt,
    DnsRrTypePrivate,
    DnsSecFlag,
@@ -549,3 +550,51 @@ class TestDnsRecordTxt(unittest.TestCase):

    def test_compose(self):
        self.assertEqual(self.record_single.compose(), self.record_bytes_single)


class TestDnsRecordSshfp(unittest.TestCase):
    def setUp(self):
        self.record_rsa_sha1_bytes = bytes(
            b'\x01' +          # algorithm: RSA
            b'\x01' +          # fingerprint type: SHA-1
            b'\xde\xad\xbe\xef' * 5  # fingerprint: 20 bytes
        )
        self.record_rsa_sha1 = DnsRecordSshfp(
            algorithm=SshFpAlgorithm.RSA,
            fingerprint_type=SshFpFingerprintType.SHA1,
            fingerprint=b'\xde\xad\xbe\xef' * 5,
        )

        self.record_ecdsa_sha256_bytes = bytes(
            b'\x03' +          # algorithm: ECDSA
            b'\x02' +          # fingerprint type: SHA-256
            b'\xab\xcd\xef\x01' * 8  # fingerprint: 32 bytes
        )
        self.record_ecdsa_sha256 = DnsRecordSshfp(
            algorithm=SshFpAlgorithm.ECDSA,
            fingerprint_type=SshFpFingerprintType.SHA2_256,
            fingerprint=b'\xab\xcd\xef\x01' * 8,
        )

    def test_error_not_enough_data(self):
        with self.assertRaises(NotEnoughData) as context_manager:
            DnsRecordSshfp.parse_exact_size(b'\x01')

        self.assertEqual(
            context_manager.exception.bytes_needed,
            DnsRecordSshfp.HEADER_SIZE - 1
        )

    def test_parse(self):
        self.assertEqual(
            DnsRecordSshfp.parse_exact_size(self.record_rsa_sha1_bytes),
            self.record_rsa_sha1
        )
        self.assertEqual(
            DnsRecordSshfp.parse_exact_size(self.record_ecdsa_sha256_bytes),
            self.record_ecdsa_sha256
        )

    def test_compose(self):
        self.assertEqual(self.record_rsa_sha1.compose(), self.record_rsa_sha1_bytes)
        self.assertEqual(self.record_ecdsa_sha256.compose(), self.record_ecdsa_sha256_bytes)