Browse Source

IDNA support for Record values holding fqdns

idna-values-display
Ross McFarland 3 years ago
parent
commit
faf277ca01
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
2 changed files with 201 additions and 30 deletions
  1. +32
    -15
      octodns/record/__init__.py
  2. +169
    -15
      tests/test_octodns_record.py

+ 32
- 15
octodns/record/__init__.py View File

@ -463,12 +463,12 @@ class ValueMixin(object):
class _DynamicPool(object):
log = getLogger('_DynamicPool')
def __init__(self, _id, data):
def __init__(self, _id, data, value_type):
self._id = _id
values = [
{
'value': d['value'],
'value': value_type(d['value']),
'weight': d.get('weight', 1),
'status': d.get('status', 'obey'),
}
@ -745,7 +745,7 @@ class _DynamicMixin(object):
pools = {}
for _id, pool in sorted(pools.items()):
pools[_id] = _DynamicPool(_id, pool)
pools[_id] = _DynamicPool(_id, pool, self._value_type)
# rules
try:
@ -799,20 +799,24 @@ class _TargetValue(str):
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
else:
data = idna_encode(data)
if not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
if value:
return cls(value.lower())
return cls(value)
return None
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class CnameValue(_TargetValue):
pass
@ -1292,7 +1296,11 @@ class MxValue(EqualityTupleMixin, dict):
reasons.append(f'invalid preference "{value["preference"]}"')
exchange = None
try:
exchange = str(value.get('exchange', None) or value['value'])
exchange = value.get('exchange', None) or value['value']
if not exchange:
reasons.append('missing exchange')
continue
exchange = idna_encode(exchange)
if (
exchange != '.'
and not FQDN(exchange, allow_underscores=True).is_valid
@ -1323,7 +1331,7 @@ class MxValue(EqualityTupleMixin, dict):
except KeyError:
exchange = value['value']
super().__init__(
{'preference': int(preference), 'exchange': exchange.lower()}
{'preference': int(preference), 'exchange': idna_encode(exchange)}
)
@property
@ -1507,7 +1515,8 @@ class _NsValue(str):
data = (data,)
reasons = []
for value in data:
if not FQDN(str(value), allow_underscores=True).is_valid:
value = idna_encode(value)
if not FQDN(value, allow_underscores=True).is_valid:
reasons.append(
f'Invalid NS value "{value}" is not a valid FQDN.'
)
@ -1519,6 +1528,10 @@ class _NsValue(str):
def process(cls, values):
return [cls(v) for v in values]
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class NsRecord(ValuesMixin, Record):
_type = 'NS'
@ -1739,11 +1752,15 @@ class SrvValue(EqualityTupleMixin, dict):
reasons.append(f'invalid port "{value["port"]}"')
try:
target = value['target']
if not target:
reasons.append('missing target')
continue
target = idna_encode(target)
if not target.endswith('.'):
reasons.append(f'SRV value "{target}" missing trailing .')
if (
target != '.'
and not FQDN(str(target), allow_underscores=True).is_valid
and not FQDN(target, allow_underscores=True).is_valid
):
reasons.append(
f'Invalid SRV target "{target}" is not a valid FQDN.'
@ -1762,7 +1779,7 @@ class SrvValue(EqualityTupleMixin, dict):
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': value['target'].lower(),
'target': idna_encode(value['target']),
}
)


+ 169
- 15
tests/test_octodns_record.py View File

@ -23,6 +23,7 @@ from octodns.record import (
Create,
Delete,
GeoValue,
Ipv4Address,
LocRecord,
LocValue,
MxRecord,
@ -101,6 +102,81 @@ class TestRecord(TestCase):
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_utf8_values(self):
zone = Zone('unit.tests.', [])
utf8 = 'гэрбүл.mn.'
encoded = idna_encode(utf8)
# ALIAS
record = Record.new(
zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# CNAME
record = Record.new(
zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# DNAME
record = Record.new(
zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# MX
record = Record.new(
zone,
'mx',
{
'type': 'MX',
'ttl': 300,
'value': {'preference': 10, 'exchange': utf8},
},
)
self.assertEqual(
MxValue({'preference': 10, 'exchange': encoded}), record.values[0]
)
# NS
record = Record.new(
zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.values[0])
# PTR
another_utf8 = 'niño.mx.'
another_encoded = idna_encode(another_utf8)
record = Record.new(
zone,
'ptr',
{'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]},
)
self.assertEqual([encoded, another_encoded], record.values)
# SRV
record = Record.new(
zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 0,
'weight': 10,
'port': 80,
'target': utf8,
},
},
)
self.assertEqual(
SrvValue(
{'priority': 0, 'weight': 10, 'port': 80, 'target': encoded}
),
record.values[0],
)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@ -1002,16 +1078,6 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['weight'], a.values[0].weight)
self.assertEqual(a_values[0]['port'], a.values[0].port)
self.assertEqual(a_values[0]['target'], a.values[0].target)
from pprint import pprint
pprint(
{
'a_values': a_values,
'self': a_data,
'other': a.data,
'a.values': a.values,
}
)
self.assertEqual(a_data, a.data)
b_value = SrvValue(
@ -2604,7 +2670,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
def test_cname_validation(self):
# doesn't blow up
Record.new(
self.zone,
@ -3153,6 +3219,19 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
@ -3648,6 +3727,24 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
@ -5333,7 +5430,6 @@ class TestDynamicRecords(TestCase):
'pools': {
'one': {'values': [{'value': '3.3.3.3'}]},
'two': {
# Testing out of order value sorting here
'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}]
},
'three': {
@ -5361,9 +5457,12 @@ class TestDynamicRecords(TestCase):
)
def test_dynamic_eqs(self):
pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]})
pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]})
pool_one = _DynamicPool(
'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address
)
pool_two = _DynamicPool(
'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address
)
self.assertEqual(pool_one, pool_one)
self.assertNotEqual(pool_one, pool_two)
self.assertNotEqual(pool_one, 42)
@ -5382,6 +5481,61 @@ class TestDynamicRecords(TestCase):
self.assertNotEqual(dynamic, other)
self.assertNotEqual(dynamic, 42)
def test_dynamic_cname_idna(self):
a_utf8 = 'natación.mx.'
a_encoded = idna_encode(a_utf8)
b_utf8 = 'гэрбүл.mn.'
b_encoded = idna_encode(b_utf8)
cname_data = {
'dynamic': {
'pools': {
'one': {
# Testing out of order value sorting here
'values': [
{'value': 'b.unit.tests.'},
{'value': 'a.unit.tests.'},
]
},
'two': {
'values': [
# some utf8 values we expect to be idna encoded
{'weight': 10, 'value': a_utf8},
{'weight': 12, 'value': b_utf8},
]
},
},
'rules': [
{'geos': ['NA-US-CA'], 'pool': 'two'},
{'pool': 'one'},
],
},
'type': 'CNAME',
'ttl': 60,
'value': a_utf8,
}
cname = Record.new(self.zone, 'cname', cname_data)
self.assertEqual(a_encoded, cname.value)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'},
{'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'},
],
},
cname.dynamic.pools['one'].data,
)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 12, 'value': b_encoded, 'status': 'obey'},
{'weight': 10, 'value': a_encoded, 'status': 'obey'},
],
},
cname.dynamic.pools['two'].data,
)
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])


Loading…
Cancel
Save