Browse Source

Merge branch 'main' into empty-super

pull/937/head
Ross McFarland 3 years ago
committed by GitHub
parent
commit
1e57595554
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 893 additions and 296 deletions
  1. +6
    -2
      CHANGELOG.md
  2. +1
    -1
      CONTRIBUTING.md
  3. +482
    -168
      octodns/record/__init__.py
  4. +5
    -0
      octodns/yaml.py
  5. +399
    -125
      tests/test_octodns_record.py

+ 6
- 2
CHANGELOG.md View File

@ -1,4 +1,4 @@
## v0.9.19 - 2022-??-?? - ???
## v0.9.20 - 2022-??-?? - ???
#### Noteworthy changes
@ -11,6 +11,8 @@
decoded form. Both forms should be accepted in command line arguments.
Providers may need to be updated to display the decoded form in their logs,
until then they'd display the IDNA version.
* IDNA value support for Record types that hold FQDNs: ALIAS, CNAME, DNAME, PTR,
MX, NS, and SRV.
* Support for configuring global processors that apply to all zones with
`manager.processors`
@ -30,8 +32,10 @@
* Add TtlRestrictionFilter processor for adding ttl restriction/checking
* NameAllowlistFilter & NameRejectlistFilter implementations to support
filtering on record names to include/exclude records from management.
* All Record values are now first class objects. This shouldn't be an externally
visible change, but will enable future improvements.
## v0.9.18 - 2022-08-14 - Subzone handling
## v0.9.19 - 2022-08-14 - Subzone handling
* Fixed issue with sub-zone handling introduced in 0.9.18


+ 1
- 1
CONTRIBUTING.md View File

