Browse Source

Merge pull request #935 from octodns/idna-values

IDNA support for Record values holding fqdns
pull/944/head
Ross McFarland 3 years ago
committed by GitHub
parent
commit
6cf5bc247a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 203 additions and 30 deletions
  1. +2
    -0
      CHANGELOG.md
  2. +32
    -15
      octodns/record/__init__.py
  3. +169
    -15
      tests/test_octodns_record.py

+ 2
- 0
CHANGELOG.md View File

@ -11,6 +11,8 @@
decoded form. Both forms should be accepted in command line arguments.
Providers may need to be updated to display the decoded form in their logs,
until then they'd display the IDNA version.
* IDNA value support for Record types that hold FQDNs: ALIAS, CNAME, DNAME, PTR,
MX, NS, and SRV.
* Support for configuring global processors that apply to all zones with
`manager.processors`


+ 32
- 15
octodns/record/__init__.py View File

@ -456,12 +456,12 @@ class ValueMixin(object):
class _DynamicPool(object):
log = getLogger('_DynamicPool')
def __init__(self, _id, data):
def __init__(self, _id, data, value_type):
self._id = _id
values = [
{
'value': d['value'],
'value': value_type(d['value']),
'weight': d.get('weight', 1),
'status': d.get('status', 'obey'),
}
@ -738,7 +738,7 @@ class _DynamicMixin(object):
pools = {}
for _id, pool in sorted(pools.items()):
pools[_id] = _DynamicPool(_id, pool)
pools[_id] = _DynamicPool(_id, pool, self._value_type)
# rules
try:
@ -792,20 +792,24 @@ class _TargetValue(str):
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
else:
data = idna_encode(data)
if not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
if value:
return cls(value.lower())
return cls(value)
return None
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class CnameValue(_TargetValue):
pass
@ -1285,7 +1289,11 @@ class MxValue(EqualityTupleMixin, dict):
reasons.append(f'invalid preference "{value["preference"]}"')
exchange = None
try:
exchange = str(value.get('exchange', None) or value['value'])
exchange = value.get('exchange', None) or value['value']
if not exchange:
reasons.append('missing exchange')
continue
exchange = idna_encode(exchange)
if (
exchange != '.'
and not FQDN(exchange, allow_underscores=True).is_valid
@ -1316,7 +1324,7 @@ class MxValue(EqualityTupleMixin, dict):
except KeyError:
exchange = value['value']
super().__init__(
{'preference': int(preference), 'exchange': exchange.lower()}
{'preference': int(preference), 'exchange': idna_encode(exchange)}
)
@property
@ -1500,7 +1508,8 @@ class _NsValue(str):
data = (data,)
reasons = []
for value in data:
if not FQDN(str(value), allow_underscores=True).is_valid:
value = idna_encode(value)
if not FQDN(value, allow_underscores=True).is_valid:
reasons.append(
f'Invalid NS value "{value}" is not a valid FQDN.'
)
@ -1512,6 +1521,10 @@ class _NsValue(str):
def process(cls, values):
return [cls(v) for v in values]
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class NsRecord(ValuesMixin, Record):
_type = 'NS'
@ -1732,11 +1745,15 @@ class SrvValue(EqualityTupleMixin, dict):
reasons.append(f'invalid port "{value["port"]}"')
try:
target = value['target']
if not target:
reasons.append('missing target')
continue
target = idna_encode(target)
if not target.endswith('.'):
reasons.append(f'SRV value "{target}" missing trailing .')
if (
target != '.'
and not FQDN(str(target), allow_underscores=True).is_valid
and not FQDN(target, allow_underscores=True).is_valid
):
reasons.append(
f'Invalid SRV target "{target}" is not a valid FQDN.'
@ -1755,7 +1772,7 @@ class SrvValue(EqualityTupleMixin, dict):
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': value['target'].lower(),
'target': idna_encode(value['target']),
}
)


+ 169
- 15
tests/test_octodns_record.py View File

@ -16,6 +16,7 @@ from octodns.record import (
Create,
Delete,
GeoValue,
Ipv4Address,
LocRecord,
LocValue,
MxRecord,
@ -94,6 +95,81 @@ class TestRecord(TestCase):
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_utf8_values(self):
zone = Zone('unit.tests.', [])
utf8 = 'гэрбүл.mn.'
encoded = idna_encode(utf8)
# ALIAS
record = Record.new(
zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# CNAME
record = Record.new(
zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# DNAME
record = Record.new(
zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# MX
record = Record.new(
zone,
'mx',
{
'type': 'MX',
'ttl': 300,
'value': {'preference': 10, 'exchange': utf8},
},
)
self.assertEqual(
MxValue({'preference': 10, 'exchange': encoded}), record.values[0]
)
# NS
record = Record.new(
zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.values[0])
# PTR
another_utf8 = 'niño.mx.'
another_encoded = idna_encode(another_utf8)
record = Record.new(
zone,
'ptr',
{'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]},
)
self.assertEqual([encoded, another_encoded], record.values)
# SRV
record = Record.new(
zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 0,
'weight': 10,
'port': 80,
'target': utf8,
},
},
)
self.assertEqual(
SrvValue(
{'priority': 0, 'weight': 10, 'port': 80, 'target': encoded}
),
record.values[0],
)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@ -995,16 +1071,6 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['weight'], a.values[0].weight)
self.assertEqual(a_values[0]['port'], a.values[0].port)
self.assertEqual(a_values[0]['target'], a.values[0].target)
from pprint import pprint
pprint(
{
'a_values': a_values,
'self': a_data,
'other': a.data,
'a.values': a.values,
}
)
self.assertEqual(a_data, a.data)
b_value = SrvValue(
@ -2597,7 +2663,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
def test_cname_validation(self):
# doesn't blow up
Record.new(
self.zone,
@ -3146,6 +3212,19 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
@ -3641,6 +3720,24 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
@ -5326,7 +5423,6 @@ class TestDynamicRecords(TestCase):
'pools': {
'one': {'values': [{'value': '3.3.3.3'}]},
'two': {
# Testing out of order value sorting here
'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}]
},
'three': {
@ -5354,9 +5450,12 @@ class TestDynamicRecords(TestCase):
)
def test_dynamic_eqs(self):
pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]})
pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]})
pool_one = _DynamicPool(
'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address
)
pool_two = _DynamicPool(
'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address
)
self.assertEqual(pool_one, pool_one)
self.assertNotEqual(pool_one, pool_two)
self.assertNotEqual(pool_one, 42)
@ -5375,6 +5474,61 @@ class TestDynamicRecords(TestCase):
self.assertNotEqual(dynamic, other)
self.assertNotEqual(dynamic, 42)
def test_dynamic_cname_idna(self):
a_utf8 = 'natación.mx.'
a_encoded = idna_encode(a_utf8)
b_utf8 = 'гэрбүл.mn.'
b_encoded = idna_encode(b_utf8)
cname_data = {
'dynamic': {
'pools': {
'one': {
# Testing out of order value sorting here
'values': [
{'value': 'b.unit.tests.'},
{'value': 'a.unit.tests.'},
]
},
'two': {
'values': [
# some utf8 values we expect to be idna encoded
{'weight': 10, 'value': a_utf8},
{'weight': 12, 'value': b_utf8},
]
},
},
'rules': [
{'geos': ['NA-US-CA'], 'pool': 'two'},
{'pool': 'one'},
],
},
'type': 'CNAME',
'ttl': 60,
'value': a_utf8,
}
cname = Record.new(self.zone, 'cname', cname_data)
self.assertEqual(a_encoded, cname.value)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'},
{'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'},
],
},
cname.dynamic.pools['one'].data,
)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 12, 'value': b_encoded, 'status': 'obey'},
{'weight': 10, 'value': a_encoded, 'status': 'obey'},
],
},
cname.dynamic.pools['two'].data,
)
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])


Loading…
Cancel
Save