From e6944ff5ae0682a2af2cecd2dacf0474a11fe2c2 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Thu, 8 Sep 2022 10:37:26 -0700 Subject: [PATCH] finish up rr text Record coverage (hopefully) --- octodns/record/__init__.py | 135 +++++++++++++++---- tests/test_octodns_record.py | 242 +++++++++++++++++++++++++++++++++-- 2 files changed, 341 insertions(+), 36 deletions(-) diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index b2f07be..6091398 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -1954,16 +1954,29 @@ Record.register_type(SpfRecord) class SrvValue(EqualityTupleMixin, dict): @classmethod - def preprocess_value(self, value): - if isinstance(value, str): - priority, weight, port, target = value.split(' ', 3) - return { - 'priority': priority, - 'weight': weight, - 'port': port, - 'target': target, - } - return value + def parse_rr_text(self, value): + try: + priority, weight, port, target = value.split(' ') + except ValueError: + raise RrParseError() + try: + priority = int(priority) + except ValueError: + pass + try: + weight = int(weight) + except ValueError: + pass + try: + port = int(port) + except ValueError: + pass + return { + 'priority': priority, + 'weight': weight, + 'port': port, + 'target': target, + } @classmethod def validate(cls, data, _type): @@ -1971,6 +1984,14 @@ class SrvValue(EqualityTupleMixin, dict): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue # TODO: validate algorithm and fingerprint_type values try: int(value['priority']) @@ -2010,6 +2031,8 @@ class SrvValue(EqualityTupleMixin, dict): return [cls(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'priority': int(value['priority']), @@ -2084,21 +2107,34 @@ Record.register_type(SrvRecord) class TlsaValue(EqualityTupleMixin, dict): @classmethod - def preprocess_value(self, value): - if isinstance(value, str): + def parse_rr_text(self, value): + try: ( certificate_usage, - certificate_association_data, + selector, matching_type, certificate_association_data, - ) = value.split(' ', 3) - return { - 'certificate_usage': certificate_usage, - 'certificate_association_data': certificate_association_data, - 'matching_type': matching_type, - 'certificate_association_data': certificate_association_data, - } - return value + ) = value.split(' ') + except ValueError: + raise RrParseError() + try: + certificate_usage = int(certificate_usage) + except ValueError: + pass + try: + selector = int(selector) + except ValueError: + pass + try: + matching_type = int(matching_type) + except ValueError: + pass + return { + 'certificate_usage': certificate_usage, + 'selector': selector, + 'matching_type': matching_type, + 'certificate_association_data': certificate_association_data, + } @classmethod def validate(cls, data, _type): @@ -2106,6 +2142,14 @@ class TlsaValue(EqualityTupleMixin, dict): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue try: certificate_usage = int(value.get('certificate_usage', 0)) if certificate_usage < 0 or certificate_usage > 3: @@ -2149,6 +2193,8 @@ class TlsaValue(EqualityTupleMixin, dict): return [cls(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'certificate_usage': int(value.get('certificate_usage', 0)), @@ -2192,6 +2238,10 @@ class TlsaValue(EqualityTupleMixin, dict): def certificate_association_data(self, value): self['certificate_association_data'] = value + @property + def rr_text(self): + return f'{self.certificate_usage} {self.selector} {self.matching_type} {self.certificate_association_data}' + def _equality_tuple(self): return ( self.certificate_usage, @@ -2233,9 +2283,30 @@ class UrlfwdValue(EqualityTupleMixin, dict): VALID_QUERY = (0, 1) @classmethod - def preprocess_value(self, value): - # TODO: - return value + def parse_rr_text(self, value): + try: + code, masking, query, path, target = value.split(' ') + except ValueError: + raise RrParseError() + try: + code = int(code) + except ValueError: + pass + try: + masking = int(masking) + except ValueError: + pass + try: + query = int(query) + except ValueError: + pass + return { + 'code': code, + 'masking': masking, + 'query': query, + 'path': path, + 'target': target, + } @classmethod def validate(cls, data, _type): @@ -2243,6 +2314,14 @@ class UrlfwdValue(EqualityTupleMixin, dict): data = (data,) reasons = [] for value in data: + if isinstance(value, str): + # it's hopefully RR formatted, give parsing a try + try: + value = cls.parse_rr_text(value) + except RrParseError as e: + reasons.append(str(e)) + # not a dict so no point in continuing + continue try: code = int(value['code']) if code not in cls.VALID_CODES: @@ -2277,6 +2356,8 @@ class UrlfwdValue(EqualityTupleMixin, dict): return [UrlfwdValue(v) for v in values] def __init__(self, value): + if isinstance(value, str): + value = self.parse_rr_text(value) super().__init__( { 'path': value['path'], @@ -2327,6 +2408,12 @@ class UrlfwdValue(EqualityTupleMixin, dict): def query(self, value): self['query'] = value + @property + def rr_text(self): + return ( + f'{self.code} {self.masking} {self.query} {self.path} {self.target}' + ) + def _equality_tuple(self): return (self.path, self.target, self.code, self.masking, self.query) diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index e65e031..01eea90 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -52,6 +52,7 @@ from octodns.record import ( _DynamicPool, _DynamicRule, _NsValue, + _TargetValue, ) from octodns.zone import Zone @@ -419,7 +420,7 @@ class TestRecord(TestCase): '1.2.word.4', '1.2.3.4', ): - self.assertEqual(s, Ipv4Address.parse_rr_text(s)) + self.assertEqual(s, _TargetValue.parse_rr_text(s)) # since we're a noop there's no need/way to check whether validate or # __init__ call parse_rr_text @@ -656,12 +657,33 @@ class TestRecord(TestCase): a.__repr__() def test_loc_value_rr_text(self): - # only the exact correct number of words + # only the exact correct number of words is allowed for i in tuple(range(0, 12)) + (13,): s = ''.join(['word'] * i) with self.assertRaises(RrParseError): LocValue.parse_rr_text(s) + # type conversions are best effort + self.assertEqual( + { + 'altitude': 'six', + 'lat_degrees': 'zero', + 'lat_direction': 'S', + 'lat_minutes': 'one', + 'lat_seconds': 'two', + 'long_degrees': 'three', + 'long_direction': 'W', + 'long_minutes': 'four', + 'long_seconds': 'five', + 'precision_horz': 'eight', + 'precision_vert': 'nine', + 'size': 'seven', + }, + LocValue.parse_rr_text( + 'zero one two S three four five W six seven eight nine' + ), + ) + # valid s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m' self.assertEqual( @@ -1385,16 +1407,6 @@ class TestRecord(TestCase): self.assertEqual(a_values[0]['weight'], a.values[0].weight) self.assertEqual(a_values[0]['port'], a.values[0].port) self.assertEqual(a_values[0]['target'], a.values[0].target) - from pprint import pprint - - pprint( - { - 'a_values': a_values, - 'self': a_data, - 'other': a.data, - 'a.values': a.values, - } - ) self.assertEqual(a_data, a.data) b_value = SrvValue( @@ -1441,6 +1453,73 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_srv_value_rr_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rr_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rr_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rr_text('1 2') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rr_text('1 2 3') + + # 5th word won't parse + with self.assertRaises(RrParseError): + SrvValue.parse_rr_text('1 2 3 4 5') + + # priority weight and port not ints + self.assertEqual( + { + 'priority': 'one', + 'weight': 'two', + 'port': 'three', + 'target': 'srv.unit.tests.', + }, + SrvValue.parse_rr_text('one two three srv.unit.tests.'), + ) + + # valid + self.assertEqual( + { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'srv.unit.tests.', + }, + SrvValue.parse_rr_text('1 2 3 srv.unit.tests.'), + ) + + # make sure that validate is using parse_rr_text when passed string + # value(s) + reasons = SrvRecord.validate( + '_srv._tcp', '_srv._tcp.unit.tests.', {'ttl': 32, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = SrvRecord.validate( + '_srv._tcp', + '_srv._tcp.unit.tests.', + {'ttl': 32, 'value': '1 2 3 srv.unit.tests.'}, + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = SrvRecord( + zone, '_srv._tcp', {'ttl': 32, 'value': '1 2 3 srv.unit.tests.'} + ) + self.assertEqual(1, a.values[0].priority) + self.assertEqual(2, a.values[0].weight) + self.assertEqual(3, a.values[0].port) + self.assertEqual('srv.unit.tests.', a.values[0].target) + def test_tlsa(self): a_values = [ TlsaValue( @@ -1542,6 +1621,70 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_tsla_value_rr_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rr_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rr_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rr_text('1 2') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rr_text('1 2 3') + + # 5th word won't parse + with self.assertRaises(RrParseError): + TlsaValue.parse_rr_text('1 2 3 abcd another') + + # non-ints + self.assertEqual( + { + 'certificate_usage': 'one', + 'selector': 'two', + 'matching_type': 'three', + 'certificate_association_data': 'abcd', + }, + TlsaValue.parse_rr_text('one two three abcd'), + ) + + # valid + self.assertEqual( + { + 'certificate_usage': 1, + 'selector': 2, + 'matching_type': 3, + 'certificate_association_data': 'abcd', + }, + TlsaValue.parse_rr_text('1 2 3 abcd'), + ) + + # make sure that validate is using parse_rr_text when passed string + # value(s) + reasons = TlsaRecord.validate( + 'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = TlsaRecord.validate( + 'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': '2 1 0 abcd'} + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = TlsaRecord(zone, 'tlsa', {'ttl': 32, 'value': '2 1 0 abcd'}) + self.assertEqual(2, a.values[0].certificate_usage) + self.assertEqual(1, a.values[0].selector) + self.assertEqual(0, a.values[0].matching_type) + self.assertEqual('abcd', a.values[0].certificate_association_data) + self.assertEqual('2 1 0 abcd', a.values[0].rr_text) + def test_txt(self): a_values = ['a one', 'a two'] b_value = 'b other' @@ -1666,6 +1809,81 @@ class TestRecord(TestCase): # __repr__ doesn't blow up a.__repr__() + def test_urlfwd_value_rr_text(self): + + # empty string won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('nope') + + # 2nd word won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('one two') + + # 3rd word won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('one two three') + + # 4th word won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('one two three four') + + # 6th word won't parse + with self.assertRaises(RrParseError): + UrlfwdValue.parse_rr_text('one two three four five size') + + # non-ints + self.assertEqual( + { + 'code': 'one', + 'masking': 'two', + 'query': 'three', + 'path': 'four', + 'target': 'five', + }, + UrlfwdValue.parse_rr_text('one two three four five'), + ) + + # valid + self.assertEqual( + { + 'code': 301, + 'masking': 0, + 'query': 1, + 'path': 'four', + 'target': 'five', + }, + UrlfwdValue.parse_rr_text('301 0 1 four five'), + ) + + # make sure that validate is using parse_rr_text when passed string + # value(s) + reasons = UrlfwdRecord.validate( + 'urlfwd', 'urlfwd.unit.tests.', {'ttl': 32, 'value': ''} + ) + self.assertEqual(['failed to parse string value as RR text'], reasons) + reasons = UrlfwdRecord.validate( + 'urlfwd', + 'urlfwd.unit.tests.', + {'ttl': 32, 'value': '301 0 1 four five'}, + ) + self.assertFalse(reasons) + + # make sure that the cstor is using parse_rr_text + zone = Zone('unit.tests.', []) + a = UrlfwdRecord( + zone, 'urlfwd', {'ttl': 32, 'value': '301 0 1 four five'} + ) + self.assertEqual(301, a.values[0].code) + self.assertEqual(0, a.values[0].masking) + self.assertEqual(1, a.values[0].query) + self.assertEqual('four', a.values[0].path) + self.assertEqual('five', a.values[0].target) + self.assertEqual('301 0 1 four five', a.values[0].rr_text) + def test_record_new(self): txt = Record.new( self.zone, 'txt', {'ttl': 44, 'type': 'TXT', 'value': 'some text'}