diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index 86e2ef5..b2f07be 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -74,7 +74,8 @@ class RecordException(Exception): class RrParseError(RecordException): - pass + def __init__(self, message='failed to parse string value as RR text'): + super().__init__(message) class ValidationError(RecordException): @@ -775,8 +776,13 @@ class _DynamicMixin(object): class _TargetValue(str): + @classmethod + def parse_rr_text(self, value): + return value + @classmethod def validate(cls, data, _type): + # no need to call parse_rr_text since it's a noop reasons = [] if data == '': reasons.append('empty value') @@ -792,10 +798,15 @@ class _TargetValue(str): @classmethod def process(cls, value): + # no need to call parse_rr_text since it's a noop if value: return cls(value.lower()) return None + @property + def rr_text(self): + return self + class CnameValue(_TargetValue): pass @@ -806,6 +817,10 @@ class DnameValue(_TargetValue): class _IpAddress(str): + @classmethod + def parse_rr_text(cls, value): + return value + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -814,6 +829,7 @@ class _IpAddress(str): return ['missing value(s)'] reasons = [] for value in data: + # no need to call parse_rr_text here as it's a noop if value == '': reasons.append('empty value') elif value is None: @@ -836,9 +852,14 @@ class _IpAddress(str): return [cls(v) if v != '' else '' for v in values] def __new__(cls, v): + # no need to call parse_rr_text here as it's a noop v = str(cls._address_type(v)) return super().__new__(cls, v) + @property + def rr_text(self): + return self + class Ipv4Address(_IpAddress): _address_type = _IPv4Address @@ -889,12 +910,32 @@ Record.register_type(AliasRecord) class CaaValue(EqualityTupleMixin, dict): # https://tools.ietf.org/html/rfc6844#page-5 + @classmethod + def parse_rr_text(cls, value): + try: + flags, tag, value = value.split(' ') + except ValueError: + raise RrParseError() + try: + flags = int(flags) + except ValueError: + pass + return {'flags': flags, 'tag': tag, 'value': value} + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue try: flags = int(value.get('flags', 0)) if flags < 0 or flags > 255: @@ -913,6 +954,8 @@ class CaaValue(EqualityTupleMixin, dict): return [cls(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'flags': int(value.get('flags', 0)), @@ -949,6 +992,10 @@ class CaaValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rr_text(self): + return f'{self.flags} {self.tag} {self.value}' + def _equality_tuple(self): return (self.flags, self.tag, self.value) @@ -989,7 +1036,85 @@ Record.register_type(DnameRecord) class LocValue(EqualityTupleMixin, dict): - # TODO: work out how to do defaults per RFC + # TODO: this does not really match the RFC, but it's stuck using the details + # of how the type was impelemented. Would be nice to rework things to match + # while maintaining backwards compatibility. + # https://www.rfc-editor.org/rfc/rfc1876.html + + @classmethod + def parse_rr_text(cls, value): + try: + value = value.replace('m', '') + ( + lat_degrees, + lat_minutes, + lat_seconds, + lat_direction, + long_degrees, + long_minutes, + long_seconds, + long_direction, + altitude, + size, + precision_horz, + precision_vert, + ) = value.split(' ') + except ValueError: + raise RrParseError() + try: + lat_degrees = int(lat_degrees) + except ValueError: + pass + try: + lat_minutes = int(lat_minutes) + except ValueError: + pass + try: + long_degrees = int(long_degrees) + except ValueError: + pass + try: + long_minutes = int(long_minutes) + except ValueError: + pass + try: + lat_seconds = float(lat_seconds) + except ValueError: + pass + try: + long_seconds = float(long_seconds) + except ValueError: + pass + try: + altitude = float(altitude) + except ValueError: + pass + try: + size = float(size) + except ValueError: + pass + try: + precision_horz = float(precision_horz) + except ValueError: + pass + try: + precision_vert = float(precision_vert) + except ValueError: + pass + return { + 'lat_degrees': lat_degrees, + 'lat_minutes': lat_minutes, + 'lat_seconds': lat_seconds, + 'lat_direction': lat_direction, + 'long_degrees': long_degrees, + 'long_minutes': long_minutes, + 'long_seconds': long_seconds, + 'long_direction': long_direction, + 'altitude': altitude, + 'size': size, + 'precision_horz': precision_horz, + 'precision_vert': precision_vert, + } @classmethod def validate(cls, data, _type): @@ -1015,6 +1140,14 @@ class LocValue(EqualityTupleMixin, dict): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue for key in int_keys: try: int(value[key]) @@ -1087,6 +1220,8 @@ class LocValue(EqualityTupleMixin, dict): return [cls(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'lat_degrees': int(value['lat_degrees']), @@ -1204,6 +1339,10 @@ class LocValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rr_text(self): + return f'{self.lat_degrees} {self.lat_minutes} {self.lat_seconds} {self.lat_direction} {self.long_degrees} {self.long_minutes} {self.long_seconds} {self.long_direction} {self.altitude}m {self.size}m {self.precision_horz}m {self.precision_vert}m' + def __hash__(self): return hash( ( @@ -1259,11 +1398,11 @@ Record.register_type(LocRecord) class MxValue(EqualityTupleMixin, dict): @classmethod - def parse_rr_text(self, value): + def parse_rr_text(cls, value): try: preference, exchange = value.split(' ') except ValueError: - raise RrParseError('failed to parse string value as RR text') + raise RrParseError() try: preference = int(preference) except ValueError: @@ -1377,7 +1516,7 @@ class NaptrValue(EqualityTupleMixin, dict): VALID_FLAGS = ('S', 'A', 'U', 'P') @classmethod - def parse_rr_text(self, value): + def parse_rr_text(cls, value): try: ( order, @@ -1388,7 +1527,7 @@ class NaptrValue(EqualityTupleMixin, dict): replacement, ) = value.split(' ') except ValueError: - raise RrParseError('failed to parse string value as RR text') + raise RrParseError() try: order = int(order) preference = int(preference) @@ -1549,6 +1688,10 @@ Record.register_type(NaptrRecord) class _NsValue(str): + @classmethod + def parse_rr_text(cls, value): + return value + @classmethod def validate(cls, data, _type): if not data: @@ -1557,6 +1700,7 @@ class _NsValue(str): data = (data,) reasons = [] for value in data: + # no need to consider parse_rr_text as it's a noop if not FQDN(str(value), allow_underscores=True).is_valid: reasons.append( f'Invalid NS value "{value}" is not a valid FQDN.' @@ -1569,6 +1713,10 @@ class _NsValue(str): def process(cls, values): return [cls(v) for v in values] + @property + def rr_text(self): + return self + class NsRecord(ValuesMixin, Record): _type = 'NS' @@ -1590,7 +1738,7 @@ class PtrValue(_TargetValue): reasons.append('missing values') for value in values: - reasons.extend(super(PtrValue, cls).validate(value, _type)) + reasons.extend(super().validate(value, _type)) return reasons @@ -1618,12 +1766,40 @@ class SshfpValue(EqualityTupleMixin, dict): VALID_ALGORITHMS = (1, 2, 3, 4) VALID_FINGERPRINT_TYPES = (1, 2) + @classmethod + def parse_rr_text(self, value): + try: + algorithm, fingerprint_type, fingerprint = value.split(' ') + except ValueError: + raise RrParseError() + try: + algorithm = int(algorithm) + except ValueError: + pass + try: + fingerprint_type = int(fingerprint_type) + except ValueError: + pass + return { + 'algorithm': algorithm, + 'fingerprint_type': fingerprint_type, + 'fingerprint': fingerprint, + } + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue try: algorithm = int(value['algorithm']) if algorithm not in cls.VALID_ALGORITHMS: @@ -1653,6 +1829,8 @@ class SshfpValue(EqualityTupleMixin, dict): return [cls(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'algorithm': int(value['algorithm']), @@ -1689,6 +1867,10 @@ class SshfpValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rr_text(self): + return f'{self.algorithm} {self.fingerprint_type} {self.fingerprint}' + def __hash__(self): return hash(self.__repr__()) @@ -1731,6 +1913,10 @@ class _ChunkedValuesMixin(ValuesMixin): class _ChunkedValue(str): _unescaped_semicolon_re = re.compile(r'\w;') + @classmethod + def parse_rr_text(cls, value): + return value + @classmethod def validate(cls, data, _type): if not data: @@ -1739,6 +1925,7 @@ class _ChunkedValue(str): data = (data,) reasons = [] for value in data: + # no need to try parse_rr_text here as it's a noop if cls._unescaped_semicolon_re.search(value): reasons.append(f'unescaped ; in "{value}"') return reasons @@ -1752,6 +1939,10 @@ class _ChunkedValue(str): ret.append(cls(v.replace('" "', ''))) return ret + @property + def rr_text(self): + return self + class SpfRecord(_ChunkedValuesMixin, Record): _type = 'SPF' @@ -1762,6 +1953,18 @@ Record.register_type(SpfRecord) class SrvValue(EqualityTupleMixin, dict): + @classmethod + def preprocess_value(self, value): + if isinstance(value, str): + priority, weight, port, target = value.split(' ', 3) + return { + 'priority': priority, + 'weight': weight, + 'port': port, + 'target': target, + } + return value + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1880,6 +2083,23 @@ Record.register_type(SrvRecord) class TlsaValue(EqualityTupleMixin, dict): + @classmethod + def preprocess_value(self, value): + if isinstance(value, str): + ( + certificate_usage, + certificate_association_data, + matching_type, + certificate_association_data, + ) = value.split(' ', 3) + return { + 'certificate_usage': certificate_usage, + 'certificate_association_data': certificate_association_data, + 'matching_type': matching_type, + 'certificate_association_data': certificate_association_data, + } + return value + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -2012,6 +2232,11 @@ class UrlfwdValue(EqualityTupleMixin, dict): VALID_MASKS = (0, 1, 2) VALID_QUERY = (0, 1) + @classmethod + def preprocess_value(self, value): + # TODO: + return value + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 1fc9db6..e65e031 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -22,6 +22,7 @@ from octodns.record import ( Create, Delete, GeoValue, + Ipv4Address, LocRecord, LocValue, MxRecord, @@ -45,11 +46,12 @@ from octodns.record import ( UrlfwdRecord, UrlfwdValue, ValidationError, + ValuesMixin, + _ChunkedValue, _Dynamic, _DynamicPool, _DynamicRule, _NsValue, - ValuesMixin, ) from octodns.zone import Zone @@ -212,6 +214,29 @@ class TestRecord(TestCase): DummyRecord().__repr__() + def test_ip_address_rr_text(self): + + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, Ipv4Address.parse_rr_text(s)) + + # since we're a noop there's no need/way to check whether validate or + # __init__ call parse_rr_text + + zone = Zone('unit.tests.', []) + a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'}) + self.assertEqual('1.2.3.4', a.values[0].rr_text) + def test_values_mixin_data(self): # no values, no value or values in data a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []}) @@ -380,6 +405,29 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_target_rr_text(self): + + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, Ipv4Address.parse_rr_text(s)) + + # since we're a noop there's no need/way to check whether validate or + # __init__ call parse_rr_text + + zone = Zone('unit.tests.', []) + a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.value.rr_text) + def test_caa(self): a_values = [ CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}), @@ -440,6 +488,71 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_caa_value_rr_text(self): + # empty string won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rr_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rr_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rr_text('0 tag') + + # 4th word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rr_text('1 tag value another') + + # flags not an int, will parse + self.assertEqual( + {'flags': 'one', 'tag': 'tag', 'value': 'value'}, + CaaValue.parse_rr_text('one tag value'), + ) + + # valid + self.assertEqual( + {'flags': 0, 'tag': 'tag', 'value': '99148c81'}, + CaaValue.parse_rr_text('0 tag 99148c81'), + ) + + # make sure that validate is using parse_rr_text when passed string + # value(s) + reasons = CaaRecord.validate( + 'caa', 'caa.unit.tests.', {'ttl': 32, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = CaaRecord.validate( + 'caa', 'caa.unit.tests.', {'ttl': 32, 'values': ['nope']} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = CaaRecord.validate( + 'caa', 'caa.unit.tests.', {'ttl': 32, 'value': '0 tag 99148c81'} + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = CaaRecord(zone, 'caa', {'ttl': 32, 'value': '0 tag 99148c81'}) + self.assertEqual(0, a.values[0].flags) + self.assertEqual('tag', a.values[0].tag) + self.assertEqual('99148c81', a.values[0].value) + self.assertEqual('0 tag 99148c81', a.values[0].rr_text) + a = CaaRecord( + zone, + 'caa', + {'ttl': 32, 'values': ['1 tag1 99148c81', '2 tag2 99148c44']}, + ) + self.assertEqual(1, a.values[0].flags) + self.assertEqual('tag1', a.values[0].tag) + self.assertEqual('99148c81', a.values[0].value) + self.assertEqual('1 tag1 99148c81', a.values[0].rr_text) + self.assertEqual(2, a.values[1].flags) + self.assertEqual('tag2', a.values[1].tag) + self.assertEqual('99148c44', a.values[1].value) + self.assertEqual('2 tag2 99148c44', a.values[1].rr_text) + def test_cname(self): self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.') @@ -542,6 +655,60 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_loc_value_rr_text(self): + # only the exact correct number of words + for i in tuple(range(0, 12)) + (13,): + s = ''.join(['word'] * i) + with self.assertRaises(RrParseError): + LocValue.parse_rr_text(s) + + # valid + s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m' + self.assertEqual( + { + 'altitude': 6.6, + 'lat_degrees': 0, + 'lat_direction': 'N', + 'lat_minutes': 1, + 'lat_seconds': 2.2, + 'long_degrees': 3, + 'long_direction': 'E', + 'long_minutes': 4, + 'long_seconds': 5.5, + 'precision_horz': 8.8, + 'precision_vert': 9.9, + 'size': 7.7, + }, + LocValue.parse_rr_text(s), + ) + + # make sure validate is using parse_rr_text when passed string values + reasons = LocRecord.validate( + 'loc', 'loc.unit.tests', {'ttl': 42, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = LocRecord.validate( + 'loc', 'loc.unit.tests', {'ttl': 42, 'value': s} + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = LocRecord(zone, 'mx', {'ttl': 32, 'value': s}) + self.assertEqual(0, a.values[0].lat_degrees) + self.assertEqual(1, a.values[0].lat_minutes) + self.assertEqual(2.2, a.values[0].lat_seconds) + self.assertEqual('N', a.values[0].lat_direction) + self.assertEqual(3, a.values[0].long_degrees) + self.assertEqual(4, a.values[0].long_minutes) + self.assertEqual(5.5, a.values[0].long_seconds) + self.assertEqual('E', a.values[0].long_direction) + self.assertEqual(6.6, a.values[0].altitude) + self.assertEqual(7.7, a.values[0].size) + self.assertEqual(8.8, a.values[0].precision_horz) + self.assertEqual(9.9, a.values[0].precision_vert) + self.assertEqual(s, a.values[0].rr_text) + def test_mx(self): a_values = [ MxValue({'preference': 10, 'exchange': 'smtp1.'}), @@ -593,7 +760,7 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() - def test_mx_rr_text(self): + def test_mx_value_rr_text(self): # empty string won't parse with self.assertRaises(RrParseError): @@ -607,18 +774,18 @@ class TestRecord(TestCase): with self.assertRaises(RrParseError): MxValue.parse_rr_text('10 mx.unit.tests. another') - # preference not an int, will parse - self.assertEqual( - {'preference': 10, 'exchange': 'mx.unit.tests.'}, - MxValue.parse_rr_text('10 mx.unit.tests.'), - ) - # preference not an int self.assertEqual( {'preference': 'abc', 'exchange': 'mx.unit.tests.'}, MxValue.parse_rr_text('abc mx.unit.tests.'), ) + # valid + self.assertEqual( + {'preference': 10, 'exchange': 'mx.unit.tests.'}, + MxValue.parse_rr_text('10 mx.unit.tests.'), + ) + # make sure that validate is using parse_rr_text when passed string # value(s) reasons = MxRecord.validate( @@ -945,7 +1112,7 @@ class TestRecord(TestCase): o.replacement = '1' self.assertEqual('1', o.replacement) - def test_naptr_rr_text(self): + def test_naptr_value_rr_text(self): # things with the wrong number of words won't parse for v in ( '', @@ -1030,6 +1197,28 @@ class TestRecord(TestCase): self.assertEqual([b_value], b.values) self.assertEqual(b_data, b.data) + def test_ns_value_rr_text(self): + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, _NsValue.parse_rr_text(s)) + + # since we're a noop there's no need/way to check whether validate or + # __init__ call parse_rr_text + + zone = Zone('unit.tests.', []) + a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.values[0].rr_text) + def test_sshfp(self): a_values = [ SshfpValue( @@ -1098,11 +1287,86 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_sshfp_value_rr_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rr_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rr_text('nope') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rr_text('0 1 00479b27 another') + + # algorithm and fingerprint_type not ints + self.assertEqual( + { + 'algorithm': 'one', + 'fingerprint_type': 'two', + 'fingerprint': '00479b27', + }, + SshfpValue.parse_rr_text('one two 00479b27'), + ) + + # valid + self.assertEqual( + {'algorithm': 1, 'fingerprint_type': 2, 'fingerprint': '00479b27'}, + SshfpValue.parse_rr_text('1 2 00479b27'), + ) + + # make sure that validate is using parse_rr_text when passed string + # value(s) + reasons = SshfpRecord.validate( + 'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = SshfpRecord.validate( + 'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'values': ['nope']} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = SshfpRecord.validate( + 'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'value': '1 2 00479b27'} + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = SshfpRecord(zone, 'sshfp', {'ttl': 32, 'value': '1 2 00479b27'}) + self.assertEqual(1, a.values[0].algorithm) + self.assertEqual(2, a.values[0].fingerprint_type) + self.assertEqual('00479b27', a.values[0].fingerprint) + self.assertEqual('1 2 00479b27', a.values[0].rr_text) + def test_spf(self): a_values = ['spf1 -all', 'spf1 -hrm'] b_value = 'spf1 -other' self.assertMultipleValues(SpfRecord, a_values, b_value) + def test_chunked_value_rr_text(self): + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, _ChunkedValue.parse_rr_text(s)) + + # since we're a noop there's no need/way to check whether validate or + # __init__ call parse_rr_text + + zone = Zone('unit.tests.', []) + a = SpfRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.values[0].rr_text) + def test_srv(self): a_values = [ SrvValue(