From faf277ca01b10c3d49b64b871213f066b972bb55 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 14 Sep 2022 13:56:27 -0700 Subject: [PATCH] IDNA support for Record values holding fqdns --- octodns/record/__init__.py | 47 ++++++--- tests/test_octodns_record.py | 184 ++++++++++++++++++++++++++++++++--- 2 files changed, 201 insertions(+), 30 deletions(-) diff --git a/octodns/record/__init__.py b/octodns/record/__init__.py index a0c83bb..1893629 100644 --- a/octodns/record/__init__.py +++ b/octodns/record/__init__.py @@ -463,12 +463,12 @@ class ValueMixin(object): class _DynamicPool(object): log = getLogger('_DynamicPool') - def __init__(self, _id, data): + def __init__(self, _id, data, value_type): self._id = _id values = [ { - 'value': d['value'], + 'value': value_type(d['value']), 'weight': d.get('weight', 1), 'status': d.get('status', 'obey'), } @@ -745,7 +745,7 @@ class _DynamicMixin(object): pools = {} for _id, pool in sorted(pools.items()): - pools[_id] = _DynamicPool(_id, pool) + pools[_id] = _DynamicPool(_id, pool, self._value_type) # rules try: @@ -799,20 +799,24 @@ class _TargetValue(str): reasons.append('empty value') elif not data: reasons.append('missing value') - # NOTE: FQDN complains if the data it receives isn't a str, it doesn't - # allow unicode... This is likely specific to 2.7 - elif not FQDN(str(data), allow_underscores=True).is_valid: - reasons.append(f'{_type} value "{data}" is not a valid FQDN') - elif not data.endswith('.'): - reasons.append(f'{_type} value "{data}" missing trailing .') + else: + data = idna_encode(data) + if not FQDN(str(data), allow_underscores=True).is_valid: + reasons.append(f'{_type} value "{data}" is not a valid FQDN') + elif not data.endswith('.'): + reasons.append(f'{_type} value "{data}" missing trailing .') return reasons @classmethod def process(cls, value): if value: - return cls(value.lower()) + return cls(value) return None + def __new__(cls, v): + v = idna_encode(v) + return super().__new__(cls, v) + class CnameValue(_TargetValue): pass @@ -1292,7 +1296,11 @@ class MxValue(EqualityTupleMixin, dict): reasons.append(f'invalid preference "{value["preference"]}"') exchange = None try: - exchange = str(value.get('exchange', None) or value['value']) + exchange = value.get('exchange', None) or value['value'] + if not exchange: + reasons.append('missing exchange') + continue + exchange = idna_encode(exchange) if ( exchange != '.' and not FQDN(exchange, allow_underscores=True).is_valid @@ -1323,7 +1331,7 @@ class MxValue(EqualityTupleMixin, dict): except KeyError: exchange = value['value'] super().__init__( - {'preference': int(preference), 'exchange': exchange.lower()} + {'preference': int(preference), 'exchange': idna_encode(exchange)} ) @property @@ -1507,7 +1515,8 @@ class _NsValue(str): data = (data,) reasons = [] for value in data: - if not FQDN(str(value), allow_underscores=True).is_valid: + value = idna_encode(value) + if not FQDN(value, allow_underscores=True).is_valid: reasons.append( f'Invalid NS value "{value}" is not a valid FQDN.' ) @@ -1519,6 +1528,10 @@ class _NsValue(str): def process(cls, values): return [cls(v) for v in values] + def __new__(cls, v): + v = idna_encode(v) + return super().__new__(cls, v) + class NsRecord(ValuesMixin, Record): _type = 'NS' @@ -1739,11 +1752,15 @@ class SrvValue(EqualityTupleMixin, dict): reasons.append(f'invalid port "{value["port"]}"') try: target = value['target'] + if not target: + reasons.append('missing target') + continue + target = idna_encode(target) if not target.endswith('.'): reasons.append(f'SRV value "{target}" missing trailing .') if ( target != '.' - and not FQDN(str(target), allow_underscores=True).is_valid + and not FQDN(target, allow_underscores=True).is_valid ): reasons.append( f'Invalid SRV target "{target}" is not a valid FQDN.' @@ -1762,7 +1779,7 @@ class SrvValue(EqualityTupleMixin, dict): 'priority': int(value['priority']), 'weight': int(value['weight']), 'port': int(value['port']), - 'target': value['target'].lower(), + 'target': idna_encode(value['target']), } ) diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 37c0180..180089a 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -23,6 +23,7 @@ from octodns.record import ( Create, Delete, GeoValue, + Ipv4Address, LocRecord, LocValue, MxRecord, @@ -101,6 +102,81 @@ class TestRecord(TestCase): self.assertTrue(f'{encoded}.{zone.name}', record.fqdn) self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn) + def test_utf8_values(self): + zone = Zone('unit.tests.', []) + utf8 = 'гэрбүл.mn.' + encoded = idna_encode(utf8) + + # ALIAS + record = Record.new( + zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8} + ) + self.assertEqual(encoded, record.value) + + # CNAME + record = Record.new( + zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8} + ) + self.assertEqual(encoded, record.value) + + # DNAME + record = Record.new( + zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8} + ) + self.assertEqual(encoded, record.value) + + # MX + record = Record.new( + zone, + 'mx', + { + 'type': 'MX', + 'ttl': 300, + 'value': {'preference': 10, 'exchange': utf8}, + }, + ) + self.assertEqual( + MxValue({'preference': 10, 'exchange': encoded}), record.values[0] + ) + + # NS + record = Record.new( + zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8} + ) + self.assertEqual(encoded, record.values[0]) + + # PTR + another_utf8 = 'niño.mx.' + another_encoded = idna_encode(another_utf8) + record = Record.new( + zone, + 'ptr', + {'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]}, + ) + self.assertEqual([encoded, another_encoded], record.values) + + # SRV + record = Record.new( + zone, + '_srv._tcp', + { + 'type': 'SRV', + 'ttl': 300, + 'value': { + 'priority': 0, + 'weight': 10, + 'port': 80, + 'target': utf8, + }, + }, + ) + self.assertEqual( + SrvValue( + {'priority': 0, 'weight': 10, 'port': 80, 'target': encoded} + ), + record.values[0], + ) + def test_alias_lowering_value(self): upper_record = AliasRecord( self.zone, @@ -1002,16 +1078,6 @@ class TestRecord(TestCase): self.assertEqual(a_values[0]['weight'], a.values[0].weight) self.assertEqual(a_values[0]['port'], a.values[0].port) self.assertEqual(a_values[0]['target'], a.values[0].target) - from pprint import pprint - - pprint( - { - 'a_values': a_values, - 'self': a_data, - 'other': a.data, - 'a.values': a.values, - } - ) self.assertEqual(a_data, a.data) b_value = SrvValue( @@ -2604,7 +2670,7 @@ class TestRecordValidation(TestCase): ) self.assertEqual(['missing value'], ctx.exception.reasons) - def test_CNAME(self): + def test_cname_validation(self): # doesn't blow up Record.new( self.zone, @@ -3153,6 +3219,19 @@ class TestRecordValidation(TestCase): ctx.exception.reasons, ) + # if exchange doesn't exist value can not be None/falsey + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + '', + { + 'type': 'MX', + 'ttl': 600, + 'value': {'preference': 10, 'value': ''}, + }, + ) + self.assertEqual(['missing exchange'], ctx.exception.reasons) + # exchange can be a single `.` record = Record.new( self.zone, @@ -3648,6 +3727,24 @@ class TestRecordValidation(TestCase): ctx.exception.reasons, ) + # falsey target + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + '_srv._tcp', + { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': '', + }, + }, + ) + self.assertEqual(['missing target'], ctx.exception.reasons) + # target must be a valid FQDN with self.assertRaises(ValidationError) as ctx: Record.new( @@ -5333,7 +5430,6 @@ class TestDynamicRecords(TestCase): 'pools': { 'one': {'values': [{'value': '3.3.3.3'}]}, 'two': { - # Testing out of order value sorting here 'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}] }, 'three': { @@ -5361,9 +5457,12 @@ class TestDynamicRecords(TestCase): ) def test_dynamic_eqs(self): - - pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]}) - pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]}) + pool_one = _DynamicPool( + 'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address + ) + pool_two = _DynamicPool( + 'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address + ) self.assertEqual(pool_one, pool_one) self.assertNotEqual(pool_one, pool_two) self.assertNotEqual(pool_one, 42) @@ -5382,6 +5481,61 @@ class TestDynamicRecords(TestCase): self.assertNotEqual(dynamic, other) self.assertNotEqual(dynamic, 42) + def test_dynamic_cname_idna(self): + a_utf8 = 'natación.mx.' + a_encoded = idna_encode(a_utf8) + b_utf8 = 'гэрбүл.mn.' + b_encoded = idna_encode(b_utf8) + cname_data = { + 'dynamic': { + 'pools': { + 'one': { + # Testing out of order value sorting here + 'values': [ + {'value': 'b.unit.tests.'}, + {'value': 'a.unit.tests.'}, + ] + }, + 'two': { + 'values': [ + # some utf8 values we expect to be idna encoded + {'weight': 10, 'value': a_utf8}, + {'weight': 12, 'value': b_utf8}, + ] + }, + }, + 'rules': [ + {'geos': ['NA-US-CA'], 'pool': 'two'}, + {'pool': 'one'}, + ], + }, + 'type': 'CNAME', + 'ttl': 60, + 'value': a_utf8, + } + cname = Record.new(self.zone, 'cname', cname_data) + self.assertEqual(a_encoded, cname.value) + self.assertEqual( + { + 'fallback': None, + 'values': [ + {'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'}, + {'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'}, + ], + }, + cname.dynamic.pools['one'].data, + ) + self.assertEqual( + { + 'fallback': None, + 'values': [ + {'weight': 12, 'value': b_encoded, 'status': 'obey'}, + {'weight': 10, 'value': a_encoded, 'status': 'obey'}, + ], + }, + cname.dynamic.pools['two'].data, + ) + class TestChanges(TestCase): zone = Zone('unit.tests.', [])