Browse Source

Merge in main, remove parse_rdata_text usage from validate & cstor

pull/930/head
Ross McFarland 3 years ago
parent
commit
dbd40fb20a
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
3 changed files with 290 additions and 226 deletions
  1. +2
    -0
      CHANGELOG.md
  2. +32
    -91
      octodns/record/__init__.py
  3. +256
    -135
      tests/test_octodns_record.py

+ 2
- 0
CHANGELOG.md View File

@ -11,6 +11,8 @@
decoded form. Both forms should be accepted in command line arguments.
Providers may need to be updated to display the decoded form in their logs,
until then they'd display the IDNA version.
* IDNA value support for Record types that hold FQDNs: ALIAS, CNAME, DNAME, PTR,
MX, NS, and SRV.
* Support for configuring global processors that apply to all zones with
`manager.processors`


+ 32
- 91
octodns/record/__init__.py View File

@ -531,12 +531,12 @@ class ValueMixin(object):
class _DynamicPool(object):
log = getLogger('_DynamicPool')
def __init__(self, _id, data):
def __init__(self, _id, data, value_type):
self._id = _id
values = [
{
'value': d['value'],
'value': value_type(d['value']),
'weight': d.get('weight', 1),
'status': d.get('status', 'obey'),
}
@ -813,7 +813,7 @@ class _DynamicMixin(object):
pools = {}
for _id, pool in sorted(pools.items()):
pools[_id] = _DynamicPool(_id, pool)
pools[_id] = _DynamicPool(_id, pool, self._value_type)
# rules
try:
@ -866,27 +866,29 @@ class _TargetValue(str):
@classmethod
def validate(cls, data, _type):
# no need to call parse_rdata_text since it's a noop
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
else:
data = idna_encode(data)
if not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
# no need to call parse_rdata_text since it's a noop
if value:
return cls(value.lower())
return cls(value)
return None
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
@ -913,7 +915,6 @@ class _IpAddress(str):
return ['missing value(s)']
reasons = []
for value in data:
# no need to call parse_rdata_text here as it's a noop
if value == '':
reasons.append('empty value')
elif value is None:
@ -936,7 +937,6 @@ class _IpAddress(str):
return [cls(v) if v != '' else '' for v in values]
def __new__(cls, v):
# no need to call parse_rdata_text here as it's a noop
v = str(cls._address_type(v))
return super().__new__(cls, v)
@ -1012,14 +1012,6 @@ class CaaValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
try:
flags = int(value.get('flags', 0))
if flags < 0 or flags > 255:
@ -1038,8 +1030,6 @@ class CaaValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'flags': int(value.get('flags', 0)),
@ -1224,14 +1214,6 @@ class LocValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
for key in int_keys:
try:
int(value[key])
@ -1304,8 +1286,6 @@ class LocValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'lat_degrees': int(value['lat_degrees']),
@ -1499,14 +1479,6 @@ class MxValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
try:
try:
int(value['preference'])
@ -1518,7 +1490,11 @@ class MxValue(EqualityTupleMixin, dict):
reasons.append(f'invalid preference "{value["preference"]}"')
exchange = None
try:
exchange = str(value.get('exchange', None) or value['value'])
exchange = value.get('exchange', None) or value['value']
if not exchange:
reasons.append('missing exchange')
continue
exchange = idna_encode(exchange)
if (
exchange != '.'
and not FQDN(exchange, allow_underscores=True).is_valid
@ -1538,8 +1514,6 @@ class MxValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
# RFC1035 says preference, half the providers use priority
try:
preference = value['preference']
@ -1551,7 +1525,7 @@ class MxValue(EqualityTupleMixin, dict):
except KeyError:
exchange = value['value']
super().__init__(
{'preference': int(preference), 'exchange': exchange.lower()}
{'preference': int(preference), 'exchange': idna_encode(exchange)}
)
@property
@ -1632,14 +1606,6 @@ class NaptrValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
try:
int(value['order'])
except KeyError:
@ -1671,8 +1637,6 @@ class NaptrValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'order': int(value['order']),
@ -1784,8 +1748,8 @@ class _NsValue(str):
data = (data,)
reasons = []
for value in data:
# no need to consider parse_rdata_text as it's a noop
if not FQDN(str(value), allow_underscores=True).is_valid:
value = idna_encode(value)
if not FQDN(value, allow_underscores=True).is_valid:
reasons.append(
f'Invalid NS value "{value}" is not a valid FQDN.'
)
@ -1797,6 +1761,10 @@ class _NsValue(str):
def process(cls, values):
return [cls(v) for v in values]
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
@ -1876,14 +1844,6 @@ class SshfpValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
try:
algorithm = int(value['algorithm'])
if algorithm not in cls.VALID_ALGORITHMS:
@ -1913,8 +1873,6 @@ class SshfpValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'algorithm': int(value['algorithm']),
@ -2012,7 +1970,6 @@ class _ChunkedValue(str):
data = (data,)
reasons = []
for value in data:
# no need to try parse_rdata_text here as it's a noop
if cls._unescaped_semicolon_re.search(value):
reasons.append(f'unescaped ; in "{value}"')
return reasons
@ -2071,14 +2028,6 @@ class SrvValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
# TODO: validate algorithm and fingerprint_type values
try:
int(value['priority'])
@ -2100,11 +2049,15 @@ class SrvValue(EqualityTupleMixin, dict):
reasons.append(f'invalid port "{value["port"]}"')
try:
target = value['target']
if not target:
reasons.append('missing target')
continue
target = idna_encode(target)
if not target.endswith('.'):
reasons.append(f'SRV value "{target}" missing trailing .')
if (
target != '.'
and not FQDN(str(target), allow_underscores=True).is_valid
and not FQDN(target, allow_underscores=True).is_valid
):
reasons.append(
f'Invalid SRV target "{target}" is not a valid FQDN.'
@ -2118,14 +2071,12 @@ class SrvValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': value['target'].lower(),
'target': idna_encode(value['target']),
}
)
@ -2229,14 +2180,6 @@ class TlsaValue(EqualityTupleMixin, dict):
data = (data,)
reasons = []
for value in data:
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
continue
try:
certificate_usage = int(value.get('certificate_usage', 0))
if certificate_usage < 0 or certificate_usage > 3:
@ -2280,8 +2223,6 @@ class TlsaValue(EqualityTupleMixin, dict):
return [cls(v) for v in values]
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rdata_text(value)
super().__init__(
{
'certificate_usage': int(value.get('certificate_usage', 0)),


+ 256
- 135
tests/test_octodns_record.py View File

@ -99,6 +99,81 @@ class TestRecord(TestCase):
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_utf8_values(self):
zone = Zone('unit.tests.', [])
utf8 = 'гэрбүл.mn.'
encoded = idna_encode(utf8)
# ALIAS
record = Record.new(
zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# CNAME
record = Record.new(
zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# DNAME
record = Record.new(
zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# MX
record = Record.new(
zone,
'mx',
{
'type': 'MX',
'ttl': 300,
'value': {'preference': 10, 'exchange': utf8},
},
)
self.assertEqual(
MxValue({'preference': 10, 'exchange': encoded}), record.values[0]
)
# NS
record = Record.new(
zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.values[0])
# PTR
another_utf8 = 'niño.mx.'
another_encoded = idna_encode(another_utf8)
record = Record.new(
zone,
'ptr',
{'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]},
)
self.assertEqual([encoded, another_encoded], record.values)
# SRV
record = Record.new(
zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 0,
'weight': 10,
'port': 80,
'target': utf8,
},
},
)
self.assertEqual(
SrvValue(
{'priority': 0, 'weight': 10, 'port': 80, 'target': encoded}
),
record.values[0],
)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@ -238,9 +313,6 @@ class TestRecord(TestCase):
):
self.assertEqual(s, Ipv4Address.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'})
self.assertEqual('1.2.3.4', a.values[0].rdata_text)
@ -429,9 +501,6 @@ class TestRecord(TestCase):
):
self.assertEqual(s, _TargetValue.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.value.rdata_text)
@ -525,32 +594,17 @@ class TestRecord(TestCase):
CaaValue.parse_rdata_text('0 tag 99148c81'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = CaaRecord.validate(
'caa', 'caa.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = CaaRecord.validate(
'caa', 'caa.unit.tests.', {'ttl': 32, 'values': ['nope']}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = CaaRecord.validate(
'caa', 'caa.unit.tests.', {'ttl': 32, 'value': '0 tag 99148c81'}
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = CaaRecord(zone, 'caa', {'ttl': 32, 'value': '0 tag 99148c81'})
self.assertEqual(0, a.values[0].flags)
self.assertEqual('tag', a.values[0].tag)
self.assertEqual('99148c81', a.values[0].value)
self.assertEqual('0 tag 99148c81', a.values[0].rdata_text)
a = CaaRecord(
zone,
'caa',
{'ttl': 32, 'values': ['1 tag1 99148c81', '2 tag2 99148c44']},
{
'ttl': 32,
'values': [
{'flags': 1, 'tag': 'tag1', 'value': '99148c81'},
{'flags': 2, 'tag': 'tag2', 'value': '99148c44'},
],
},
)
self.assertEqual(1, a.values[0].flags)
self.assertEqual('tag1', a.values[0].tag)
@ -711,19 +765,30 @@ class TestRecord(TestCase):
LocValue.parse_rdata_text(s),
)
# make sure validate is using parse_rdata_text when passed string values
reasons = LocRecord.validate(
'loc', 'loc.unit.tests', {'ttl': 42, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = LocRecord.validate(
'loc', 'loc.unit.tests', {'ttl': 42, 'value': s}
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = LocRecord(zone, 'mx', {'ttl': 32, 'value': s})
a = LocRecord(
zone,
'mx',
{
'type': 'LOC',
'ttl': 42,
'value': {
'altitude': 6.6,
'lat_degrees': 0,
'lat_direction': 'N',
'lat_minutes': 1,
'lat_seconds': 2.2,
'long_degrees': 3,
'long_direction': 'E',
'long_minutes': 4,
'long_seconds': 5.5,
'precision_horz': 8.8,
'precision_vert': 9.9,
'size': 7.7,
},
},
)
self.assertEqual(0, a.values[0].lat_degrees)
self.assertEqual(1, a.values[0].lat_minutes)
self.assertEqual(2.2, a.values[0].lat_seconds)
@ -815,33 +880,16 @@ class TestRecord(TestCase):
MxValue.parse_rdata_text('10 mx.unit.tests.'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = MxRecord.validate(
'mx', 'mx.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = MxRecord.validate(
'mx', 'mx.unit.tests.', {'ttl': 32, 'values': ['nope']}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = MxRecord.validate(
'mx', 'mx.unit.tests.', {'ttl': 32, 'value': '10 mail.unit.tests.'}
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = MxRecord(zone, 'mx', {'ttl': 32, 'value': '10 mail.unit.tests.'})
self.assertEqual(10, a.values[0].preference)
self.assertEqual('mail.unit.tests.', a.values[0].exchange)
self.assertEqual('10 mail.unit.tests.', a.values[0].rdata_text)
a = MxRecord(
zone,
'mx',
{
'ttl': 32,
'values': ['11 mail1.unit.tests.', '12 mail2.unit.tests.'],
'values': [
{'preference': 11, 'exchange': 'mail1.unit.tests.'},
{'preference': 12, 'exchange': 'mail2.unit.tests.'},
],
},
)
self.assertEqual(11, a.values[0].preference)
@ -1181,33 +1229,30 @@ class TestRecord(TestCase):
NaptrValue.parse_rdata_text('1 2 three four five six'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = NaptrRecord.validate(
'naptr', 'naptr.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = NaptrRecord.validate(
'naptr', 'naptr.unit.tests.', {'ttl': 32, 'value': ['']}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = NaptrRecord.validate(
'naptr',
'naptr.unit.tests.',
{'ttl': 32, 'value': ['1 2 S service regexp replacement']},
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
s = '1 2 S service regexp replacement'
a = NaptrRecord(zone, 'naptr', {'ttl': 32, 'value': s})
a = NaptrRecord(
zone,
'naptr',
{
'ttl': 32,
'value': {
'order': 1,
'preference': 2,
'flags': 'S',
'service': 'service',
'regexp': 'regexp',
'replacement': 'replacement',
},
},
)
self.assertEqual(1, a.values[0].order)
self.assertEqual(2, a.values[0].preference)
self.assertEqual('S', a.values[0].flags)
self.assertEqual('service', a.values[0].service)
self.assertEqual('regexp', a.values[0].regexp)
self.assertEqual('replacement', a.values[0].replacement)
s = '1 2 S service regexp replacement'
self.assertEqual(s, a.values[0].rdata_text)
def test_ns(self):
@ -1241,9 +1286,6 @@ class TestRecord(TestCase):
):
self.assertEqual(s, _NsValue.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)
@ -1346,24 +1388,19 @@ class TestRecord(TestCase):
SshfpValue.parse_rdata_text('1 2 00479b27'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = SshfpRecord.validate(
'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = SshfpRecord.validate(
'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'values': ['nope']}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = SshfpRecord.validate(
'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'value': '1 2 00479b27'}
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = SshfpRecord(zone, 'sshfp', {'ttl': 32, 'value': '1 2 00479b27'})
a = SshfpRecord(
zone,
'sshfp',
{
'ttl': 32,
'value': {
'algorithm': 1,
'fingerprint_type': 2,
'fingerprint': '00479b27',
},
},
)
self.assertEqual(1, a.values[0].algorithm)
self.assertEqual(2, a.values[0].fingerprint_type)
self.assertEqual('00479b27', a.values[0].fingerprint)
@ -1374,7 +1411,7 @@ class TestRecord(TestCase):
b_value = 'spf1 -other'
self.assertMultipleValues(SpfRecord, a_values, b_value)
def test_chunked_value_rr_text(self):
def test_chunked_value_rdata_text(self):
for s in (
None,
'',
@ -1464,7 +1501,7 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_srv_value_rr_text(self):
def test_srv_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
@ -1508,23 +1545,19 @@ class TestRecord(TestCase):
SrvValue.parse_rdata_text('1 2 3 srv.unit.tests.'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = SrvRecord.validate(
'_srv._tcp', '_srv._tcp.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = SrvRecord.validate(
'_srv._tcp',
'_srv._tcp.unit.tests.',
{'ttl': 32, 'value': '1 2 3 srv.unit.tests.'},
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = SrvRecord(
zone, '_srv._tcp', {'ttl': 32, 'value': '1 2 3 srv.unit.tests.'}
zone,
'_srv._tcp',
{
'ttl': 32,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'srv.unit.tests.',
},
},
)
self.assertEqual(1, a.values[0].priority)
self.assertEqual(2, a.values[0].weight)
@ -1632,7 +1665,7 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_tsla_value_rr_text(self):
def test_tsla_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
@ -1676,20 +1709,20 @@ class TestRecord(TestCase):
TlsaValue.parse_rdata_text('1 2 3 abcd'),
)
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = TlsaRecord.validate(
'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': ''}
)
self.assertEqual(['failed to parse string value as RR text'], reasons)
reasons = TlsaRecord.validate(
'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': '2 1 0 abcd'}
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = TlsaRecord(zone, 'tlsa', {'ttl': 32, 'value': '2 1 0 abcd'})
a = TlsaRecord(
zone,
'tlsa',
{
'ttl': 32,
'value': {
'certificate_usage': 2,
'selector': 1,
'matching_type': 0,
'certificate_association_data': 'abcd',
},
},
)
self.assertEqual(2, a.values[0].certificate_usage)
self.assertEqual(1, a.values[0].selector)
self.assertEqual(0, a.values[0].matching_type)
@ -3165,7 +3198,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
def test_cname_validation(self):
# doesn't blow up
Record.new(
self.zone,
@ -3714,6 +3747,19 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
@ -4209,6 +4255,24 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
@ -5894,7 +5958,6 @@ class TestDynamicRecords(TestCase):
'pools': {
'one': {'values': [{'value': '3.3.3.3'}]},
'two': {
# Testing out of order value sorting here
'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}]
},
'three': {
@ -5922,9 +5985,12 @@ class TestDynamicRecords(TestCase):
)
def test_dynamic_eqs(self):
pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]})
pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]})
pool_one = _DynamicPool(
'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address
)
pool_two = _DynamicPool(
'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address
)
self.assertEqual(pool_one, pool_one)
self.assertNotEqual(pool_one, pool_two)
self.assertNotEqual(pool_one, 42)
@ -5943,6 +6009,61 @@ class TestDynamicRecords(TestCase):
self.assertNotEqual(dynamic, other)
self.assertNotEqual(dynamic, 42)
def test_dynamic_cname_idna(self):
a_utf8 = 'natación.mx.'
a_encoded = idna_encode(a_utf8)
b_utf8 = 'гэрбүл.mn.'
b_encoded = idna_encode(b_utf8)
cname_data = {
'dynamic': {
'pools': {
'one': {
# Testing out of order value sorting here
'values': [
{'value': 'b.unit.tests.'},
{'value': 'a.unit.tests.'},
]
},
'two': {
'values': [
# some utf8 values we expect to be idna encoded
{'weight': 10, 'value': a_utf8},
{'weight': 12, 'value': b_utf8},
]
},
},
'rules': [
{'geos': ['NA-US-CA'], 'pool': 'two'},
{'pool': 'one'},
],
},
'type': 'CNAME',
'ttl': 60,
'value': a_utf8,
}
cname = Record.new(self.zone, 'cname', cname_data)
self.assertEqual(a_encoded, cname.value)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'},
{'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'},
],
},
cname.dynamic.pools['one'].data,
)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 12, 'value': b_encoded, 'status': 'obey'},
{'weight': 10, 'value': a_encoded, 'status': 'obey'},
],
},
cname.dynamic.pools['two'].data,
)
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])


Loading…
Cancel
Save