diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index da527eb..31ed400 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -2,6 +2,7 @@ # # +from collections import defaultdict from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address from logging import getLogger import re @@ -67,6 +68,11 @@ class RecordException(Exception): pass +class RrParseError(RecordException): + def __init__(self, message='failed to parse string value as RR text'): + super().__init__(message) + + class ValidationError(RecordException): @classmethod def build_message(cls, fqdn, reasons): @@ -79,6 +85,23 @@ class ValidationError(RecordException): self.reasons = reasons +class Rr(object): + ''' + Simple object intended to be used with Record.from_rrs to allow providers + that work with RFC formatted rdata to share centralized parsing/encoding + code + ''' + + def __init__(self, name, _type, ttl, rdata): + self.name = name + self._type = _type + self.ttl = ttl + self.rdata = rdata + + def __repr__(self): + return f'Rr<{self.name}, {self._type}, {self.ttl}, {self.rdata}' + + class Record(EqualityTupleMixin): log = getLogger('Record') @@ -166,6 +189,27 @@ class Record(EqualityTupleMixin): pass return reasons + @classmethod + def from_rrs(cls, zone, rrs, lenient=False): + # group records by name & type so that multiple rdatas can be combined + # into a single record when needed + grouped = defaultdict(list) + for rr in rrs: + grouped[(rr.name, rr._type)].append(rr) + + records = [] + # walk the grouped rrs converting each one to data and then create a + # record with that data + for _, rrs in sorted(grouped.items()): + rr = rrs[0] + name = zone.hostname_from_fqdn(rr.name) + _class = cls._CLASSES[rr._type] + data = _class.data_from_rrs(rrs) + record = Record.new(zone, name, data, lenient=lenient) + records.append(record) + + return records + def __init__(self, zone, name, data, source=None): self.zone = zone if name: @@ -337,6 +381,14 @@ class ValuesMixin(object): return reasons + @classmethod + def data_from_rrs(cls, rrs): + # type and TTL come from the first rr + rr = rrs[0] + # values come from parsing the rdata portion of all rrs + values = [cls._value_type.parse_rdata_text(rr.rdata) for rr in rrs] + return {'ttl': rr.ttl, 'type': rr._type, 'values': values} + def __init__(self, zone, name, data, source=None): super().__init__(zone, name, data, source=source) try: @@ -365,6 +417,15 @@ class ValuesMixin(object): return ret + @property + def rrs(self): + return ( + self.fqdn, + self.ttl, + self._type, + [v.rdata_text for v in self.values], + ) + def __repr__(self): values = "', '".join([str(v) for v in self.values]) klass = self.__class__.__name__ @@ -433,6 +494,16 @@ class ValueMixin(object): ) return reasons + @classmethod + def data_from_rrs(cls, rrs): + # single value, so single rr only... + rr = rrs[0] + return { + 'ttl': rr.ttl, + 'type': rr._type, + 'value': cls._value_type.parse_rdata_text(rr.rdata), + } + def __init__(self, zone, name, data, source=None): super().__init__(zone, name, data, source=source) self.value = self._value_type.process(data['value']) @@ -448,6 +519,10 @@ class ValueMixin(object): ret['value'] = getattr(self.value, 'data', self.value) return ret + @property + def rrs(self): + return self.fqdn, self.ttl, self._type, [self.value.rdata_text] + def __repr__(self): klass = self.__class__.__name__ return f'<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, {self.value}>' @@ -785,6 +860,10 @@ class _DynamicMixin(object): class _TargetValue(str): + @classmethod + def parse_rdata_text(self, value): + return value + @classmethod def validate(cls, data, _type): reasons = [] @@ -810,6 +889,10 @@ class _TargetValue(str): v = idna_encode(v) return super().__new__(cls, v) + @property + def rdata_text(self): + return self + class CnameValue(_TargetValue): pass @@ -820,6 +903,10 @@ class DnameValue(_TargetValue): class _IpAddress(str): + @classmethod + def parse_rdata_text(cls, value): + return value + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -853,6 +940,10 @@ class _IpAddress(str): v = str(cls._address_type(v)) return super().__new__(cls, v) + @property + def rdata_text(self): + return self + class Ipv4Address(_IpAddress): _address_type = _IPv4Address @@ -903,6 +994,18 @@ Record.register_type(AliasRecord) class CaaValue(EqualityTupleMixin, dict): # https://tools.ietf.org/html/rfc6844#page-5 + @classmethod + def parse_rdata_text(cls, value): + try: + flags, tag, value = value.split(' ') + except ValueError: + raise RrParseError() + try: + flags = int(flags) + except ValueError: + pass + return {'flags': flags, 'tag': tag, 'value': value} + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -963,6 +1066,10 @@ class CaaValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rdata_text(self): + return f'{self.flags} {self.tag} {self.value}' + def _equality_tuple(self): return (self.flags, self.tag, self.value) @@ -1003,7 +1110,85 @@ Record.register_type(DnameRecord) class LocValue(EqualityTupleMixin, dict): - # TODO: work out how to do defaults per RFC + # TODO: this does not really match the RFC, but it's stuck using the details + # of how the type was impelemented. Would be nice to rework things to match + # while maintaining backwards compatibility. + # https://www.rfc-editor.org/rfc/rfc1876.html + + @classmethod + def parse_rdata_text(cls, value): + try: + value = value.replace('m', '') + ( + lat_degrees, + lat_minutes, + lat_seconds, + lat_direction, + long_degrees, + long_minutes, + long_seconds, + long_direction, + altitude, + size, + precision_horz, + precision_vert, + ) = value.split(' ') + except ValueError: + raise RrParseError() + try: + lat_degrees = int(lat_degrees) + except ValueError: + pass + try: + lat_minutes = int(lat_minutes) + except ValueError: + pass + try: + long_degrees = int(long_degrees) + except ValueError: + pass + try: + long_minutes = int(long_minutes) + except ValueError: + pass + try: + lat_seconds = float(lat_seconds) + except ValueError: + pass + try: + long_seconds = float(long_seconds) + except ValueError: + pass + try: + altitude = float(altitude) + except ValueError: + pass + try: + size = float(size) + except ValueError: + pass + try: + precision_horz = float(precision_horz) + except ValueError: + pass + try: + precision_vert = float(precision_vert) + except ValueError: + pass + return { + 'lat_degrees': lat_degrees, + 'lat_minutes': lat_minutes, + 'lat_seconds': lat_seconds, + 'lat_direction': lat_direction, + 'long_degrees': long_degrees, + 'long_minutes': long_minutes, + 'long_seconds': long_seconds, + 'long_direction': long_direction, + 'altitude': altitude, + 'size': size, + 'precision_horz': precision_horz, + 'precision_vert': precision_vert, + } @classmethod def validate(cls, data, _type): @@ -1218,6 +1403,10 @@ class LocValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rdata_text(self): + return f'{self.lat_degrees} {self.lat_minutes} {self.lat_seconds} {self.lat_direction} {self.long_degrees} {self.long_minutes} {self.long_seconds} {self.long_direction} {self.altitude}m {self.size}m {self.precision_horz}m {self.precision_vert}m' + def __hash__(self): return hash( ( @@ -1272,6 +1461,18 @@ Record.register_type(LocRecord) class MxValue(EqualityTupleMixin, dict): + @classmethod + def parse_rdata_text(cls, value): + try: + preference, exchange = value.split(' ') + except ValueError: + raise RrParseError() + try: + preference = int(preference) + except ValueError: + pass + return {'preference': preference, 'exchange': exchange} + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1347,6 +1548,10 @@ class MxValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rdata_text(self): + return f'{self.preference} {self.exchange}' + def __hash__(self): return hash((self.preference, self.exchange)) @@ -1368,6 +1573,33 @@ Record.register_type(MxRecord) class NaptrValue(EqualityTupleMixin, dict): VALID_FLAGS = ('S', 'A', 'U', 'P') + @classmethod + def parse_rdata_text(cls, value): + try: + ( + order, + preference, + flags, + service, + regexp, + replacement, + ) = value.split(' ') + except ValueError: + raise RrParseError() + try: + order = int(order) + preference = int(preference) + except ValueError: + pass + return { + 'order': order, + 'preference': preference, + 'flags': flags, + 'service': service, + 'regexp': regexp, + 'replacement': replacement, + } + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1468,6 +1700,10 @@ class NaptrValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rdata_text(self): + return f'{self.order} {self.preference} {self.flags} {self.service} {self.regexp} {self.replacement}' + def __hash__(self): return hash(self.__repr__()) @@ -1500,6 +1736,10 @@ Record.register_type(NaptrRecord) class _NsValue(str): + @classmethod + def parse_rdata_text(cls, value): + return value + @classmethod def validate(cls, data, _type): if not data: @@ -1525,6 +1765,10 @@ class _NsValue(str): v = idna_encode(v) return super().__new__(cls, v) + @property + def rdata_text(self): + return self + class NsRecord(ValuesMixin, Record): _type = 'NS' @@ -1574,6 +1818,26 @@ class SshfpValue(EqualityTupleMixin, dict): VALID_ALGORITHMS = (1, 2, 3, 4) VALID_FINGERPRINT_TYPES = (1, 2) + @classmethod + def parse_rdata_text(self, value): + try: + algorithm, fingerprint_type, fingerprint = value.split(' ') + except ValueError: + raise RrParseError() + try: + algorithm = int(algorithm) + except ValueError: + pass + try: + fingerprint_type = int(fingerprint_type) + except ValueError: + pass + return { + 'algorithm': algorithm, + 'fingerprint_type': fingerprint_type, + 'fingerprint': fingerprint, + } + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1645,6 +1909,10 @@ class SshfpValue(EqualityTupleMixin, dict): def data(self): return self + @property + def rdata_text(self): + return f'{self.algorithm} {self.fingerprint_type} {self.fingerprint}' + def __hash__(self): return hash(self.__repr__()) @@ -1687,6 +1955,13 @@ class _ChunkedValuesMixin(ValuesMixin): class _ChunkedValue(str): _unescaped_semicolon_re = re.compile(r'\w;') + @classmethod + def parse_rdata_text(cls, value): + try: + return value.replace(';', '\\;') + except AttributeError: + return value + @classmethod def validate(cls, data, _type): if not data: @@ -1708,6 +1983,10 @@ class _ChunkedValue(str): ret.append(cls(v.replace('" "', ''))) return ret + @property + def rdata_text(self): + return self + class SpfRecord(_ChunkedValuesMixin, Record): _type = 'SPF' @@ -1718,6 +1997,31 @@ Record.register_type(SpfRecord) class SrvValue(EqualityTupleMixin, dict): + @classmethod + def parse_rdata_text(self, value): + try: + priority, weight, port, target = value.split(' ') + except ValueError: + raise RrParseError() + try: + priority = int(priority) + except ValueError: + pass + try: + weight = int(weight) + except ValueError: + pass + try: + port = int(port) + except ValueError: + pass + return { + 'priority': priority, + 'weight': weight, + 'port': port, + 'target': target, + } + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1840,6 +2144,36 @@ Record.register_type(SrvRecord) class TlsaValue(EqualityTupleMixin, dict): + @classmethod + def parse_rdata_text(self, value): + try: + ( + certificate_usage, + selector, + matching_type, + certificate_association_data, + ) = value.split(' ') + except ValueError: + raise RrParseError() + try: + certificate_usage = int(certificate_usage) + except ValueError: + pass + try: + selector = int(selector) + except ValueError: + pass + try: + matching_type = int(matching_type) + except ValueError: + pass + return { + 'certificate_usage': certificate_usage, + 'selector': selector, + 'matching_type': matching_type, + 'certificate_association_data': certificate_association_data, + } + @classmethod def validate(cls, data, _type): if not isinstance(data, (list, tuple)): @@ -1932,6 +2266,10 @@ class TlsaValue(EqualityTupleMixin, dict): def certificate_association_data(self, value): self['certificate_association_data'] = value + @property + def rdata_text(self): + return f'{self.certificate_usage} {self.selector} {self.matching_type} {self.certificate_association_data}' + def _equality_tuple(self): return ( self.certificate_usage, diff --git a/octodns/source/axfr.py b/octodns/source/axfr.py index e40d7a9..abf3fc8 100644 --- a/octodns/source/axfr.py +++ b/octodns/source/axfr.py @@ -9,12 +9,11 @@ import dns.rdatatype from dns.exception import DNSException -from collections import defaultdict from os import listdir from os.path import join import logging -from ..record import Record +from ..record import Record, Rr from .base import BaseSource @@ -42,110 +41,6 @@ class AxfrBaseSource(BaseSource): def __init__(self, id): super().__init__(id) - def _data_for_multiple(self, _type, records): - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': [r['value'] for r in records], - } - - _data_for_A = _data_for_multiple - _data_for_AAAA = _data_for_multiple - _data_for_NS = _data_for_multiple - _data_for_PTR = _data_for_multiple - - def _data_for_CAA(self, _type, records): - values = [] - for record in records: - flags, tag, value = record['value'].split(' ', 2) - values.append( - {'flags': flags, 'tag': tag, 'value': value.replace('"', '')} - ) - return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} - - def _data_for_LOC(self, _type, records): - values = [] - for record in records: - ( - lat_degrees, - lat_minutes, - lat_seconds, - lat_direction, - long_degrees, - long_minutes, - long_seconds, - long_direction, - altitude, - size, - precision_horz, - precision_vert, - ) = (record['value'].replace('m', '').split(' ', 11)) - values.append( - { - 'lat_degrees': lat_degrees, - 'lat_minutes': lat_minutes, - 'lat_seconds': lat_seconds, - 'lat_direction': lat_direction, - 'long_degrees': long_degrees, - 'long_minutes': long_minutes, - 'long_seconds': long_seconds, - 'long_direction': long_direction, - 'altitude': altitude, - 'size': size, - 'precision_horz': precision_horz, - 'precision_vert': precision_vert, - } - ) - return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} - - def _data_for_MX(self, _type, records): - values = [] - for record in records: - preference, exchange = record['value'].split(' ', 1) - values.append({'preference': preference, 'exchange': exchange}) - return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} - - def _data_for_TXT(self, _type, records): - values = [value['value'].replace(';', '\\;') for value in records] - return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} - - _data_for_SPF = _data_for_TXT - - def _data_for_single(self, _type, records): - record = records[0] - return {'ttl': record['ttl'], 'type': _type, 'value': record['value']} - - _data_for_CNAME = _data_for_single - - def _data_for_SRV(self, _type, records): - values = [] - for record in records: - priority, weight, port, target = record['value'].split(' ', 3) - values.append( - { - 'priority': priority, - 'weight': weight, - 'port': port, - 'target': target, - } - ) - return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} - - def _data_for_SSHFP(self, _type, records): - values = [] - for record in records: - algorithm, fingerprint_type, fingerprint = record['value'].split( - ' ', 2 - ) - values.append( - { - 'algorithm': algorithm, - 'fingerprint_type': fingerprint_type, - 'fingerprint': fingerprint, - } - ) - return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} - def populate(self, zone, target=False, lenient=False): self.log.debug( 'populate: name=%s, target=%s, lenient=%s', @@ -154,26 +49,10 @@ class AxfrBaseSource(BaseSource): lenient, ) - values = defaultdict(lambda: defaultdict(list)) - for record in self.zone_records(zone): - _type = record['type'] - if _type not in self.SUPPORTS: - continue - name = zone.hostname_from_fqdn(record['name']) - values[name][record['type']].append(record) - before = len(zone.records) - for name, types in values.items(): - for _type, records in types.items(): - data_for = getattr(self, f'_data_for_{_type}') - record = Record.new( - zone, - name, - data_for(_type, records), - source=self, - lenient=lenient, - ) - zone.add_record(record, lenient=lenient) + rrs = self.zone_records(zone) + for record in Record.from_rrs(zone, rrs, lenient=lenient): + zone.add_record(record, lenient=lenient) self.log.info( 'populate: found %s records', len(zone.records) - before @@ -218,14 +97,8 @@ class AxfrSource(AxfrBaseSource): for (name, ttl, rdata) in z.iterate_rdatas(): rdtype = dns.rdatatype.to_text(rdata.rdtype) - records.append( - { - "name": name.to_text(), - "ttl": ttl, - "type": rdtype, - "value": rdata.to_text(), - } - ) + if rdtype in self.SUPPORTS: + records.append(Rr(name.to_text(), rdtype, ttl, rdata.to_text())) return records @@ -302,20 +175,17 @@ class ZoneFileSource(AxfrBaseSource): if zone.name not in self._zone_records: try: z = self._load_zone_file(zone.name) - records = [] - for (name, ttl, rdata) in z.iterate_rdatas(): - rdtype = dns.rdatatype.to_text(rdata.rdtype) + except ZoneFileSourceNotFound: + return [] + + records = [] + for (name, ttl, rdata) in z.iterate_rdatas(): + rdtype = dns.rdatatype.to_text(rdata.rdtype) + if rdtype in self.SUPPORTS: records.append( - { - "name": name.to_text(), - "ttl": ttl, - "type": rdtype, - "value": rdata.to_text(), - } + Rr(name.to_text(), rdtype, ttl, rdata.to_text()) ) - self._zone_records[zone.name] = records - except ZoneFileSourceNotFound: - return [] + self._zone_records[zone.name] = records return self._zone_records[zone.name] diff --git a/tests/test_octodns_manager.py b/tests/test_octodns_manager.py index abcf9a7..1a36723 100644 --- a/tests/test_octodns_manager.py +++ b/tests/test_octodns_manager.py @@ -186,6 +186,9 @@ class TestManager(TestCase): manager.config['zones'] = manager._config_zones( {'déjà.vu.': {}, 'deja.vu.': {}, idna_encode('こんにちは.jp.'): {}} ) + from pprint import pprint + + pprint(manager.config['zones']) # refer to them with utf-8 with self.assertRaises(ManagerException) as ctx: diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 8947571..fb9f726 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -27,6 +27,8 @@ from octodns.record import ( PtrRecord, Record, RecordException, + Rr, + RrParseError, SshfpRecord, SshfpValue, SpfRecord, @@ -39,11 +41,13 @@ from octodns.record import ( UrlfwdRecord, UrlfwdValue, ValidationError, + ValuesMixin, + _ChunkedValue, _Dynamic, _DynamicPool, _DynamicRule, _NsValue, - ValuesMixin, + _TargetValue, ) from octodns.zone import Zone @@ -293,6 +297,26 @@ class TestRecord(TestCase): DummyRecord().__repr__() + def test_ip_address_rdata_text(self): + + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, Ipv4Address.parse_rdata_text(s)) + + zone = Zone('unit.tests.', []) + a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'}) + self.assertEqual('1.2.3.4', a.values[0].rdata_text) + def test_values_mixin_data(self): # no values, no value or values in data a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []}) @@ -461,6 +485,26 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_target_rdata_text(self): + + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, _TargetValue.parse_rdata_text(s)) + + zone = Zone('unit.tests.', []) + a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.value.rdata_text) + def test_caa(self): a_values = [ CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}), @@ -521,6 +565,56 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_caa_value_rdata_text(self): + # empty string won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rdata_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rdata_text('0 tag') + + # 4th word won't parse + with self.assertRaises(RrParseError): + CaaValue.parse_rdata_text('1 tag value another') + + # flags not an int, will parse + self.assertEqual( + {'flags': 'one', 'tag': 'tag', 'value': 'value'}, + CaaValue.parse_rdata_text('one tag value'), + ) + + # valid + self.assertEqual( + {'flags': 0, 'tag': 'tag', 'value': '99148c81'}, + CaaValue.parse_rdata_text('0 tag 99148c81'), + ) + + zone = Zone('unit.tests.', []) + a = CaaRecord( + zone, + 'caa', + { + 'ttl': 32, + 'values': [ + {'flags': 1, 'tag': 'tag1', 'value': '99148c81'}, + {'flags': 2, 'tag': 'tag2', 'value': '99148c44'}, + ], + }, + ) + self.assertEqual(1, a.values[0].flags) + self.assertEqual('tag1', a.values[0].tag) + self.assertEqual('99148c81', a.values[0].value) + self.assertEqual('1 tag1 99148c81', a.values[0].rdata_text) + self.assertEqual(2, a.values[1].flags) + self.assertEqual('tag2', a.values[1].tag) + self.assertEqual('99148c44', a.values[1].value) + self.assertEqual('2 tag2 99148c44', a.values[1].rdata_text) + def test_cname(self): self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.') @@ -623,6 +717,92 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_loc_value_rdata_text(self): + # only the exact correct number of words is allowed + for i in tuple(range(0, 12)) + (13,): + s = ''.join(['word'] * i) + with self.assertRaises(RrParseError): + LocValue.parse_rdata_text(s) + + # type conversions are best effort + self.assertEqual( + { + 'altitude': 'six', + 'lat_degrees': 'zero', + 'lat_direction': 'S', + 'lat_minutes': 'one', + 'lat_seconds': 'two', + 'long_degrees': 'three', + 'long_direction': 'W', + 'long_minutes': 'four', + 'long_seconds': 'five', + 'precision_horz': 'eight', + 'precision_vert': 'nine', + 'size': 'seven', + }, + LocValue.parse_rdata_text( + 'zero one two S three four five W six seven eight nine' + ), + ) + + # valid + s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m' + self.assertEqual( + { + 'altitude': 6.6, + 'lat_degrees': 0, + 'lat_direction': 'N', + 'lat_minutes': 1, + 'lat_seconds': 2.2, + 'long_degrees': 3, + 'long_direction': 'E', + 'long_minutes': 4, + 'long_seconds': 5.5, + 'precision_horz': 8.8, + 'precision_vert': 9.9, + 'size': 7.7, + }, + LocValue.parse_rdata_text(s), + ) + + # make sure that the cstor is using parse_rdata_text + zone = Zone('unit.tests.', []) + a = LocRecord( + zone, + 'mx', + { + 'type': 'LOC', + 'ttl': 42, + 'value': { + 'altitude': 6.6, + 'lat_degrees': 0, + 'lat_direction': 'N', + 'lat_minutes': 1, + 'lat_seconds': 2.2, + 'long_degrees': 3, + 'long_direction': 'E', + 'long_minutes': 4, + 'long_seconds': 5.5, + 'precision_horz': 8.8, + 'precision_vert': 9.9, + 'size': 7.7, + }, + }, + ) + self.assertEqual(0, a.values[0].lat_degrees) + self.assertEqual(1, a.values[0].lat_minutes) + self.assertEqual(2.2, a.values[0].lat_seconds) + self.assertEqual('N', a.values[0].lat_direction) + self.assertEqual(3, a.values[0].long_degrees) + self.assertEqual(4, a.values[0].long_minutes) + self.assertEqual(5.5, a.values[0].long_seconds) + self.assertEqual('E', a.values[0].long_direction) + self.assertEqual(6.6, a.values[0].altitude) + self.assertEqual(7.7, a.values[0].size) + self.assertEqual(8.8, a.values[0].precision_horz) + self.assertEqual(9.9, a.values[0].precision_vert) + self.assertEqual(s, a.values[0].rdata_text) + def test_mx(self): a_values = [ MxValue({'preference': 10, 'exchange': 'smtp1.'}), @@ -674,6 +854,51 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_mx_value_rdata_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + MxValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + MxValue.parse_rdata_text('nope') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + MxValue.parse_rdata_text('10 mx.unit.tests. another') + + # preference not an int + self.assertEqual( + {'preference': 'abc', 'exchange': 'mx.unit.tests.'}, + MxValue.parse_rdata_text('abc mx.unit.tests.'), + ) + + # valid + self.assertEqual( + {'preference': 10, 'exchange': 'mx.unit.tests.'}, + MxValue.parse_rdata_text('10 mx.unit.tests.'), + ) + + zone = Zone('unit.tests.', []) + a = MxRecord( + zone, + 'mx', + { + 'ttl': 32, + 'values': [ + {'preference': 11, 'exchange': 'mail1.unit.tests.'}, + {'preference': 12, 'exchange': 'mail2.unit.tests.'}, + ], + }, + ) + self.assertEqual(11, a.values[0].preference) + self.assertEqual('mail1.unit.tests.', a.values[0].exchange) + self.assertEqual('11 mail1.unit.tests.', a.values[0].rdata_text) + self.assertEqual(12, a.values[1].preference) + self.assertEqual('mail2.unit.tests.', a.values[1].exchange) + self.assertEqual('12 mail2.unit.tests.', a.values[1].rdata_text) + def test_naptr(self): a_values = [ NaptrValue( @@ -964,6 +1189,72 @@ class TestRecord(TestCase): o.replacement = '1' self.assertEqual('1', o.replacement) + def test_naptr_value_rdata_text(self): + # things with the wrong number of words won't parse + for v in ( + '', + 'one', + 'one two', + 'one two three', + 'one two three four', + 'one two three four five', + 'one two three four five six seven', + ): + with self.assertRaises(RrParseError): + NaptrValue.parse_rdata_text(v) + + # we don't care if the types of things are correct when parsing rr text + self.assertEqual( + { + 'order': 'one', + 'preference': 'two', + 'flags': 'three', + 'service': 'four', + 'regexp': 'five', + 'replacement': 'six', + }, + NaptrValue.parse_rdata_text('one two three four five six'), + ) + + # order and preference will be converted to int's when possible + self.assertEqual( + { + 'order': 1, + 'preference': 2, + 'flags': 'three', + 'service': 'four', + 'regexp': 'five', + 'replacement': 'six', + }, + NaptrValue.parse_rdata_text('1 2 three four five six'), + ) + + # make sure that the cstor is using parse_rdata_text + zone = Zone('unit.tests.', []) + a = NaptrRecord( + zone, + 'naptr', + { + 'ttl': 32, + 'value': { + 'order': 1, + 'preference': 2, + 'flags': 'S', + 'service': 'service', + 'regexp': 'regexp', + 'replacement': 'replacement', + }, + }, + ) + self.assertEqual(1, a.values[0].order) + self.assertEqual(2, a.values[0].preference) + self.assertEqual('S', a.values[0].flags) + self.assertEqual('service', a.values[0].service) + self.assertEqual('regexp', a.values[0].regexp) + self.assertEqual('replacement', a.values[0].replacement) + s = '1 2 S service regexp replacement' + self.assertEqual(s, a.values[0].rdata_text) + def test_ns(self): a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.'] a_data = {'ttl': 30, 'values': a_values} @@ -980,6 +1271,25 @@ class TestRecord(TestCase): self.assertEqual([b_value], b.values) self.assertEqual(b_data, b.data) + def test_ns_value_rdata_text(self): + # anything goes, we're a noop + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, _NsValue.parse_rdata_text(s)) + + zone = Zone('unit.tests.', []) + a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.values[0].rdata_text) + def test_sshfp(self): a_values = [ SshfpValue( @@ -1048,11 +1358,85 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_sshfp_value_rdata_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rdata_text('nope') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + SshfpValue.parse_rdata_text('0 1 00479b27 another') + + # algorithm and fingerprint_type not ints + self.assertEqual( + { + 'algorithm': 'one', + 'fingerprint_type': 'two', + 'fingerprint': '00479b27', + }, + SshfpValue.parse_rdata_text('one two 00479b27'), + ) + + # valid + self.assertEqual( + {'algorithm': 1, 'fingerprint_type': 2, 'fingerprint': '00479b27'}, + SshfpValue.parse_rdata_text('1 2 00479b27'), + ) + + zone = Zone('unit.tests.', []) + a = SshfpRecord( + zone, + 'sshfp', + { + 'ttl': 32, + 'value': { + 'algorithm': 1, + 'fingerprint_type': 2, + 'fingerprint': '00479b27', + }, + }, + ) + self.assertEqual(1, a.values[0].algorithm) + self.assertEqual(2, a.values[0].fingerprint_type) + self.assertEqual('00479b27', a.values[0].fingerprint) + self.assertEqual('1 2 00479b27', a.values[0].rdata_text) + def test_spf(self): a_values = ['spf1 -all', 'spf1 -hrm'] b_value = 'spf1 -other' self.assertMultipleValues(SpfRecord, a_values, b_value) + def test_chunked_value_rdata_text(self): + for s in ( + None, + '', + 'word', + 42, + 42.43, + '1.2.3', + 'some.words.that.here', + '1.2.word.4', + '1.2.3.4', + ): + self.assertEqual(s, _ChunkedValue.parse_rdata_text(s)) + + # semi-colons are escaped + self.assertEqual( + 'Hello\\; World!', _ChunkedValue.parse_rdata_text('Hello; World!') + ) + + # since we're always a string validate and __init__ don't + # parse_rdata_text + + zone = Zone('unit.tests.', []) + a = SpfRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) + self.assertEqual('some.target.', a.values[0].rdata_text) + def test_srv(self): a_values = [ SrvValue( @@ -1117,6 +1501,69 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_srv_value_rdata_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rdata_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rdata_text('1 2') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rdata_text('1 2 3') + + # 5th word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rdata_text('1 2 3 4 5') + + # priority weight and port not ints + self.assertEqual( + { + 'priority': 'one', + 'weight': 'two', + 'port': 'three', + 'target': 'srv.unit.tests.', + }, + SrvValue.parse_rdata_text('one two three srv.unit.tests.'), + ) + + # valid + self.assertEqual( + { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'srv.unit.tests.', + }, + SrvValue.parse_rdata_text('1 2 3 srv.unit.tests.'), + ) + + zone = Zone('unit.tests.', []) + a = SrvRecord( + zone, + '_srv._tcp', + { + 'ttl': 32, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'srv.unit.tests.', + }, + }, + ) + self.assertEqual(1, a.values[0].priority) + self.assertEqual(2, a.values[0].weight) + self.assertEqual(3, a.values[0].port) + self.assertEqual('srv.unit.tests.', a.values[0].target) + def test_tlsa(self): a_values = [ TlsaValue( @@ -1218,6 +1665,70 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_tsla_value_rdata_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rdata_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rdata_text('1 2') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rdata_text('1 2 3') + + # 5th word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rdata_text('1 2 3 abcd another') + + # non-ints + self.assertEqual( + { + 'certificate_usage': 'one', + 'selector': 'two', + 'matching_type': 'three', + 'certificate_association_data': 'abcd', + }, + TlsaValue.parse_rdata_text('one two three abcd'), + ) + + # valid + self.assertEqual( + { + 'certificate_usage': 1, + 'selector': 2, + 'matching_type': 3, + 'certificate_association_data': 'abcd', + }, + TlsaValue.parse_rdata_text('1 2 3 abcd'), + ) + + zone = Zone('unit.tests.', []) + a = TlsaRecord( + zone, + 'tlsa', + { + 'ttl': 32, + 'value': { + 'certificate_usage': 2, + 'selector': 1, + 'matching_type': 0, + 'certificate_association_data': 'abcd', + }, + }, + ) + self.assertEqual(2, a.values[0].certificate_usage) + self.assertEqual(1, a.values[0].selector) + self.assertEqual(0, a.values[0].matching_type) + self.assertEqual('abcd', a.values[0].certificate_association_data) + self.assertEqual('2 1 0 abcd', a.values[0].rdata_text) + def test_txt(self): a_values = ['a one', 'a two'] b_value = 'b other' @@ -2026,6 +2537,30 @@ class TestRecord(TestCase): values.add(b) self.assertTrue(b in values) + def test_rr(self): + # nothing much to test, just make sure that things don't blow up + Rr('name', 'type', 42, 'Hello World!').__repr__() + + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'a', + {'ttl': 42, 'type': 'A', 'values': ['1.2.3.4', '2.3.4.5']}, + ) + self.assertEqual( + ('a.unit.tests.', 42, 'A', ['1.2.3.4', '2.3.4.5']), record.rrs + ) + + record = Record.new( + zone, + 'cname', + {'ttl': 43, 'type': 'CNAME', 'value': 'target.unit.tests.'}, + ) + self.assertEqual( + ('cname.unit.tests.', 43, 'CNAME', ['target.unit.tests.']), + record.rrs, + ) + class TestRecordValidation(TestCase): zone = Zone('unit.tests.', [])