@ -8,7 +8,7 @@ If you have questions, or you'd like to check with us before embarking on a majo
## How to contribute
This project uses the [GitHub Flow](https://guides.github.com/introduction/flow/). That means that the `master` branch is stable and new development is done in feature branches. Feature branches are merged into the `master` branch via a Pull Request.
This project uses the [GitHub Flow](https://guides.github.com/introduction/flow/). That means that the `main` branch is stable and new development is done in feature branches. Feature branches are merged into the `main` branch via a Pull Request.
0. Fork and clone the repository
0. Configure and install the dependencies: `./script/bootstrap`


+ 482
- 168
octodns/record/__init__.py View File

@ -2,7 +2,7 @@
#
#
from ipaddress import IPv4Address, IPv6Address
from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address
from logging import getLogger
import re
@ -456,12 +456,12 @@ class ValueMixin(object):
class _DynamicPool(object):
log = getLogger('_DynamicPool')
def __init__(self, _id, data):
def __init__(self, _id, data, value_type):
self._id = _id
values = [
{
'value': d['value'],
'value': value_type(d['value']),
'weight': d.get('weight', 1),
'status': d.get('status', 'obey'),
}
@ -738,7 +738,7 @@ class _DynamicMixin(object):
pools = {}
for _id, pool in sorted(pools.items()):
pools[_id] = _DynamicPool(_id, pool)
pools[_id] = _DynamicPool(_id, pool, self._value_type)
# rules
try:
@ -784,7 +784,42 @@ class _DynamicMixin(object):
return super().__repr__()
class _IpList(object):
class _TargetValue(str):
@classmethod
def validate(cls, data, _type):
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
else:
data = idna_encode(data)
if not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
if value:
return cls(value)
return None
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class CnameValue(_TargetValue):
pass
class DnameValue(_TargetValue):
pass
class _IpAddress(str):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -809,64 +844,37 @@ class _IpList(object):
def process(cls, values):
# Translating None into '' so that the list will be sortable in
# python3, get everything to str first
values = [str(v) if v is not None else '' for v in values]
values = [v if v is not None else '' for v in values]
# Now round trip all non-'' through the address type and back to a str
# to normalize the address representation.
return [str(cls._address_type(v)) if v != '' else '' for v in values]
class Ipv4List(_IpList):
_address_name = 'IPv4'
_address_type = IPv4Address
return [cls(v) if v != '' else '' for v in values]
class Ipv6List(_IpList):
_address_name = 'IPv6'
_address_type = IPv6Address
class _TargetValue(object):
@classmethod
def validate(cls, data, _type):
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(self, value):
if value:
return value.lower()
return value
def __new__(cls, v):
v = str(cls._address_type(v))
return super().__new__(cls, v)
class CnameValue(_TargetValue):
pass
class DnameValue(_TargetValue):
pass
class Ipv4Address(_IpAddress):
_address_type = _IPv4Address
_address_name = 'IPv4'
class ARecord(_DynamicMixin, _GeoMixin, Record):
_type = 'A'
_value_type = Ipv4List
_value_type = Ipv4Address
Record.register_type(ARecord)
class Ipv6Address(_IpAddress):
_address_type = _IPv6Address
_address_name = 'IPv6'
class AaaaRecord(_DynamicMixin, _GeoMixin, Record):
_type = 'AAAA'
_value_type = Ipv6List
_value_type = Ipv6Address
Record.register_type(AaaaRecord)
@ -892,7 +900,7 @@ class AliasRecord(ValueMixin, Record):
Record.register_type(AliasRecord)
class CaaValue(EqualityTupleMixin):
class CaaValue(EqualityTupleMixin, dict):
# https://tools.ietf.org/html/rfc6844#page-5
@classmethod
@ -916,16 +924,44 @@ class CaaValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [CaaValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.flags = int(value.get('flags', 0))
self.tag = value['tag']
self.value = value['value']
super().__init__(
{
'flags': int(value.get('flags', 0)),
'tag': value['tag'],
'value': value['value'],
}
)
@property
def flags(self):
return self['flags']
@flags.setter
def flags(self, value):
self['flags'] = value
@property
def tag(self):
return self['tag']
@tag.setter
def tag(self, value):
self['tag'] = value
@property
def value(self):
return self['value']
@value.setter
def value(self, value):
self['value'] = value
@property
def data(self):
return {'flags': self.flags, 'tag': self.tag, 'value': self.value}
return self
def _equality_tuple(self):
return (self.flags, self.tag, self.value)
@ -966,7 +1002,7 @@ class DnameRecord(_DynamicMixin, ValueMixin, Record):
Record.register_type(DnameRecord)
class LocValue(EqualityTupleMixin):
class LocValue(EqualityTupleMixin, dict):
# TODO: work out how to do defaults per RFC
@classmethod
@ -1062,38 +1098,125 @@ class LocValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [LocValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.lat_degrees = int(value['lat_degrees'])
self.lat_minutes = int(value['lat_minutes'])
self.lat_seconds = float(value['lat_seconds'])
self.lat_direction = value['lat_direction'].upper()
self.long_degrees = int(value['long_degrees'])
self.long_minutes = int(value['long_minutes'])
self.long_seconds = float(value['long_seconds'])
self.long_direction = value['long_direction'].upper()
self.altitude = float(value['altitude'])
self.size = float(value['size'])
self.precision_horz = float(value['precision_horz'])
self.precision_vert = float(value['precision_vert'])
super().__init__(
{
'lat_degrees': int(value['lat_degrees']),
'lat_minutes': int(value['lat_minutes']),
'lat_seconds': float(value['lat_seconds']),
'lat_direction': value['lat_direction'].upper(),
'long_degrees': int(value['long_degrees']),
'long_minutes': int(value['long_minutes']),
'long_seconds': float(value['long_seconds']),
'long_direction': value['long_direction'].upper(),
'altitude': float(value['altitude']),
'size': float(value['size']),
'precision_horz': float(value['precision_horz']),
'precision_vert': float(value['precision_vert']),
}
)
@property
def lat_degrees(self):
return self['lat_degrees']
@lat_degrees.setter
def lat_degrees(self, value):
self['lat_degrees'] = value
@property
def lat_minutes(self):
return self['lat_minutes']
@lat_minutes.setter
def lat_minutes(self, value):
self['lat_minutes'] = value
@property
def lat_seconds(self):
return self['lat_seconds']
@lat_seconds.setter
def lat_seconds(self, value):
self['lat_seconds'] = value
@property
def lat_direction(self):
return self['lat_direction']
@lat_direction.setter
def lat_direction(self, value):
self['lat_direction'] = value
@property
def long_degrees(self):
return self['long_degrees']
@long_degrees.setter
def long_degrees(self, value):
self['long_degrees'] = value
@property
def long_minutes(self):
return self['long_minutes']
@long_minutes.setter
def long_minutes(self, value):
self['long_minutes'] = value
@property
def long_seconds(self):
return self['long_seconds']
@long_seconds.setter
def long_seconds(self, value):
self['long_seconds'] = value
@property
def long_direction(self):
return self['long_direction']
@long_direction.setter
def long_direction(self, value):
self['long_direction'] = value
@property
def altitude(self):
return self['altitude']
@altitude.setter
def altitude(self, value):
self['altitude'] = value
@property
def size(self):
return self['size']
@size.setter
def size(self, value):
self['size'] = value
@property
def precision_horz(self):
return self['precision_horz']
@precision_horz.setter
def precision_horz(self, value):
self['precision_horz'] = value
@property
def precision_vert(self):
return self['precision_vert']
@precision_vert.setter
def precision_vert(self, value):
self['precision_vert'] = value
@property
def data(self):
return {
'lat_degrees': self.lat_degrees,
'lat_minutes': self.lat_minutes,
'lat_seconds': self.lat_seconds,
'lat_direction': self.lat_direction,
'long_degrees': self.long_degrees,
'long_minutes': self.long_minutes,
'long_seconds': self.long_seconds,
'long_direction': self.long_direction,
'altitude': self.altitude,
'size': self.size,
'precision_horz': self.precision_horz,
'precision_vert': self.precision_vert,
}
return self
def __hash__(self):
return hash(
@ -1148,7 +1271,7 @@ class LocRecord(ValuesMixin, Record):
Record.register_type(LocRecord)
class MxValue(EqualityTupleMixin):
class MxValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1166,7 +1289,11 @@ class MxValue(EqualityTupleMixin):
reasons.append(f'invalid preference "{value["preference"]}"')
exchange = None
try:
exchange = str(value.get('exchange', None) or value['value'])
exchange = value.get('exchange', None) or value['value']
if not exchange:
reasons.append('missing exchange')
continue
exchange = idna_encode(exchange)
if (
exchange != '.'
and not FQDN(exchange, allow_underscores=True).is_valid
@ -1183,7 +1310,7 @@ class MxValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [MxValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
# RFC1035 says preference, half the providers use priority
@ -1191,17 +1318,34 @@ class MxValue(EqualityTupleMixin):
preference = value['preference']
except KeyError:
preference = value['priority']
self.preference = int(preference)
# UNTIL 1.0 remove value fallback
try:
exchange = value['exchange']
except KeyError:
exchange = value['value']
self.exchange = exchange.lower()
super().__init__(
{'preference': int(preference), 'exchange': idna_encode(exchange)}
)
@property
def preference(self):
return self['preference']
@preference.setter
def preference(self, value):
self['preference'] = value
@property
def exchange(self):
return self['exchange']
@exchange.setter
def exchange(self, value):
self['exchange'] = value
@property
def data(self):
return {'preference': self.preference, 'exchange': self.exchange}
return self
def __hash__(self):
return hash((self.preference, self.exchange))
@ -1221,7 +1365,7 @@ class MxRecord(ValuesMixin, Record):
Record.register_type(MxRecord)
class NaptrValue(EqualityTupleMixin):
class NaptrValue(EqualityTupleMixin, dict):
VALID_FLAGS = ('S', 'A', 'U', 'P')
@classmethod
@ -1258,26 +1402,71 @@ class NaptrValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [NaptrValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.order = int(value['order'])
self.preference = int(value['preference'])
self.flags = value['flags']
self.service = value['service']
self.regexp = value['regexp']
self.replacement = value['replacement']
super().__init__(
{
'order': int(value['order']),
'preference': int(value['preference']),
'flags': value['flags'],
'service': value['service'],
'regexp': value['regexp'],
'replacement': value['replacement'],
}
)
@property
def order(self):
return self['order']
@order.setter
def order(self, value):
self['order'] = value
@property
def preference(self):
return self['preference']
@preference.setter
def preference(self, value):
self['preference'] = value
@property
def flags(self):
return self['flags']
@flags.setter
def flags(self, value):
self['flags'] = value
@property
def service(self):
return self['service']
@service.setter
def service(self, value):
self['service'] = value
@property
def regexp(self):
return self['regexp']
@regexp.setter
def regexp(self, value):
self['regexp'] = value
@property
def replacement(self):
return self['replacement']
@replacement.setter
def replacement(self, value):
self['replacement'] = value
@property
def data(self):
return {
'order': self.order,
'preference': self.preference,
'flags': self.flags,
'service': self.service,
'regexp': self.regexp,
'replacement': self.replacement,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1310,7 +1499,7 @@ class NaptrRecord(ValuesMixin, Record):
Record.register_type(NaptrRecord)
class _NsValue(object):
class _NsValue(str):
@classmethod
def validate(cls, data, _type):
if not data:
@ -1319,7 +1508,8 @@ class _NsValue(object):
data = (data,)
reasons = []
for value in data:
if not FQDN(str(value), allow_underscores=True).is_valid:
value = idna_encode(value)
if not FQDN(value, allow_underscores=True).is_valid:
reasons.append(
f'Invalid NS value "{value}" is not a valid FQDN.'
)
@ -1329,7 +1519,11 @@ class _NsValue(object):
@classmethod
def process(cls, values):
return values
return [cls(v) for v in values]
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
class NsRecord(ValuesMixin, Record):
@ -1358,7 +1552,8 @@ class PtrValue(_TargetValue):
@classmethod
def process(cls, values):
return [super(PtrValue, cls).process(v) for v in values]
supr = super()
return [supr.process(v) for v in values]
class PtrRecord(ValuesMixin, Record):
@ -1375,7 +1570,7 @@ class PtrRecord(ValuesMixin, Record):
Record.register_type(PtrRecord)
class SshfpValue(EqualityTupleMixin):
class SshfpValue(EqualityTupleMixin, dict):
VALID_ALGORITHMS = (1, 2, 3, 4)
VALID_FINGERPRINT_TYPES = (1, 2)
@ -1411,20 +1606,44 @@ class SshfpValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [SshfpValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.algorithm = int(value['algorithm'])
self.fingerprint_type = int(value['fingerprint_type'])
self.fingerprint = value['fingerprint']
super().__init__(
{
'algorithm': int(value['algorithm']),
'fingerprint_type': int(value['fingerprint_type']),
'fingerprint': value['fingerprint'],
}
)
@property
def algorithm(self):
return self['algorithm']
@algorithm.setter
def algorithm(self, value):
self['algorithm'] = value
@property
def fingerprint_type(self):
return self['fingerprint_type']
@fingerprint_type.setter
def fingerprint_type(self, value):
self['fingerprint_type'] = value
@property
def fingerprint(self):
return self['fingerprint']
@fingerprint.setter
def fingerprint(self, value):
self['fingerprint'] = value
@property
def data(self):
return {
'algorithm': self.algorithm,
'fingerprint_type': self.fingerprint_type,
'fingerprint': self.fingerprint,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1465,7 +1684,7 @@ class _ChunkedValuesMixin(ValuesMixin):
return values
class _ChunkedValue(object):
class _ChunkedValue(str):
_unescaped_semicolon_re = re.compile(r'\w;')
@classmethod
@ -1486,7 +1705,7 @@ class _ChunkedValue(object):
for v in values:
if v and v[0] == '"':
v = v[1:-1]
ret.append(v.replace('" "', ''))
ret.append(cls(v.replace('" "', '')))
return ret
@ -1498,7 +1717,7 @@ class SpfRecord(_ChunkedValuesMixin, Record):
Record.register_type(SpfRecord)
class SrvValue(EqualityTupleMixin):
class SrvValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1526,11 +1745,15 @@ class SrvValue(EqualityTupleMixin):
reasons.append(f'invalid port "{value["port"]}"')
try:
target = value['target']
if not target:
reasons.append('missing target')
continue
target = idna_encode(target)
if not target.endswith('.'):
reasons.append(f'SRV value "{target}" missing trailing .')
if (
target != '.'
and not FQDN(str(target), allow_underscores=True).is_valid
and not FQDN(target, allow_underscores=True).is_valid
):
reasons.append(
f'Invalid SRV target "{target}" is not a valid FQDN.'
@ -1541,22 +1764,53 @@ class SrvValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [SrvValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.priority = int(value['priority'])
self.weight = int(value['weight'])
self.port = int(value['port'])
self.target = value['target'].lower()
super().__init__(
{
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': idna_encode(value['target']),
}
)
@property
def priority(self):
return self['priority']
@priority.setter
def priority(self, value):
self['priority'] = value
@property
def weight(self):
return self['weight']
@weight.setter
def weight(self, value):
self['weight'] = value
@property
def port(self):
return self['port']
@port.setter
def port(self, value):
self['port'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def data(self):
return {
'priority': self.priority,
'weight': self.weight,
'port': self.port,
'target': self.target,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1585,7 +1839,7 @@ class SrvRecord(ValuesMixin, Record):
Record.register_type(SrvRecord)
class TlsaValue(EqualityTupleMixin):
class TlsaValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1632,24 +1886,51 @@ class TlsaValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [TlsaValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.certificate_usage = int(value.get('certificate_usage', 0))
self.selector = int(value.get('selector', 0))
self.matching_type = int(value.get('matching_type', 0))
self.certificate_association_data = value[
'certificate_association_data'
]
super().__init__(
{
'certificate_usage': int(value.get('certificate_usage', 0)),
'selector': int(value.get('selector', 0)),
'matching_type': int(value.get('matching_type', 0)),
'certificate_association_data': value[
'certificate_association_data'
],
}
)
@property
def data(self):
return {
'certificate_usage': self.certificate_usage,
'selector': self.selector,
'matching_type': self.matching_type,
'certificate_association_data': self.certificate_association_data,
}
def certificate_usage(self):
return self['certificate_usage']
@certificate_usage.setter
def certificate_usage(self, value):
self['certificate_usage'] = value
@property
def selector(self):
return self['selector']
@selector.setter
def selector(self, value):
self['selector'] = value
@property
def matching_type(self):
return self['matching_type']
@matching_type.setter
def matching_type(self, value):
self['matching_type'] = value
@property
def certificate_association_data(self):
return self['certificate_association_data']
@certificate_association_data.setter
def certificate_association_data(self, value):
self['certificate_association_data'] = value
def _equality_tuple(self):
return (
@ -1686,7 +1967,7 @@ class TxtRecord(_ChunkedValuesMixin, Record):
Record.register_type(TxtRecord)
class UrlfwdValue(EqualityTupleMixin):
class UrlfwdValue(EqualityTupleMixin, dict):
VALID_CODES = (301, 302)
VALID_MASKS = (0, 1, 2)
VALID_QUERY = (0, 1)
@ -1728,37 +2009,70 @@ class UrlfwdValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [UrlfwdValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.path = value['path']
self.target = value['target']
self.code = int(value['code'])
self.masking = int(value['masking'])
self.query = int(value['query'])
super().__init__(
{
'path': value['path'],
'target': value['target'],
'code': int(value['code']),
'masking': int(value['masking']),
'query': int(value['query']),
}
)
@property
def data(self):
return {
'path': self.path,
'target': self.target,
'code': self.code,
'masking': self.masking,
'query': self.query,
}
def path(self):
return self['path']
def __hash__(self):
return hash(self.__repr__())
@path.setter
def path(self, value):
self['path'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def code(self):
return self['code']
@code.setter
def code(self, value):
self['code'] = value
@property
def masking(self):
return self['masking']
@masking.setter
def masking(self, value):
self['masking'] = value
@property
def query(self):
return self['query']
@query.setter
def query(self, value):
self['query'] = value
def _equality_tuple(self):
return (self.path, self.target, self.code, self.masking, self.query)
def __repr__(self):
return (
f'"{self.path}" "{self.target}" {self.code} '
f'{self.masking} {self.query}'
def __hash__(self):
return hash(
(self.path, self.target, self.code, self.masking, self.query)
)
def __repr__(self):
return f'"{self.path}" "{self.target}" {self.code} {self.masking} {self.query}'
class UrlfwdRecord(ValuesMixin, Record):
_type = 'URLFWD'


+ 5
- 0
octodns/yaml.py View File

@ -4,6 +4,7 @@
from natsort import natsort_keygen
from yaml import SafeDumper, SafeLoader, load, dump
from yaml.representer import SafeRepresenter
from yaml.constructor import ConstructorError
@ -54,6 +55,10 @@ class SortingDumper(SafeDumper):
SortingDumper.add_representer(dict, SortingDumper._representer)
# This should handle all the record value types which are ultimately either str
# or dict at some point in their inheritance hierarchy
SortingDumper.add_multi_representer(str, SafeRepresenter.represent_str)
SortingDumper.add_multi_representer(dict, SortingDumper._representer)
def safe_dump(data, fh, **options):


+ 399
- 125
tests/test_octodns_record.py View File

@ -16,6 +16,7 @@ from octodns.record import (
Create,
Delete,
GeoValue,
Ipv4Address,
LocRecord,
LocValue,
MxRecord,
@ -32,6 +33,7 @@ from octodns.record import (
SrvRecord,
SrvValue,
TlsaRecord,
TlsaValue,
TxtRecord,
Update,
UrlfwdRecord,
@ -93,6 +95,81 @@ class TestRecord(TestCase):
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_utf8_values(self):
zone = Zone('unit.tests.', [])
utf8 = 'гэрбүл.mn.'
encoded = idna_encode(utf8)
# ALIAS
record = Record.new(
zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# CNAME
record = Record.new(
zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# DNAME
record = Record.new(
zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# MX
record = Record.new(
zone,
'mx',
{
'type': 'MX',
'ttl': 300,
'value': {'preference': 10, 'exchange': utf8},
},
)
self.assertEqual(
MxValue({'preference': 10, 'exchange': encoded}), record.values[0]
)
# NS
record = Record.new(
zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.values[0])
# PTR
another_utf8 = 'niño.mx.'
another_encoded = idna_encode(another_utf8)
record = Record.new(
zone,
'ptr',
{'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]},
)
self.assertEqual([encoded, another_encoded], record.values)
# SRV
record = Record.new(
zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 0,
'weight': 10,
'port': 80,
'target': utf8,
},
},
)
self.assertEqual(
SrvValue(
{'priority': 0, 'weight': 10, 'port': 80, 'target': encoded}
),
record.values[0],
)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@ -386,12 +463,14 @@ class TestRecord(TestCase):
def test_caa(self):
a_values = [
{'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'},
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
},
CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}),
CaaValue(
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = CaaRecord(self.zone, 'a', a_data)
@ -406,7 +485,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['value'], a.values[1].value)
self.assertEqual(a_data, a.data)
b_value = {'tag': 'iodef', 'value': 'http://iodef.example.com/'}
b_value = CaaValue(
{'tag': 'iodef', 'value': 'http://iodef.example.com/'}
)
b_data = {'ttl': 30, 'value': b_value}
b = CaaRecord(self.zone, 'b', b_data)
self.assertEqual(0, b.values[0].flags)
@ -448,20 +529,22 @@ class TestRecord(TestCase):
def test_loc(self):
a_values = [
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
LocValue(
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
)
]
a_data = {'ttl': 30, 'values': a_values}
a = LocRecord(self.zone, 'a', a_data)
@ -489,20 +572,22 @@ class TestRecord(TestCase):
a_values[0]['precision_vert'], a.values[0].precision_vert
)
b_value = {
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
b_value = LocValue(
{
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = LocRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['lat_degrees'], b.values[0].lat_degrees)
@ -540,8 +625,8 @@ class TestRecord(TestCase):
def test_mx(self):
a_values = [
{'preference': 10, 'exchange': 'smtp1.'},
{'priority': 20, 'value': 'smtp2.'},
MxValue({'preference': 10, 'exchange': 'smtp1.'}),
MxValue({'priority': 20, 'value': 'smtp2.'}),
]
a_data = {'ttl': 30, 'values': a_values}
a = MxRecord(self.zone, 'a', a_data)
@ -550,12 +635,12 @@ class TestRecord(TestCase):
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['preference'], a.values[0].preference)
self.assertEqual(a_values[0]['exchange'], a.values[0].exchange)
self.assertEqual(a_values[1]['priority'], a.values[1].preference)
self.assertEqual(a_values[1]['value'], a.values[1].exchange)
a_data['values'][1] = {'preference': 20, 'exchange': 'smtp2.'}
self.assertEqual(a_values[1]['preference'], a.values[1].preference)
self.assertEqual(a_values[1]['exchange'], a.values[1].exchange)
a_data['values'][1] = MxValue({'preference': 20, 'exchange': 'smtp2.'})
self.assertEqual(a_data, a.data)
b_value = {'preference': 0, 'exchange': 'smtp3.'}
b_value = MxValue({'preference': 0, 'exchange': 'smtp3.'})
b_data = {'ttl': 30, 'value': b_value}
b = MxRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['preference'], b.values[0].preference)
@ -591,22 +676,26 @@ class TestRecord(TestCase):
def test_naptr(self):
a_values = [
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
},
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
},
NaptrValue(
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
}
),
NaptrValue(
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = NaptrRecord(self.zone, 'a', a_data)
@ -618,14 +707,16 @@ class TestRecord(TestCase):
self.assertEqual(a_values[i][k], getattr(a.values[i], k))
self.assertEqual(a_data, a.data)
b_value = {
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
b_value = NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = NaptrRecord(self.zone, 'b', b_data)
for k in a_values[0].keys():
@ -849,6 +940,30 @@ class TestRecord(TestCase):
values.add(o)
self.assertTrue(o in values)
self.assertEqual(30, o.order)
o.order = o.order + 1
self.assertEqual(31, o.order)
self.assertEqual(32, o.preference)
o.preference = o.preference + 1
self.assertEqual(33, o.preference)
self.assertEqual('M', o.flags)
o.flags = 'P'
self.assertEqual('P', o.flags)
self.assertEqual('N', o.service)
o.service = 'Q'
self.assertEqual('Q', o.service)
self.assertEqual('O', o.regexp)
o.regexp = 'R'
self.assertEqual('R', o.regexp)
self.assertEqual('z', o.replacement)
o.replacement = '1'
self.assertEqual('1', o.replacement)
def test_ns(self):
a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
a_data = {'ttl': 30, 'values': a_values}
@ -867,8 +982,20 @@ class TestRecord(TestCase):
def test_sshfp(self):
a_values = [
{'algorithm': 10, 'fingerprint_type': 11, 'fingerprint': 'abc123'},
{'algorithm': 20, 'fingerprint_type': 21, 'fingerprint': 'def456'},
SshfpValue(
{
'algorithm': 10,
'fingerprint_type': 11,
'fingerprint': 'abc123',
}
),
SshfpValue(
{
'algorithm': 20,
'fingerprint_type': 21,
'fingerprint': 'def456',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SshfpRecord(self.zone, 'a', a_data)
@ -882,11 +1009,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['fingerprint'], a.values[0].fingerprint)
self.assertEqual(a_data, a.data)
b_value = {
'algorithm': 30,
'fingerprint_type': 31,
'fingerprint': 'ghi789',
}
b_value = SshfpValue(
{'algorithm': 30, 'fingerprint_type': 31, 'fingerprint': 'ghi789'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SshfpRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['algorithm'], b.values[0].algorithm)
@ -930,8 +1055,12 @@ class TestRecord(TestCase):
def test_srv(self):
a_values = [
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'},
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'},
SrvValue(
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'}
),
SrvValue(
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SrvRecord(self.zone, '_a._tcp', a_data)
@ -944,12 +1073,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['target'], a.values[0].target)
self.assertEqual(a_data, a.data)
b_value = {
'priority': 30,
'weight': 31,
'port': 32,
'target': 'server3',
}
b_value = SrvValue(
{'priority': 30, 'weight': 31, 'port': 32, 'target': 'server3'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SrvRecord(self.zone, '_b._tcp', b_data)
self.assertEqual(b_value['priority'], b.values[0].priority)
@ -993,18 +1119,22 @@ class TestRecord(TestCase):
def test_tlsa(self):
a_values = [
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
},
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
},
TlsaValue(
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
}
),
TlsaValue(
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = TlsaRecord(self.zone, 'a', a_data)
@ -1036,12 +1166,14 @@ class TestRecord(TestCase):
)
self.assertEqual(a_data, a.data)
b_value = {
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
b_value = TlsaValue(
{
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = TlsaRecord(self.zone, 'b', b_data)
self.assertEqual(
@ -1093,20 +1225,24 @@ class TestRecord(TestCase):
def test_urlfwd(self):
a_values = [
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
},
UrlfwdValue(
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
}
),
UrlfwdValue(
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = UrlfwdRecord(self.zone, 'a', a_data)
@ -1125,13 +1261,15 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['query'], a.values[1].query)
self.assertEqual(a_data, a.data)
b_value = {
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
b_value = UrlfwdValue(
{
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = UrlfwdRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['path'], b.values[0].path)
@ -1653,6 +1791,54 @@ class TestRecord(TestCase):
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertEqual(31, a.lat_degrees)
a.lat_degrees = a.lat_degrees + 1
self.assertEqual(32, a.lat_degrees)
self.assertEqual(58, a.lat_minutes)
a.lat_minutes = a.lat_minutes + 1
self.assertEqual(59, a.lat_minutes)
self.assertEqual(52.1, a.lat_seconds)
a.lat_seconds = a.lat_seconds + 1
self.assertEqual(53.1, a.lat_seconds)
self.assertEqual('S', a.lat_direction)
a.lat_direction = 'N'
self.assertEqual('N', a.lat_direction)
self.assertEqual(115, a.long_degrees)
a.long_degrees = a.long_degrees + 1
self.assertEqual(116, a.long_degrees)
self.assertEqual(49, a.long_minutes)
a.long_minutes = a.long_minutes + 1
self.assertEqual(50, a.long_minutes)
self.assertEqual(11.7, a.long_seconds)
a.long_seconds = a.long_seconds + 1
self.assertEqual(12.7, a.long_seconds)
self.assertEqual('E', a.long_direction)
a.long_direction = 'W'
self.assertEqual('W', a.long_direction)
self.assertEqual(20, a.altitude)
a.altitude = a.altitude + 1
self.assertEqual(21, a.altitude)
self.assertEqual(10, a.size)
a.size = a.size + 1
self.assertEqual(11, a.size)
self.assertEqual(10, a.precision_horz)
a.precision_horz = a.precision_horz + 1
self.assertEqual(11, a.precision_horz)
self.assertEqual(2, a.precision_vert)
a.precision_vert = a.precision_vert + 1
self.assertEqual(3, a.precision_vert)
# Hash
values = set()
values.add(a)
@ -2477,7 +2663,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
def test_cname_validation(self):
# doesn't blow up
Record.new(
self.zone,
@ -3026,6 +3212,19 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
@ -3038,7 +3237,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual('.', record.values[0].exchange)
def test_NXPTR(self):
def test_NAPTR(self):
# doesn't blow up
Record.new(
self.zone,
@ -3521,6 +3720,24 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
@ -5206,7 +5423,6 @@ class TestDynamicRecords(TestCase):
'pools': {
'one': {'values': [{'value': '3.3.3.3'}]},
'two': {
# Testing out of order value sorting here
'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}]
},
'three': {
@ -5234,9 +5450,12 @@ class TestDynamicRecords(TestCase):
)
def test_dynamic_eqs(self):
pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]})
pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]})
pool_one = _DynamicPool(
'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address
)
pool_two = _DynamicPool(
'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address
)
self.assertEqual(pool_one, pool_one)
self.assertNotEqual(pool_one, pool_two)
self.assertNotEqual(pool_one, 42)
@ -5255,6 +5474,61 @@ class TestDynamicRecords(TestCase):
self.assertNotEqual(dynamic, other)
self.assertNotEqual(dynamic, 42)
def test_dynamic_cname_idna(self):
a_utf8 = 'natación.mx.'
a_encoded = idna_encode(a_utf8)
b_utf8 = 'гэрбүл.mn.'
b_encoded = idna_encode(b_utf8)
cname_data = {
'dynamic': {
'pools': {
'one': {
# Testing out of order value sorting here
'values': [
{'value': 'b.unit.tests.'},
{'value': 'a.unit.tests.'},
]
},
'two': {
'values': [
# some utf8 values we expect to be idna encoded
{'weight': 10, 'value': a_utf8},
{'weight': 12, 'value': b_utf8},
]
},
},
'rules': [
{'geos': ['NA-US-CA'], 'pool': 'two'},
{'pool': 'one'},
],
},
'type': 'CNAME',
'ttl': 60,
'value': a_utf8,
}
cname = Record.new(self.zone, 'cname', cname_data)
self.assertEqual(a_encoded, cname.value)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'},
{'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'},
],
},
cname.dynamic.pools['one'].data,
)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 12, 'value': b_encoded, 'status': 'obey'},
{'weight': 10, 'value': a_encoded, 'status': 'obey'},
],
},
cname.dynamic.pools['two'].data,
)
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])


Loading…
Cancel
Save