diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index ce73752..ddb8b92 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -1528,9 +1528,11 @@ class TlsaValue(EqualityTupleMixin): try: certificate_usage = int(value.get('certificate_usage', 0)) if certificate_usage < 0 or certificate_usage > 3: - reasons.append(f'invalid certificate_usage "{certificate_usage}"') + reasons.append(f'invalid certificate_usage ' + f'"{certificate_usage}"') except ValueError: - reasons.append(f'invalid certificate_usage "{value["certificate_usage"]}"') + reasons.append(f'invalid certificate_usage ' + f'"{value["certificate_usage"]}"') try: selector = int(value.get('selector', 0)) @@ -1544,8 +1546,15 @@ class TlsaValue(EqualityTupleMixin): if matching_type < 0 or matching_type > 2: reasons.append(f'invalid matching_type "{matching_type}"') except ValueError: - reasons.append(f'invalid matching_type "{value["matching_type"]}"') - + reasons.append(f'invalid matching_type ' + f'"{value["matching_type"]}"') + + if 'certificate_usage' not in value: + reasons.append('missing certificate_usage') + if 'selector' not in value: + reasons.append('missing selector') + if 'matching_type' not in value: + reasons.append('missing matching_type') if 'certificate_association_data' not in value: reasons.append('missing certificate_association_data') return reasons @@ -1558,7 +1567,8 @@ class TlsaValue(EqualityTupleMixin): self.certificate_usage = int(value.get('certificate_usage', 0)) self.selector = int(value.get('selector', 0)) self.matching_type = int(value.get('matching_type', 0)) - self.certificate_association_data = value['certificate_association_data'] + self.certificate_association_data = \ + value['certificate_association_data'] @property def data(self): @@ -1570,18 +1580,22 @@ class TlsaValue(EqualityTupleMixin): } def _equality_tuple(self): - return (self.certificate_usage, self.selector, self.matching_type, self.certificate_association_data) + return (self.certificate_usage, self.selector, self.matching_type, + self.certificate_association_data) def __repr__(self): - return f'{self.certificate_usage} {self.selector} {self.matching_type}"{self.certificate_association_data}"' + return f"'{self.certificate_usage} {self.selector} '" \ + f"'{self.matching_type} {self.certificate_association_data}'" class TlsaRecord(ValuesMixin, Record): _type = 'TLSA' _value_type = TlsaValue + Record.register_type(TlsaRecord) + class _TxtValue(_ChunkedValue): pass diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index fd3f70f..dbd2666 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -11,9 +11,9 @@ from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \ CaaValue, CnameRecord, DnameRecord, Create, Delete, GeoValue, LocRecord, \ LocValue, MxRecord, MxValue, NaptrRecord, NaptrValue, NsRecord, \ PtrRecord, Record, RecordException, SshfpRecord, SshfpValue, SpfRecord, \ - SrvRecord, SrvValue, TxtRecord, Update, UrlfwdRecord, UrlfwdValue, \ - ValidationError, _Dynamic, _DynamicPool, _DynamicRule, _NsValue, \ - ValuesMixin + SrvRecord, SrvValue, TlsaRecord, TxtRecord, Update, UrlfwdRecord, \ + UrlfwdValue, ValidationError, _Dynamic, _DynamicPool, _DynamicRule, \ + _NsValue, ValuesMixin from octodns.zone import Zone from helpers import DynamicProvider, GeoProvider, SimpleProvider @@ -907,6 +907,80 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_tlsa(self): + a_values = [{ + 'certificate_usage': 1, + 'selector': 1, + 'matching_type': 1, + 'certificate_association_data': 'ABABABABABABABABAB', + }, { + 'certificate_usage': 2, + 'selector': 0, + 'matching_type': 2, + 'certificate_association_data': 'ABABABABABABABABAC', + }] + a_data = {'ttl': 30, 'values': a_values} + a = TlsaRecord(self.zone, 'a', a_data) + self.assertEqual('a.unit.tests.', a.fqdn) + self.assertEqual('a', a.name) + self.assertEqual(30, a.ttl) + self.assertEqual(a_values[0]['certificate_usage'], a.values[0].certificate_usage) + self.assertEqual(a_values[0]['selector'], a.values[0].selector) + self.assertEqual(a_values[0]['matching_type'], a.values[0].matching_type) + self.assertEqual(a_values[0]['certificate_association_data'], a.values[0].certificate_association_data) + + self.assertEqual(a_values[1]['certificate_usage'], a.values[1].certificate_usage) + self.assertEqual(a_values[1]['selector'], a.values[1].selector) + self.assertEqual(a_values[1]['matching_type'], a.values[1].matching_type) + self.assertEqual(a_values[1]['certificate_association_data'], a.values[1].certificate_association_data) + self.assertEqual(a_data, a.data) + + b_value = { + 'certificate_usage': 0, + 'selector': 0, + 'matching_type': 0, + 'certificate_association_data': 'AAAAAAAAAAAAAAA', + } + b_data = {'ttl': 30, 'value': b_value} + b = TlsaRecord(self.zone, 'b', b_data) + self.assertEqual(b_value['certificate_usage'], b.values[0].certificate_usage) + self.assertEqual(b_value['selector'], b.values[0].selector) + self.assertEqual(b_value['matching_type'], b.values[0].matching_type) + self.assertEqual(b_value['certificate_association_data'], b.values[0].certificate_association_data) + self.assertEqual(b_data, b.data) + + target = SimpleProvider() + # No changes with self + self.assertFalse(a.changes(a, target)) + # Diff in certificate_usage causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].certificate_usage = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in selector causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].selector = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in matching_type causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].matching_type = 0 + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + # Diff in certificate_association_data causes change + other = TlsaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values}) + other.values[0].certificate_association_data = 'AAAAAAAAAAAAA' + change = a.changes(other, target) + self.assertEqual(change.existing, a) + self.assertEqual(change.new, other) + + # __repr__ doesn't blow up + a.__repr__() + + def test_txt(self): a_values = ['a one', 'a two'] b_value = 'b other' @@ -3187,6 +3261,171 @@ class TestRecordValidation(TestCase): self.assertEqual(['Invalid SRV target "100 foo.bar.com." is not a ' 'valid FQDN.'], ctx.exception.reasons) + def test_TLSA(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 0, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + + # missing certificate_association_data + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 0, + 'matching_type' : 0 + } + }) + self.assertEqual(['missing certificate_association_data'], + ctx.exception.reasons) + + # missing certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'selector' : 0, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing certificate_usage'], + ctx.exception.reasons) + + # False certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 4, + 'selector' : 0, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid certificate_usage ' + '"{value["certificate_usage"]}"', + ctx.exception.reasons) + + # Invalid certificate_usage + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 'XYZ', + 'selector' : 0, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid certificate_usage ' + '"{value["certificate_usage"]}"', + ctx.exception.reasons) + + # missing selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing selector'], + ctx.exception.reasons) + + # False selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 4, + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid selector ' + '"{value["selector"]}"', + ctx.exception.reasons) + + # Invalid selector + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 'XYZ', + 'matching_type' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid selector ' + '"{value["selector"]}"', + ctx.exception.reasons) + + # missing matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 0, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual(['missing matching_type'], + ctx.exception.reasons) + + # False matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 1, + 'matching_type' : 3, + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid matching_type ' + '"{value["matching_type"]}"', + ctx.exception.reasons) + + # Invalid matching_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TLSA', + 'ttl': 600, + 'value' : { + 'certificate_usage' : 0, + 'selector' : 1, + 'matching_type' : 'XYZ', + 'certificate_association_data' : 'AAAAAAAAAAAAA' + } + }) + self.assertEqual('invalid matching_type ' + '"{value["matching_type"]}"', + ctx.exception.reasons) + def test_TXT(self): # doesn't blow up (name & zone here don't make any sense, but not # important)