Browse Source

Merge pull request #929 from octodns/value-object

Record values are first-class objects
pull/942/head
Ross McFarland 3 years ago
committed by GitHub
parent
commit
78d6e6ded6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 706 additions and 282 deletions
  1. +2
    -0
      CHANGELOG.md
  2. +459
    -162
      octodns/record/__init__.py
  3. +5
    -0
      octodns/yaml.py
  4. +240
    -120
      tests/test_octodns_record.py

+ 2
- 0
CHANGELOG.md View File

@ -30,6 +30,8 @@
* Add TtlRestrictionFilter processor for adding ttl restriction/checking
* NameAllowlistFilter & NameRejectlistFilter implementations to support
filtering on record names to include/exclude records from management.
* All Record values are now first class objects. This shouldn't be an externally
visible change, but will enable future improvements.
## v0.9.19 - 2022-08-14 - Subzone handling


+ 459
- 162
octodns/record/__init__.py View File

@ -2,7 +2,7 @@
#
#
from ipaddress import IPv4Address, IPv6Address
from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address
from logging import getLogger
import re
@ -784,7 +784,38 @@ class _DynamicMixin(object):
return super(_DynamicMixin, self).__repr__()
class _IpList(object):
class _TargetValue(str):
@classmethod
def validate(cls, data, _type):
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
if value:
return cls(value.lower())
return None
class CnameValue(_TargetValue):
pass
class DnameValue(_TargetValue):
pass
class _IpAddress(str):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -809,64 +840,37 @@ class _IpList(object):
def process(cls, values):
# Translating None into '' so that the list will be sortable in
# python3, get everything to str first
values = [str(v) if v is not None else '' for v in values]
values = [v if v is not None else '' for v in values]
# Now round trip all non-'' through the address type and back to a str
# to normalize the address representation.
return [str(cls._address_type(v)) if v != '' else '' for v in values]
return [cls(v) if v != '' else '' for v in values]
def __new__(cls, v):
v = str(cls._address_type(v))
return super().__new__(cls, v)
class Ipv4List(_IpList):
_address_name = 'IPv4'
_address_type = IPv4Address
class Ipv6List(_IpList):
_address_name = 'IPv6'
_address_type = IPv6Address
class _TargetValue(object):
@classmethod
def validate(cls, data, _type):
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
# NOTE: FQDN complains if the data it receives isn't a str, it doesn't
# allow unicode... This is likely specific to 2.7
elif not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(self, value):
if value:
return value.lower()
return value
class CnameValue(_TargetValue):
pass
class DnameValue(_TargetValue):
pass
class Ipv4Address(_IpAddress):
_address_type = _IPv4Address
_address_name = 'IPv4'
class ARecord(_DynamicMixin, _GeoMixin, Record):
_type = 'A'
_value_type = Ipv4List
_value_type = Ipv4Address
Record.register_type(ARecord)
class Ipv6Address(_IpAddress):
_address_type = _IPv6Address
_address_name = 'IPv6'
class AaaaRecord(_DynamicMixin, _GeoMixin, Record):
_type = 'AAAA'
_value_type = Ipv6List
_value_type = Ipv6Address
Record.register_type(AaaaRecord)
@ -892,7 +896,7 @@ class AliasRecord(ValueMixin, Record):
Record.register_type(AliasRecord)
class CaaValue(EqualityTupleMixin):
class CaaValue(EqualityTupleMixin, dict):
# https://tools.ietf.org/html/rfc6844#page-5
@classmethod
@ -916,16 +920,44 @@ class CaaValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [CaaValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.flags = int(value.get('flags', 0))
self.tag = value['tag']
self.value = value['value']
super().__init__(
{
'flags': int(value.get('flags', 0)),
'tag': value['tag'],
'value': value['value'],
}
)
@property
def flags(self):
return self['flags']
@flags.setter
def flags(self, value):
self['flags'] = value
@property
def tag(self):
return self['tag']
@tag.setter
def tag(self, value):
self['tag'] = value
@property
def value(self):
return self['value']
@value.setter
def value(self, value):
self['value'] = value
@property
def data(self):
return {'flags': self.flags, 'tag': self.tag, 'value': self.value}
return self
def _equality_tuple(self):
return (self.flags, self.tag, self.value)
@ -966,7 +998,7 @@ class DnameRecord(_DynamicMixin, ValueMixin, Record):
Record.register_type(DnameRecord)
class LocValue(EqualityTupleMixin):
class LocValue(EqualityTupleMixin, dict):
# TODO: work out how to do defaults per RFC
@classmethod
@ -1062,38 +1094,125 @@ class LocValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [LocValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.lat_degrees = int(value['lat_degrees'])
self.lat_minutes = int(value['lat_minutes'])
self.lat_seconds = float(value['lat_seconds'])
self.lat_direction = value['lat_direction'].upper()
self.long_degrees = int(value['long_degrees'])
self.long_minutes = int(value['long_minutes'])
self.long_seconds = float(value['long_seconds'])
self.long_direction = value['long_direction'].upper()
self.altitude = float(value['altitude'])
self.size = float(value['size'])
self.precision_horz = float(value['precision_horz'])
self.precision_vert = float(value['precision_vert'])
super().__init__(
{
'lat_degrees': int(value['lat_degrees']),
'lat_minutes': int(value['lat_minutes']),
'lat_seconds': float(value['lat_seconds']),
'lat_direction': value['lat_direction'].upper(),
'long_degrees': int(value['long_degrees']),
'long_minutes': int(value['long_minutes']),
'long_seconds': float(value['long_seconds']),
'long_direction': value['long_direction'].upper(),
'altitude': float(value['altitude']),
'size': float(value['size']),
'precision_horz': float(value['precision_horz']),
'precision_vert': float(value['precision_vert']),
}
)
@property
def lat_degrees(self):
return self['lat_degrees']
@lat_degrees.setter
def lat_degrees(self, value):
self['lat_degrees'] = value
@property
def lat_minutes(self):
return self['lat_minutes']
@lat_minutes.setter
def lat_minutes(self, value):
self['lat_minutes'] = value
@property
def lat_seconds(self):
return self['lat_seconds']
@lat_seconds.setter
def lat_seconds(self, value):
self['lat_seconds'] = value
@property
def lat_direction(self):
return self['lat_direction']
@lat_direction.setter
def lat_direction(self, value):
self['lat_direction'] = value
@property
def long_degrees(self):
return self['long_degrees']
@long_degrees.setter
def long_degrees(self, value):
self['long_degrees'] = value
@property
def long_minutes(self):
return self['long_minutes']
@long_minutes.setter
def long_minutes(self, value):
self['long_minutes'] = value
@property
def long_seconds(self):
return self['long_seconds']
@long_seconds.setter
def long_seconds(self, value):
self['long_seconds'] = value
@property
def long_direction(self):
return self['long_direction']
@long_direction.setter
def long_direction(self, value):
self['long_direction'] = value
@property
def altitude(self):
return self['altitude']
@altitude.setter
def altitude(self, value):
self['altitude'] = value
@property
def size(self):
return self['size']
@size.setter
def size(self, value):
self['size'] = value
@property
def precision_horz(self):
return self['precision_horz']
@precision_horz.setter
def precision_horz(self, value):
self['precision_horz'] = value
@property
def precision_vert(self):
return self['precision_vert']
@precision_vert.setter
def precision_vert(self, value):
self['precision_vert'] = value
@property
def data(self):
return {
'lat_degrees': self.lat_degrees,
'lat_minutes': self.lat_minutes,
'lat_seconds': self.lat_seconds,
'lat_direction': self.lat_direction,
'long_degrees': self.long_degrees,
'long_minutes': self.long_minutes,
'long_seconds': self.long_seconds,
'long_direction': self.long_direction,
'altitude': self.altitude,
'size': self.size,
'precision_horz': self.precision_horz,
'precision_vert': self.precision_vert,
}
return self
def __hash__(self):
return hash(
@ -1148,7 +1267,7 @@ class LocRecord(ValuesMixin, Record):
Record.register_type(LocRecord)
class MxValue(EqualityTupleMixin):
class MxValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1183,7 +1302,7 @@ class MxValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [MxValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
# RFC1035 says preference, half the providers use priority
@ -1191,17 +1310,34 @@ class MxValue(EqualityTupleMixin):
preference = value['preference']
except KeyError:
preference = value['priority']
self.preference = int(preference)
# UNTIL 1.0 remove value fallback
try:
exchange = value['exchange']
except KeyError:
exchange = value['value']
self.exchange = exchange.lower()
super().__init__(
{'preference': int(preference), 'exchange': exchange.lower()}
)
@property
def preference(self):
return self['preference']
@preference.setter
def preference(self, value):
self['preference'] = value
@property
def exchange(self):
return self['exchange']
@exchange.setter
def exchange(self, value):
self['exchange'] = value
@property
def data(self):
return {'preference': self.preference, 'exchange': self.exchange}
return self
def __hash__(self):
return hash((self.preference, self.exchange))
@ -1221,7 +1357,7 @@ class MxRecord(ValuesMixin, Record):
Record.register_type(MxRecord)
class NaptrValue(EqualityTupleMixin):
class NaptrValue(EqualityTupleMixin, dict):
VALID_FLAGS = ('S', 'A', 'U', 'P')
@classmethod
@ -1258,26 +1394,71 @@ class NaptrValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [NaptrValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.order = int(value['order'])
self.preference = int(value['preference'])
self.flags = value['flags']
self.service = value['service']
self.regexp = value['regexp']
self.replacement = value['replacement']
super().__init__(
{
'order': int(value['order']),
'preference': int(value['preference']),
'flags': value['flags'],
'service': value['service'],
'regexp': value['regexp'],
'replacement': value['replacement'],
}
)
@property
def order(self):
return self['order']
@order.setter
def order(self, value):
self['order'] = value
@property
def preference(self):
return self['preference']
@preference.setter
def preference(self, value):
self['preference'] = value
@property
def flags(self):
return self['flags']
@flags.setter
def flags(self, value):
self['flags'] = value
@property
def service(self):
return self['service']
@service.setter
def service(self, value):
self['service'] = value
@property
def regexp(self):
return self['regexp']
@regexp.setter
def regexp(self, value):
self['regexp'] = value
@property
def replacement(self):
return self['replacement']
@replacement.setter
def replacement(self, value):
self['replacement'] = value
@property
def data(self):
return {
'order': self.order,
'preference': self.preference,
'flags': self.flags,
'service': self.service,
'regexp': self.regexp,
'replacement': self.replacement,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1310,7 +1491,7 @@ class NaptrRecord(ValuesMixin, Record):
Record.register_type(NaptrRecord)
class _NsValue(object):
class _NsValue(str):
@classmethod
def validate(cls, data, _type):
if not data:
@ -1329,7 +1510,7 @@ class _NsValue(object):
@classmethod
def process(cls, values):
return values
return [cls(v) for v in values]
class NsRecord(ValuesMixin, Record):
@ -1358,7 +1539,8 @@ class PtrValue(_TargetValue):
@classmethod
def process(cls, values):
return [super(PtrValue, cls).process(v) for v in values]
supr = super()
return [supr.process(v) for v in values]
class PtrRecord(ValuesMixin, Record):
@ -1375,7 +1557,7 @@ class PtrRecord(ValuesMixin, Record):
Record.register_type(PtrRecord)
class SshfpValue(EqualityTupleMixin):
class SshfpValue(EqualityTupleMixin, dict):
VALID_ALGORITHMS = (1, 2, 3, 4)
VALID_FINGERPRINT_TYPES = (1, 2)
@ -1411,20 +1593,44 @@ class SshfpValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [SshfpValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.algorithm = int(value['algorithm'])
self.fingerprint_type = int(value['fingerprint_type'])
self.fingerprint = value['fingerprint']
super().__init__(
{
'algorithm': int(value['algorithm']),
'fingerprint_type': int(value['fingerprint_type']),
'fingerprint': value['fingerprint'],
}
)
@property
def algorithm(self):
return self['algorithm']
@algorithm.setter
def algorithm(self, value):
self['algorithm'] = value
@property
def fingerprint_type(self):
return self['fingerprint_type']
@fingerprint_type.setter
def fingerprint_type(self, value):
self['fingerprint_type'] = value
@property
def fingerprint(self):
return self['fingerprint']
@fingerprint.setter
def fingerprint(self, value):
self['fingerprint'] = value
@property
def data(self):
return {
'algorithm': self.algorithm,
'fingerprint_type': self.fingerprint_type,
'fingerprint': self.fingerprint,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1465,7 +1671,7 @@ class _ChunkedValuesMixin(ValuesMixin):
return values
class _ChunkedValue(object):
class _ChunkedValue(str):
_unescaped_semicolon_re = re.compile(r'\w;')
@classmethod
@ -1486,7 +1692,7 @@ class _ChunkedValue(object):
for v in values:
if v and v[0] == '"':
v = v[1:-1]
ret.append(v.replace('" "', ''))
ret.append(cls(v.replace('" "', '')))
return ret
@ -1498,7 +1704,7 @@ class SpfRecord(_ChunkedValuesMixin, Record):
Record.register_type(SpfRecord)
class SrvValue(EqualityTupleMixin):
class SrvValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1541,22 +1747,53 @@ class SrvValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [SrvValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.priority = int(value['priority'])
self.weight = int(value['weight'])
self.port = int(value['port'])
self.target = value['target'].lower()
super().__init__(
{
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': value['target'].lower(),
}
)
@property
def priority(self):
return self['priority']
@priority.setter
def priority(self, value):
self['priority'] = value
@property
def weight(self):
return self['weight']
@weight.setter
def weight(self, value):
self['weight'] = value
@property
def port(self):
return self['port']
@port.setter
def port(self, value):
self['port'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def data(self):
return {
'priority': self.priority,
'weight': self.weight,
'port': self.port,
'target': self.target,
}
return self
def __hash__(self):
return hash(self.__repr__())
@ -1585,7 +1822,7 @@ class SrvRecord(ValuesMixin, Record):
Record.register_type(SrvRecord)
class TlsaValue(EqualityTupleMixin):
class TlsaValue(EqualityTupleMixin, dict):
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@ -1632,24 +1869,51 @@ class TlsaValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [TlsaValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.certificate_usage = int(value.get('certificate_usage', 0))
self.selector = int(value.get('selector', 0))
self.matching_type = int(value.get('matching_type', 0))
self.certificate_association_data = value[
'certificate_association_data'
]
super().__init__(
{
'certificate_usage': int(value.get('certificate_usage', 0)),
'selector': int(value.get('selector', 0)),
'matching_type': int(value.get('matching_type', 0)),
'certificate_association_data': value[
'certificate_association_data'
],
}
)
@property
def data(self):
return {
'certificate_usage': self.certificate_usage,
'selector': self.selector,
'matching_type': self.matching_type,
'certificate_association_data': self.certificate_association_data,
}
def certificate_usage(self):
return self['certificate_usage']
@certificate_usage.setter
def certificate_usage(self, value):
self['certificate_usage'] = value
@property
def selector(self):
return self['selector']
@selector.setter
def selector(self, value):
self['selector'] = value
@property
def matching_type(self):
return self['matching_type']
@matching_type.setter
def matching_type(self, value):
self['matching_type'] = value
@property
def certificate_association_data(self):
return self['certificate_association_data']
@certificate_association_data.setter
def certificate_association_data(self, value):
self['certificate_association_data'] = value
def _equality_tuple(self):
return (
@ -1686,7 +1950,7 @@ class TxtRecord(_ChunkedValuesMixin, Record):
Record.register_type(TxtRecord)
class UrlfwdValue(EqualityTupleMixin):
class UrlfwdValue(EqualityTupleMixin, dict):
VALID_CODES = (301, 302)
VALID_MASKS = (0, 1, 2)
VALID_QUERY = (0, 1)
@ -1728,37 +1992,70 @@ class UrlfwdValue(EqualityTupleMixin):
@classmethod
def process(cls, values):
return [UrlfwdValue(v) for v in values]
return [cls(v) for v in values]
def __init__(self, value):
self.path = value['path']
self.target = value['target']
self.code = int(value['code'])
self.masking = int(value['masking'])
self.query = int(value['query'])
super().__init__(
{
'path': value['path'],
'target': value['target'],
'code': int(value['code']),
'masking': int(value['masking']),
'query': int(value['query']),
}
)
@property
def data(self):
return {
'path': self.path,
'target': self.target,
'code': self.code,
'masking': self.masking,
'query': self.query,
}
def path(self):
return self['path']
def __hash__(self):
return hash(self.__repr__())
@path.setter
def path(self, value):
self['path'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def code(self):
return self['code']
@code.setter
def code(self, value):
self['code'] = value
@property
def masking(self):
return self['masking']
@masking.setter
def masking(self, value):
self['masking'] = value
@property
def query(self):
return self['query']
@query.setter
def query(self, value):
self['query'] = value
def _equality_tuple(self):
return (self.path, self.target, self.code, self.masking, self.query)
def __repr__(self):
return (
f'"{self.path}" "{self.target}" {self.code} '
f'{self.masking} {self.query}'
def __hash__(self):
return hash(
(self.path, self.target, self.code, self.masking, self.query)
)
def __repr__(self):
return f'"{self.path}" "{self.target}" {self.code} {self.masking} {self.query}'
class UrlfwdRecord(ValuesMixin, Record):
_type = 'URLFWD'


+ 5
- 0
octodns/yaml.py View File

@ -4,6 +4,7 @@
from natsort import natsort_keygen
from yaml import SafeDumper, SafeLoader, load, dump
from yaml.representer import SafeRepresenter
from yaml.constructor import ConstructorError
@ -54,6 +55,10 @@ class SortingDumper(SafeDumper):
SortingDumper.add_representer(dict, SortingDumper._representer)
# This should handle all the record value types which are ultimately either str
# or dict at some point in their inheritance hierarchy
SortingDumper.add_multi_representer(str, SafeRepresenter.represent_str)
SortingDumper.add_multi_representer(dict, SortingDumper._representer)
def safe_dump(data, fh, **options):


+ 240
- 120
tests/test_octodns_record.py View File

@ -32,6 +32,7 @@ from octodns.record import (
SrvRecord,
SrvValue,
TlsaRecord,
TlsaValue,
TxtRecord,
Update,
UrlfwdRecord,
@ -386,12 +387,14 @@ class TestRecord(TestCase):
def test_caa(self):
a_values = [
{'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'},
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
},
CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}),
CaaValue(
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = CaaRecord(self.zone, 'a', a_data)
@ -406,7 +409,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['value'], a.values[1].value)
self.assertEqual(a_data, a.data)
b_value = {'tag': 'iodef', 'value': 'http://iodef.example.com/'}
b_value = CaaValue(
{'tag': 'iodef', 'value': 'http://iodef.example.com/'}
)
b_data = {'ttl': 30, 'value': b_value}
b = CaaRecord(self.zone, 'b', b_data)
self.assertEqual(0, b.values[0].flags)
@ -448,20 +453,22 @@ class TestRecord(TestCase):
def test_loc(self):
a_values = [
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
LocValue(
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
)
]
a_data = {'ttl': 30, 'values': a_values}
a = LocRecord(self.zone, 'a', a_data)
@ -489,20 +496,22 @@ class TestRecord(TestCase):
a_values[0]['precision_vert'], a.values[0].precision_vert
)
b_value = {
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
b_value = LocValue(
{
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = LocRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['lat_degrees'], b.values[0].lat_degrees)
@ -540,8 +549,8 @@ class TestRecord(TestCase):
def test_mx(self):
a_values = [
{'preference': 10, 'exchange': 'smtp1.'},
{'priority': 20, 'value': 'smtp2.'},
MxValue({'preference': 10, 'exchange': 'smtp1.'}),
MxValue({'priority': 20, 'value': 'smtp2.'}),
]
a_data = {'ttl': 30, 'values': a_values}
a = MxRecord(self.zone, 'a', a_data)
@ -550,12 +559,12 @@ class TestRecord(TestCase):
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['preference'], a.values[0].preference)
self.assertEqual(a_values[0]['exchange'], a.values[0].exchange)
self.assertEqual(a_values[1]['priority'], a.values[1].preference)
self.assertEqual(a_values[1]['value'], a.values[1].exchange)
a_data['values'][1] = {'preference': 20, 'exchange': 'smtp2.'}
self.assertEqual(a_values[1]['preference'], a.values[1].preference)
self.assertEqual(a_values[1]['exchange'], a.values[1].exchange)
a_data['values'][1] = MxValue({'preference': 20, 'exchange': 'smtp2.'})
self.assertEqual(a_data, a.data)
b_value = {'preference': 0, 'exchange': 'smtp3.'}
b_value = MxValue({'preference': 0, 'exchange': 'smtp3.'})
b_data = {'ttl': 30, 'value': b_value}
b = MxRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['preference'], b.values[0].preference)
@ -591,22 +600,26 @@ class TestRecord(TestCase):
def test_naptr(self):
a_values = [
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
},
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
},
NaptrValue(
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
}
),
NaptrValue(
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = NaptrRecord(self.zone, 'a', a_data)
@ -618,14 +631,16 @@ class TestRecord(TestCase):
self.assertEqual(a_values[i][k], getattr(a.values[i], k))
self.assertEqual(a_data, a.data)
b_value = {
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
b_value = NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = NaptrRecord(self.zone, 'b', b_data)
for k in a_values[0].keys():
@ -849,6 +864,30 @@ class TestRecord(TestCase):
values.add(o)
self.assertTrue(o in values)
self.assertEqual(30, o.order)
o.order = o.order + 1
self.assertEqual(31, o.order)
self.assertEqual(32, o.preference)
o.preference = o.preference + 1
self.assertEqual(33, o.preference)
self.assertEqual('M', o.flags)
o.flags = 'P'
self.assertEqual('P', o.flags)
self.assertEqual('N', o.service)
o.service = 'Q'
self.assertEqual('Q', o.service)
self.assertEqual('O', o.regexp)
o.regexp = 'R'
self.assertEqual('R', o.regexp)
self.assertEqual('z', o.replacement)
o.replacement = '1'
self.assertEqual('1', o.replacement)
def test_ns(self):
a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
a_data = {'ttl': 30, 'values': a_values}
@ -867,8 +906,20 @@ class TestRecord(TestCase):
def test_sshfp(self):
a_values = [
{'algorithm': 10, 'fingerprint_type': 11, 'fingerprint': 'abc123'},
{'algorithm': 20, 'fingerprint_type': 21, 'fingerprint': 'def456'},
SshfpValue(
{
'algorithm': 10,
'fingerprint_type': 11,
'fingerprint': 'abc123',
}
),
SshfpValue(
{
'algorithm': 20,
'fingerprint_type': 21,
'fingerprint': 'def456',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SshfpRecord(self.zone, 'a', a_data)
@ -882,11 +933,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['fingerprint'], a.values[0].fingerprint)
self.assertEqual(a_data, a.data)
b_value = {
'algorithm': 30,
'fingerprint_type': 31,
'fingerprint': 'ghi789',
}
b_value = SshfpValue(
{'algorithm': 30, 'fingerprint_type': 31, 'fingerprint': 'ghi789'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SshfpRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['algorithm'], b.values[0].algorithm)
@ -930,8 +979,12 @@ class TestRecord(TestCase):
def test_srv(self):
a_values = [
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'},
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'},
SrvValue(
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'}
),
SrvValue(
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SrvRecord(self.zone, '_a._tcp', a_data)
@ -942,14 +995,21 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['weight'], a.values[0].weight)
self.assertEqual(a_values[0]['port'], a.values[0].port)
self.assertEqual(a_values[0]['target'], a.values[0].target)
from pprint import pprint
pprint(
{
'a_values': a_values,
'self': a_data,
'other': a.data,
'a.values': a.values,
}
)
self.assertEqual(a_data, a.data)
b_value = {
'priority': 30,
'weight': 31,
'port': 32,
'target': 'server3',
}
b_value = SrvValue(
{'priority': 30, 'weight': 31, 'port': 32, 'target': 'server3'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SrvRecord(self.zone, '_b._tcp', b_data)
self.assertEqual(b_value['priority'], b.values[0].priority)
@ -993,18 +1053,22 @@ class TestRecord(TestCase):
def test_tlsa(self):
a_values = [
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
},
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
},
TlsaValue(
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
}
),
TlsaValue(
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = TlsaRecord(self.zone, 'a', a_data)
@ -1036,12 +1100,14 @@ class TestRecord(TestCase):
)
self.assertEqual(a_data, a.data)
b_value = {
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
b_value = TlsaValue(
{
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = TlsaRecord(self.zone, 'b', b_data)
self.assertEqual(
@ -1093,20 +1159,24 @@ class TestRecord(TestCase):
def test_urlfwd(self):
a_values = [
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
},
UrlfwdValue(
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
}
),
UrlfwdValue(
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = UrlfwdRecord(self.zone, 'a', a_data)
@ -1125,13 +1195,15 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['query'], a.values[1].query)
self.assertEqual(a_data, a.data)
b_value = {
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
b_value = UrlfwdValue(
{
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = UrlfwdRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['path'], b.values[0].path)
@ -1653,6 +1725,54 @@ class TestRecord(TestCase):
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertEqual(31, a.lat_degrees)
a.lat_degrees = a.lat_degrees + 1
self.assertEqual(32, a.lat_degrees)
self.assertEqual(58, a.lat_minutes)
a.lat_minutes = a.lat_minutes + 1
self.assertEqual(59, a.lat_minutes)
self.assertEqual(52.1, a.lat_seconds)
a.lat_seconds = a.lat_seconds + 1
self.assertEqual(53.1, a.lat_seconds)
self.assertEqual('S', a.lat_direction)
a.lat_direction = 'N'
self.assertEqual('N', a.lat_direction)
self.assertEqual(115, a.long_degrees)
a.long_degrees = a.long_degrees + 1
self.assertEqual(116, a.long_degrees)
self.assertEqual(49, a.long_minutes)
a.long_minutes = a.long_minutes + 1
self.assertEqual(50, a.long_minutes)
self.assertEqual(11.7, a.long_seconds)
a.long_seconds = a.long_seconds + 1
self.assertEqual(12.7, a.long_seconds)
self.assertEqual('E', a.long_direction)
a.long_direction = 'W'
self.assertEqual('W', a.long_direction)
self.assertEqual(20, a.altitude)
a.altitude = a.altitude + 1
self.assertEqual(21, a.altitude)
self.assertEqual(10, a.size)
a.size = a.size + 1
self.assertEqual(11, a.size)
self.assertEqual(10, a.precision_horz)
a.precision_horz = a.precision_horz + 1
self.assertEqual(11, a.precision_horz)
self.assertEqual(2, a.precision_vert)
a.precision_vert = a.precision_vert + 1
self.assertEqual(3, a.precision_vert)
# Hash
values = set()
values.add(a)
@ -3038,7 +3158,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual('.', record.values[0].exchange)
def test_NXPTR(self):
def test_NAPTR(self):
# doesn't blow up
Record.new(
self.zone,


Loading…
Cancel
Save