diff --git a/docs/records.md b/docs/records.md index 182a109..30d689a 100644 --- a/docs/records.md +++ b/docs/records.md @@ -18,6 +18,7 @@ OctoDNS supports the following record types: * `SPF` * `SRV` * `SSHFP` +* `TLSA` * `TXT` * `URLFWD` diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index 6fe3f90..ddb8b92 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -1517,6 +1517,85 @@ class SrvRecord(ValuesMixin, Record): Record.register_type(SrvRecord) +class TlsaValue(EqualityTupleMixin): + + @classmethod + def validate(cls, data, _type): + if not isinstance(data, (list, tuple)): + data = (data,) + reasons = [] + for value in data: + try: + certificate_usage = int(value.get('certificate_usage', 0)) + if certificate_usage < 0 or certificate_usage > 3: + reasons.append(f'invalid certificate_usage ' + f'"{certificate_usage}"') + except ValueError: + reasons.append(f'invalid certificate_usage ' + f'"{value["certificate_usage"]}"') + + try: + selector = int(value.get('selector', 0)) + if selector < 0 or selector > 1: + reasons.append(f'invalid selector "{selector}"') + except ValueError: + reasons.append(f'invalid selector "{value["selector"]}"') + + try: + matching_type = int(value.get('matching_type', 0)) + if matching_type < 0 or matching_type > 2: + reasons.append(f'invalid matching_type "{matching_type}"') + except ValueError: + reasons.append(f'invalid matching_type ' + f'"{value["matching_type"]}"') + + if 'certificate_usage' not in value: + reasons.append('missing certificate_usage') + if 'selector' not in value: + reasons.append('missing selector') + if 'matching_type' not in value: + reasons.append('missing matching_type') + if 'certificate_association_data' not in value: + reasons.append('missing certificate_association_data') + return reasons + + @classmethod + def process(cls, values): + return [TlsaValue(v) for v in values] + + def __init__(self, value): + self.certificate_usage = int(value.get('certificate_usage', 0)) + self.selector = int(value.get('selector', 0)) + self.matching_type = int(value.get('matching_type', 0)) + self.certificate_association_data = \ + value['certificate_association_data'] + + @property + def data(self): + return { + 'certificate_usage': self.certificate_usage, + 'selector': self.selector, + 'matching_type': self.matching_type, + 'certificate_association_data': self.certificate_association_data, + } + + def _equality_tuple(self): + return (self.certificate_usage, self.selector, self.matching_type, + self.certificate_association_data) + + def __repr__(self): + return f"'{self.certificate_usage} {self.selector} '" \ + f"'{self.matching_type} {self.certificate_association_data}'" + + +class TlsaRecord(ValuesMixin, Record): + _type = 'TLSA' + _value_type = TlsaValue + + +Record.register_type(TlsaRecord) + + class _TxtValue(_ChunkedValue): pass diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index fd3f70f..a406d96 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -11,9 +11,9 @@ from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \ CaaValue, CnameRecord, DnameRecord, Create, Delete, GeoValue, LocRecord, \ LocValue, MxRecord, MxValue, NaptrRecord, NaptrValue, NsRecord, \ PtrRecord, Record, RecordException, SshfpRecord, SshfpValue, SpfRecord, \ - SrvRecord, SrvValue, TxtRecord, Update, UrlfwdRecord, UrlfwdValue, \ - ValidationError, _Dynamic, _DynamicPool, _DynamicRule, _NsValue, \ - ValuesMixin + SrvRecord, SrvValue, TlsaRecord, TxtRecord, Update, UrlfwdRecord, \ + UrlfwdValue, ValidationError, _Dynamic, _DynamicPool, _DynamicRule, \ + _NsValue, ValuesMixin from octodns.zone import Zone from helpers import DynamicProvider, GeoProvider, SimpleProvider @@ -907,6 +907,89 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_tlsa(self): + a_values = [{ + 'certificate_usage': 1, + 'selector': 1, + 'matching_type': 1, + 'certificate_association_data': 'ABABABABABABABABAB', + }, { + 'certificate_usage': 2, + 'selector': 0, + 'matching_type': 2, + 'certificate_association_data': 'ABABABABABABABABAC', + }] + a_data = {'ttl': 30, 'values': a_values} + a = TlsaRecord(self.zone, 'a', a_data) + self.assertEqual('a.unit.tests.', a.fqdn) + self.assertEqual('a', a.name) + self.assertEqual(30, a.ttl) + self.assertEqual(a_values[0]['certificate_usage'], + a.values[0].certificate_usage) + self.assertEqual(a_values[0]['selector'], a.values[0].selector) + self.assertEqual(a_values[0]['matching_type'], + a.values[0].matching_type) + self.assertEqual(a_values[0]['certificate_association_data'], + a.values[0].certificate_association_data) + + self.assertEqual(a_values[1]['certificate_usage'], + a.values[1].certificate_usage) + self.assertEqual(a_values[1]['selector'], + a.values[1].selector) + self.assertEqual(a_values[1]['matching_type'], + a.values[1].matching_type) + self.assertEqual(a_values[1]['certificate_association_data'], + a.values[1].certificate_association_data) + self.assertEqual(a_data, a.data) + + b_value = { + 'certificate_usage': 0, + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAAAA', + } + b_data = {'ttl': 30, 'value': b_value} + b = TlsaRecord(self.zone, 'b', b_data) + self.assertEqual(b_value['certificate_usage'], + b.values[0].certificate_usage) + self.assertEqual(b_value['selector'], b.values[0].selector) + self.assertEqual(b_value['matching_type'], + b.values[0].matching_type) + self.assertEqual(b_value['certificate_association_data'], + b.values[0].certificate_association_data) + self.assertEqual(b_data, b.data) + + target = SimpleProvider() + # No changes with self + self.assertFalse(a.changes(a, target)) + # Diff in certificate_usage causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].certificate_usage = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in selector causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].selector = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in matching_type causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].matching_type = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in certificate_association_data causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].certificate_association_data = 'AAAAAAAAAAAAA' + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + + # __repr__ doesn't blow up + a.__repr__() + def test_txt(self): a_values = ['a one', 'a two'] b_value = 'b other' @@ -3187,6 +3270,190 @@ class TestRecordValidation(TestCase): self.assertEqual(['Invalid SRV target "100 foo.bar.com." is not a ' 'valid FQDN.'], ctx.exception.reasons) + def test_TLSA(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + # Multi value, second missing certificate usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'values': [{ + 'certificate_usage': 0, + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + }, { + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + ] + }) + self.assertEqual(['missing certificate_usage'], + ctx.exception.reasons) + + # missing certificate_association_data + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 0, + 'matching_type': 0 + } + }) + self.assertEqual(['missing certificate_association_data'], + ctx.exception.reasons) + + # missing certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing certificate_usage'], + ctx.exception.reasons) + + # False certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 4, + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid certificate_usage ' + '"{value["certificate_usage"]}"', + ctx.exception.reasons) + + # Invalid certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 'XYZ', + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid certificate_usage ' + '"{value["certificate_usage"]}"', + ctx.exception.reasons) + + # missing selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing selector'], + ctx.exception.reasons) + + # False selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 4, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid selector ' + '"{value["selector"]}"', + ctx.exception.reasons) + + # Invalid selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 'XYZ', + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid selector ' + '"{value["selector"]}"', + ctx.exception.reasons) + + # missing matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 0, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing matching_type'], + ctx.exception.reasons) + + # False matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 1, + 'matching_type': 3, + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid matching_type ' + '"{value["matching_type"]}"', + ctx.exception.reasons) + + # Invalid matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value': { + 'certificate_usage': 0, + 'selector': 1, + 'matching_type': 'XYZ', + 'certificate_association_data': 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid matching_type ' + '"{value["matching_type"]}"', + ctx.exception.reasons) + def test_TXT(self): # doesn't blow up (name & zone here don't make any sense, but not # important)