Browse Source

WIP breakup of octodns.record file

pull/969/head
Ross McFarland 3 years ago
parent
commit
be7c3d279e
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
12 changed files with 1185 additions and 1082 deletions
  1. +6
    -0
      CHANGELOG.md
  2. +46
    -1072
      octodns/record/__init__.py
  3. +340
    -0
      octodns/record/base.py
  4. +55
    -0
      octodns/record/change.py
  5. +340
    -0
      octodns/record/dynamic.py
  6. +21
    -0
      octodns/record/exception.py
  7. +101
    -0
      octodns/record/geo.py
  8. +78
    -0
      octodns/record/ipaddress.py
  9. +27
    -0
      octodns/record/rr.py
  10. +163
    -0
      octodns/record/target.py
  11. +2
    -2
      tests/test_octodns_provider_yaml.py
  12. +6
    -8
      tests/test_octodns_record.py

+ 6
- 0
CHANGELOG.md View File

@ -8,6 +8,12 @@
modules now. modules now.
* Provider.strict_supports defaults to true, can be returned to the old * Provider.strict_supports defaults to true, can be returned to the old
behavior by setting strict_supports=False in your provider params. behavior by setting strict_supports=False in your provider params.
* octodns.record has been broken up into multiple files/modules. Most of the
primary things that were available at that module path still will be, but if
you are importing things like idna_encode/decode that actually live elsewhere
from octodns.record you'll need to update and pull them from their actual
home. Classes beginning with _ are not exported from octodns.record any
longer as they were considered private/protected.
#### Stuff #### Stuff


+ 46
- 1072
octodns/record/__init__.py
File diff suppressed because it is too large
View File


+ 340
- 0
octodns/record/base.py View File

@ -0,0 +1,340 @@
#
#
#
from collections import defaultdict
from logging import getLogger
from ..equality import EqualityTupleMixin
from ..idna import IdnaError, idna_decode, idna_encode
from .change import Update
from .exception import RecordException, ValidationError
class Record(EqualityTupleMixin):
log = getLogger('Record')
_CLASSES = {}
@classmethod
def register_type(cls, _class, _type=None):
if _type is None:
_type = _class._type
existing = cls._CLASSES.get(_type)
if existing:
module = existing.__module__
name = existing.__name__
msg = f'Type "{_type}" already registered by {module}.{name}'
raise RecordException(msg)
cls._CLASSES[_type] = _class
@classmethod
def registered_types(cls):
return cls._CLASSES
@classmethod
def new(cls, zone, name, data, source=None, lenient=False):
reasons = []
try:
name = idna_encode(str(name))
except IdnaError as e:
# convert the error into a reason
reasons.append(str(e))
name = str(name)
fqdn = f'{name}.{zone.name}' if name else zone.name
try:
_type = data['type']
except KeyError:
raise Exception(f'Invalid record {idna_decode(fqdn)}, missing type')
try:
_class = cls._CLASSES[_type]
except KeyError:
raise Exception(f'Unknown record type: "{_type}"')
reasons.extend(_class.validate(name, fqdn, data))
try:
lenient |= data['octodns']['lenient']
except KeyError:
pass
if reasons:
if lenient:
cls.log.warning(ValidationError.build_message(fqdn, reasons))
else:
raise ValidationError(fqdn, reasons)
return _class(zone, name, data, source=source)
@classmethod
def validate(cls, name, fqdn, data):
reasons = []
if name == '@':
reasons.append('invalid name "@", use "" instead')
n = len(fqdn)
if n > 253:
reasons.append(
f'invalid fqdn, "{idna_decode(fqdn)}" is too long at {n} '
'chars, max is 253'
)
for label in name.split('.'):
n = len(label)
if n > 63:
reasons.append(
f'invalid label, "{label}" is too long at {n}'
' chars, max is 63'
)
# TODO: look at the idna lib for a lot more potential validations...
try:
ttl = int(data['ttl'])
if ttl < 0:
reasons.append('invalid ttl')
except KeyError:
reasons.append('missing ttl')
try:
if data['octodns']['healthcheck']['protocol'] not in (
'HTTP',
'HTTPS',
'TCP',
):
reasons.append('invalid healthcheck protocol')
except KeyError:
pass
return reasons
@classmethod
def from_rrs(cls, zone, rrs, lenient=False):
# group records by name & type so that multiple rdatas can be combined
# into a single record when needed
grouped = defaultdict(list)
for rr in rrs:
grouped[(rr.name, rr._type)].append(rr)
records = []
# walk the grouped rrs converting each one to data and then create a
# record with that data
for _, rrs in sorted(grouped.items()):
rr = rrs[0]
name = zone.hostname_from_fqdn(rr.name)
_class = cls._CLASSES[rr._type]
data = _class.data_from_rrs(rrs)
record = Record.new(zone, name, data, lenient=lenient)
records.append(record)
return records
def __init__(self, zone, name, data, source=None):
self.zone = zone
if name:
# internally everything is idna
self.name = idna_encode(str(name))
# we'll keep a decoded version around for logs and errors
self.decoded_name = idna_decode(self.name)
else:
self.name = self.decoded_name = name
self.log.debug(
'__init__: zone.name=%s, type=%11s, name=%s',
zone.decoded_name,
self.__class__.__name__,
self.decoded_name,
)
self.source = source
self.ttl = int(data['ttl'])
self._octodns = data.get('octodns', {})
def _data(self):
return {'ttl': self.ttl}
@property
def data(self):
return self._data()
@property
def fqdn(self):
# TODO: these should be calculated and set in __init__ rather than on
# each use
if self.name:
return f'{self.name}.{self.zone.name}'
return self.zone.name
@property
def decoded_fqdn(self):
if self.decoded_name:
return f'{self.decoded_name}.{self.zone.decoded_name}'
return self.zone.decoded_name
@property
def ignored(self):
return self._octodns.get('ignored', False)
@property
def excluded(self):
return self._octodns.get('excluded', [])
@property
def included(self):
return self._octodns.get('included', [])
def healthcheck_host(self, value=None):
healthcheck = self._octodns.get('healthcheck', {})
if healthcheck.get('protocol', None) == 'TCP':
return None
return healthcheck.get('host', self.fqdn[:-1]) or value
@property
def healthcheck_path(self):
healthcheck = self._octodns.get('healthcheck', {})
if healthcheck.get('protocol', None) == 'TCP':
return None
try:
return healthcheck['path']
except KeyError:
return '/_dns'
@property
def healthcheck_protocol(self):
try:
return self._octodns['healthcheck']['protocol']
except KeyError:
return 'HTTPS'
@property
def healthcheck_port(self):
try:
return int(self._octodns['healthcheck']['port'])
except KeyError:
return 443
def changes(self, other, target):
# We're assuming we have the same name and type if we're being compared
if self.ttl != other.ttl:
return Update(self, other)
def copy(self, zone=None):
data = self.data
data['type'] = self._type
data['octodns'] = self._octodns
return Record.new(
zone if zone else self.zone,
self.name,
data,
self.source,
lenient=True,
)
# NOTE: we're using __hash__ and ordering methods that consider Records
# equivalent if they have the same name & _type. Values are ignored. This
# is useful when computing diffs/changes.
def __hash__(self):
return f'{self.name}:{self._type}'.__hash__()
def _equality_tuple(self):
return (self.name, self._type)
def __repr__(self):
# Make sure this is always overridden
raise NotImplementedError('Abstract base class, __repr__ required')
class ValuesMixin(object):
@classmethod
def validate(cls, name, fqdn, data):
reasons = super().validate(name, fqdn, data)
values = data.get('values', data.get('value', []))
reasons.extend(cls._value_type.validate(values, cls._type))
return reasons
@classmethod
def data_from_rrs(cls, rrs):
# type and TTL come from the first rr
rr = rrs[0]
# values come from parsing the rdata portion of all rrs
values = [cls._value_type.parse_rdata_text(rr.rdata) for rr in rrs]
return {'ttl': rr.ttl, 'type': rr._type, 'values': values}
def __init__(self, zone, name, data, source=None):
super().__init__(zone, name, data, source=source)
try:
values = data['values']
except KeyError:
values = [data['value']]
self.values = sorted(self._value_type.process(values))
def changes(self, other, target):
if self.values != other.values:
return Update(self, other)
return super().changes(other, target)
def _data(self):
ret = super()._data()
if len(self.values) > 1:
values = [getattr(v, 'data', v) for v in self.values if v]
if len(values) > 1:
ret['values'] = values
elif len(values) == 1:
ret['value'] = values[0]
elif len(self.values) == 1:
v = self.values[0]
if v:
ret['value'] = getattr(v, 'data', v)
return ret
@property
def rrs(self):
return (
self.fqdn,
self.ttl,
self._type,
[v.rdata_text for v in self.values],
)
def __repr__(self):
values = "', '".join([str(v) for v in self.values])
klass = self.__class__.__name__
return f"<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, ['{values}']>"
class ValueMixin(object):
@classmethod
def validate(cls, name, fqdn, data):
reasons = super().validate(name, fqdn, data)
reasons.extend(
cls._value_type.validate(data.get('value', None), cls._type)
)
return reasons
@classmethod
def data_from_rrs(cls, rrs):
# single value, so single rr only...
rr = rrs[0]
return {
'ttl': rr.ttl,
'type': rr._type,
'value': cls._value_type.parse_rdata_text(rr.rdata),
}
def __init__(self, zone, name, data, source=None):
super().__init__(zone, name, data, source=source)
self.value = self._value_type.process(data['value'])
def changes(self, other, target):
if self.value != other.value:
return Update(self, other)
return super().changes(other, target)
def _data(self):
ret = super()._data()
if self.value:
ret['value'] = getattr(self.value, 'data', self.value)
return ret
@property
def rrs(self):
return self.fqdn, self.ttl, self._type, [self.value.rdata_text]
def __repr__(self):
klass = self.__class__.__name__
return f'<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, {self.value}>'

+ 55
- 0
octodns/record/change.py View File

@ -0,0 +1,55 @@
#
#
#
from ..equality import EqualityTupleMixin
class Change(EqualityTupleMixin):
def __init__(self, existing, new):
self.existing = existing
self.new = new
@property
def record(self):
'Returns new if we have one, existing otherwise'
return self.new or self.existing
def _equality_tuple(self):
return (self.CLASS_ORDERING, self.record.name, self.record._type)
class Create(Change):
CLASS_ORDERING = 1
def __init__(self, new):
super().__init__(None, new)
def __repr__(self, leader=''):
source = self.new.source.id if self.new.source else ''
return f'Create {self.new} ({source})'
class Update(Change):
CLASS_ORDERING = 2
# Leader is just to allow us to work around heven eating leading whitespace
# in our output. When we call this from the Manager.sync plan summary
# section we'll pass in a leader, otherwise we'll just let it default and
# do nothing
def __repr__(self, leader=''):
source = self.new.source.id if self.new.source else ''
return (
f'Update\n{leader} {self.existing} ->\n'
f'{leader} {self.new} ({source})'
)
class Delete(Change):
CLASS_ORDERING = 0
def __init__(self, existing):
super().__init__(existing, None)
def __repr__(self, leader=''):
return f'Delete {self.existing}'

+ 340
- 0
octodns/record/dynamic.py View File

@ -0,0 +1,340 @@
#
#
#
from logging import getLogger
import re
from .change import Update
from .geo import GeoCodes
class _DynamicPool(object):
log = getLogger('_DynamicPool')
def __init__(self, _id, data, value_type):
self._id = _id
values = [
{
'value': value_type(d['value']),
'weight': d.get('weight', 1),
'status': d.get('status', 'obey'),
}
for d in data['values']
]
values.sort(key=lambda d: d['value'])
# normalize weight of a single-value pool
if len(values) == 1:
weight = data['values'][0].get('weight', 1)
if weight != 1:
self.log.warning(
'Using weight=1 instead of %s for single-value pool %s',
weight,
_id,
)
values[0]['weight'] = 1
fallback = data.get('fallback', None)
self.data = {
'fallback': fallback if fallback != 'default' else None,
'values': values,
}
def _data(self):
return self.data
def __eq__(self, other):
if not isinstance(other, _DynamicPool):
return False
return self.data == other.data
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return f'{self.data}'
class _DynamicRule(object):
def __init__(self, i, data):
self.i = i
self.data = {}
try:
self.data['pool'] = data['pool']
except KeyError:
pass
try:
self.data['geos'] = sorted(data['geos'])
except KeyError:
pass
def _data(self):
return self.data
def __eq__(self, other):
if not isinstance(other, _DynamicRule):
return False
return self.data == other.data
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return f'{self.data}'
class _Dynamic(object):
def __init__(self, pools, rules):
self.pools = pools
self.rules = rules
def _data(self):
pools = {}
for _id, pool in self.pools.items():
pools[_id] = pool._data()
rules = []
for rule in self.rules:
rules.append(rule._data())
return {'pools': pools, 'rules': rules}
def __eq__(self, other):
if not isinstance(other, _Dynamic):
return False
ret = self.pools == other.pools and self.rules == other.rules
return ret
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return f'{self.pools}, {self.rules}'
class _DynamicMixin(object):
geo_re = re.compile(
r'^(?P<continent_code>\w\w)(-(?P<country_code>\w\w)'
r'(-(?P<subdivision_code>\w\w))?)?$'
)
@classmethod
def validate(cls, name, fqdn, data):
reasons = super().validate(name, fqdn, data)
if 'dynamic' not in data:
return reasons
elif 'geo' in data:
reasons.append('"dynamic" record with "geo" content')
try:
pools = data['dynamic']['pools']
except KeyError:
pools = {}
pools_exist = set()
pools_seen = set()
pools_seen_as_fallback = set()
if not isinstance(pools, dict):
reasons.append('pools must be a dict')
elif not pools:
reasons.append('missing pools')
else:
for _id, pool in sorted(pools.items()):
if not isinstance(pool, dict):
reasons.append(f'pool "{_id}" must be a dict')
continue
try:
values = pool['values']
except KeyError:
reasons.append(f'pool "{_id}" is missing values')
continue
pools_exist.add(_id)
for i, value in enumerate(values):
value_num = i + 1
try:
weight = value['weight']
weight = int(weight)
if weight < 1 or weight > 100:
reasons.append(
f'invalid weight "{weight}" in '
f'pool "{_id}" value {value_num}'
)
except KeyError:
pass
except ValueError:
reasons.append(
f'invalid weight "{weight}" in '
f'pool "{_id}" value {value_num}'
)
try:
status = value['status']
if status not in ['up', 'down', 'obey']:
reasons.append(
f'invalid status "{status}" in '
f'pool "{_id}" value {value_num}'
)
except KeyError:
pass
try:
value = value['value']
reasons.extend(
cls._value_type.validate(value, cls._type)
)
except KeyError:
reasons.append(
f'missing value in pool "{_id}" '
f'value {value_num}'
)
if len(values) == 1 and values[0].get('weight', 1) != 1:
reasons.append(
f'pool "{_id}" has single value with weight!=1'
)
fallback = pool.get('fallback', None)
if fallback is not None:
if fallback in pools:
pools_seen_as_fallback.add(fallback)
else:
reasons.append(
f'undefined fallback "{fallback}" '
f'for pool "{_id}"'
)
# Check for loops
fallback = pools[_id].get('fallback', None)
seen = [_id, fallback]
while fallback is not None:
# See if there's a next fallback
fallback = pools.get(fallback, {}).get('fallback', None)
if fallback in seen:
loop = ' -> '.join(seen)
reasons.append(f'loop in pool fallbacks: {loop}')
# exit the loop
break
seen.append(fallback)
try:
rules = data['dynamic']['rules']
except KeyError:
rules = []
if not isinstance(rules, (list, tuple)):
reasons.append('rules must be a list')
elif not rules:
reasons.append('missing rules')
else:
seen_default = False
for i, rule in enumerate(rules):
rule_num = i + 1
try:
pool = rule['pool']
except KeyError:
reasons.append(f'rule {rule_num} missing pool')
continue
try:
geos = rule['geos']
except KeyError:
geos = []
if not isinstance(pool, str):
reasons.append(f'rule {rule_num} invalid pool "{pool}"')
else:
if pool not in pools:
reasons.append(
f'rule {rule_num} undefined pool ' f'"{pool}"'
)
elif pool in pools_seen and geos:
reasons.append(
f'rule {rule_num} invalid, target '
f'pool "{pool}" reused'
)
pools_seen.add(pool)
if not geos:
if seen_default:
reasons.append(f'rule {rule_num} duplicate default')
seen_default = True
if not isinstance(geos, (list, tuple)):
reasons.append(f'rule {rule_num} geos must be a list')
else:
for geo in geos:
reasons.extend(
GeoCodes.validate(geo, f'rule {rule_num} ')
)
unused = pools_exist - pools_seen - pools_seen_as_fallback
if unused:
unused = '", "'.join(sorted(unused))
reasons.append(f'unused pools: "{unused}"')
return reasons
def __init__(self, zone, name, data, *args, **kwargs):
super().__init__(zone, name, data, *args, **kwargs)
self.dynamic = {}
if 'dynamic' not in data:
return
# pools
try:
pools = dict(data['dynamic']['pools'])
except:
pools = {}
for _id, pool in sorted(pools.items()):
pools[_id] = _DynamicPool(_id, pool, self._value_type)
# rules
try:
rules = list(data['dynamic']['rules'])
except:
rules = []
parsed = []
for i, rule in enumerate(rules):
parsed.append(_DynamicRule(i, rule))
# dynamic
self.dynamic = _Dynamic(pools, parsed)
def _data(self):
ret = super()._data()
if self.dynamic:
ret['dynamic'] = self.dynamic._data()
return ret
def changes(self, other, target):
if target.SUPPORTS_DYNAMIC:
if self.dynamic != other.dynamic:
return Update(self, other)
return super().changes(other, target)
def __repr__(self):
# TODO: improve this whole thing, we need multi-line...
if self.dynamic:
# TODO: this hack can't going to cut it, as part of said
# improvements the value types should deal with serializing their
# value
try:
values = self.values
except AttributeError:
values = self.value
klass = self.__class__.__name__
return (
f'<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, '
f'{values}, {self.dynamic}>'
)
return super().__repr__()

+ 21
- 0
octodns/record/exception.py View File

@ -0,0 +1,21 @@
#
#
#
from ..idna import idna_decode
class RecordException(Exception):
pass
class ValidationError(RecordException):
@classmethod
def build_message(cls, fqdn, reasons):
reasons = '\n - '.join(reasons)
return f'Invalid record {idna_decode(fqdn)}\n - {reasons}'
def __init__(self, fqdn, reasons):
super().__init__(self.build_message(fqdn, reasons))
self.fqdn = fqdn
self.reasons = reasons

+ 101
- 0
octodns/record/geo.py View File

@ -3,7 +3,11 @@
# #
from logging import getLogger from logging import getLogger
import re
from ..equality import EqualityTupleMixin
from .base import ValuesMixin
from .change import Update
from .geo_data import geo_data from .geo_data import geo_data
@ -79,3 +83,100 @@ class GeoCodes(object):
if province in geo_data['NA']['CA']['provinces']: if province in geo_data['NA']['CA']['provinces']:
country = 'CA' country = 'CA'
return f'NA-{country}-{province}' return f'NA-{country}-{province}'
class GeoValue(EqualityTupleMixin):
geo_re = re.compile(
r'^(?P<continent_code>\w\w)(-(?P<country_code>\w\w)'
r'(-(?P<subdivision_code>\w\w))?)?$'
)
@classmethod
def _validate_geo(cls, code):
reasons = []
match = cls.geo_re.match(code)
if not match:
reasons.append(f'invalid geo "{code}"')
return reasons
def __init__(self, geo, values):
self.code = geo
match = self.geo_re.match(geo)
self.continent_code = match.group('continent_code')
self.country_code = match.group('country_code')
self.subdivision_code = match.group('subdivision_code')
self.values = sorted(values)
@property
def parents(self):
bits = self.code.split('-')[:-1]
while bits:
yield '-'.join(bits)
bits.pop()
def _equality_tuple(self):
return (
self.continent_code,
self.country_code,
self.subdivision_code,
self.values,
)
def __repr__(self):
return (
f"'Geo {self.continent_code} {self.country_code} "
"{self.subdivision_code} {self.values}'"
)
class _GeoMixin(ValuesMixin):
'''
Adds GeoDNS support to a record.
Must be included before `Record`.
'''
@classmethod
def validate(cls, name, fqdn, data):
reasons = super().validate(name, fqdn, data)
try:
geo = dict(data['geo'])
for code, values in geo.items():
reasons.extend(GeoValue._validate_geo(code))
reasons.extend(cls._value_type.validate(values, cls._type))
except KeyError:
pass
return reasons
def __init__(self, zone, name, data, *args, **kwargs):
super().__init__(zone, name, data, *args, **kwargs)
try:
self.geo = dict(data['geo'])
except KeyError:
self.geo = {}
for code, values in self.geo.items():
self.geo[code] = GeoValue(code, values)
def _data(self):
ret = super()._data()
if self.geo:
geo = {}
for code, value in self.geo.items():
geo[code] = value.values
ret['geo'] = geo
return ret
def changes(self, other, target):
if target.SUPPORTS_GEO:
if self.geo != other.geo:
return Update(self, other)
return super().changes(other, target)
def __repr__(self):
if self.geo:
klass = self.__class__.__name__
return (
f'<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, '
f'{self.values}, {self.geo}>'
)
return super().__repr__()

+ 78
- 0
octodns/record/ipaddress.py View File

@ -0,0 +1,78 @@
#
#
#
from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address
from .base import Record
from .dynamic import _DynamicMixin
from .geo import _GeoMixin
class _IpAddress(str):
@classmethod
def parse_rdata_text(cls, value):
return value
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
data = (data,)
if len(data) == 0:
return ['missing value(s)']
reasons = []
for value in data:
if value == '':
reasons.append('empty value')
elif value is None:
reasons.append('missing value(s)')
else:
try:
cls._address_type(str(value))
except Exception:
addr_name = cls._address_name
reasons.append(f'invalid {addr_name} address "{value}"')
return reasons
@classmethod
def process(cls, values):
# Translating None into '' so that the list will be sortable in
# python3, get everything to str first
values = [v if v is not None else '' for v in values]
# Now round trip all non-'' through the address type and back to a str
# to normalize the address representation.
return [cls(v) if v != '' else '' for v in values]
def __new__(cls, v):
v = str(cls._address_type(v))
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
class Ipv4Address(_IpAddress):
_address_type = _IPv4Address
_address_name = 'IPv4'
class ARecord(_DynamicMixin, _GeoMixin, Record):
_type = 'A'
_value_type = Ipv4Address
Record.register_type(ARecord)
class Ipv6Address(_IpAddress):
_address_type = _IPv6Address
_address_name = 'IPv6'
class AaaaRecord(_DynamicMixin, _GeoMixin, Record):
_type = 'AAAA'
_value_type = Ipv6Address
Record.register_type(AaaaRecord)

+ 27
- 0
octodns/record/rr.py View File

@ -0,0 +1,27 @@
#
#
#
from .exception import RecordException
class RrParseError(RecordException):
def __init__(self, message='failed to parse string value as RR text'):
super().__init__(message)
class Rr(object):
'''
Simple object intended to be used with Record.from_rrs to allow providers
that work with RFC formatted rdata to share centralized parsing/encoding
code
'''
def __init__(self, name, _type, ttl, rdata):
self.name = name
self._type = _type
self.ttl = ttl
self.rdata = rdata
def __repr__(self):
return f'Rr<{self.name}, {self._type}, {self.ttl}, {self.rdata}'

+ 163
- 0
octodns/record/target.py View File

@ -0,0 +1,163 @@
#
#
#
from fqdn import FQDN
from ..idna import idna_encode
from .base import Record, ValueMixin, ValuesMixin
from .dynamic import _DynamicMixin
class _TargetValue(str):
@classmethod
def parse_rdata_text(self, value):
return value
@classmethod
def validate(cls, data, _type):
reasons = []
if data == '':
reasons.append('empty value')
elif not data:
reasons.append('missing value')
else:
data = idna_encode(data)
if not FQDN(str(data), allow_underscores=True).is_valid:
reasons.append(f'{_type} value "{data}" is not a valid FQDN')
elif not data.endswith('.'):
reasons.append(f'{_type} value "{data}" missing trailing .')
return reasons
@classmethod
def process(cls, value):
if value:
return cls(value)
return None
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
#
# much like _TargetValue, but geared towards multiple values
class _TargetsValue(str):
@classmethod
def parse_rdata_text(cls, value):
return value
@classmethod
def validate(cls, data, _type):
if not data:
return ['missing value(s)']
elif not isinstance(data, (list, tuple)):
data = (data,)
reasons = []
for value in data:
value = idna_encode(value)
if not FQDN(value, allow_underscores=True).is_valid:
reasons.append(
f'Invalid {_type} value "{value}" is not a valid FQDN.'
)
elif not value.endswith('.'):
reasons.append(f'{_type} value "{value}" missing trailing .')
return reasons
@classmethod
def process(cls, values):
return [cls(v) for v in values]
def __new__(cls, v):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
class AliasValue(_TargetValue):
pass
class AliasRecord(ValueMixin, Record):
_type = 'ALIAS'
_value_type = AliasValue
@classmethod
def validate(cls, name, fqdn, data):
reasons = []
if name != '':
reasons.append('non-root ALIAS not allowed')
reasons.extend(super().validate(name, fqdn, data))
return reasons
Record.register_type(AliasRecord)
class CnameValue(_TargetValue):
pass
class CnameRecord(_DynamicMixin, ValueMixin, Record):
_type = 'CNAME'
_value_type = CnameValue
@classmethod
def validate(cls, name, fqdn, data):
reasons = []
if name == '':
reasons.append('root CNAME not allowed')
reasons.extend(super().validate(name, fqdn, data))
return reasons
Record.register_type(CnameRecord)
class DnameValue(_TargetValue):
pass
class DnameRecord(_DynamicMixin, ValueMixin, Record):
_type = 'DNAME'
_value_type = DnameValue
Record.register_type(DnameRecord)
class NsValue(_TargetsValue):
pass
class NsRecord(ValuesMixin, Record):
_type = 'NS'
_value_type = NsValue
Record.register_type(NsRecord)
class PtrValue(_TargetsValue):
pass
class PtrRecord(ValuesMixin, Record):
_type = 'PTR'
_value_type = PtrValue
# This is for backward compatibility with providers that don't support
# multi-value PTR records.
@property
def value(self):
return self.values[0]
Record.register_type(PtrRecord)

+ 2
- 2
tests/test_octodns_provider_yaml.py View File

@ -9,7 +9,7 @@ from yaml import safe_load
from yaml.constructor import ConstructorError from yaml.constructor import ConstructorError
from octodns.idna import idna_encode from octodns.idna import idna_encode
from octodns.record import _NsValue, Create, Record, ValuesMixin
from octodns.record import NsValue, Create, Record, ValuesMixin
from octodns.provider import ProviderException from octodns.provider import ProviderException
from octodns.provider.base import Plan from octodns.provider.base import Plan
from octodns.provider.yaml import ( from octodns.provider.yaml import (
@ -273,7 +273,7 @@ xn--dj-kia8a:
class YamlRecord(ValuesMixin, Record): class YamlRecord(ValuesMixin, Record):
_type = 'YAML' _type = 'YAML'
_value_type = _NsValue
_value_type = NsValue
# don't know anything about a yaml type # don't know anything about a yaml type
self.assertTrue('YAML' not in source.SUPPORTS) self.assertTrue('YAML' not in source.SUPPORTS)


+ 6
- 8
tests/test_octodns_record.py View File

@ -26,6 +26,7 @@ from octodns.record import (
NaptrRecord, NaptrRecord,
NaptrValue, NaptrValue,
NsRecord, NsRecord,
NsValue,
PtrRecord, PtrRecord,
PtrValue, PtrValue,
Record, Record,
@ -46,12 +47,9 @@ from octodns.record import (
ValidationError, ValidationError,
ValuesMixin, ValuesMixin,
_ChunkedValue, _ChunkedValue,
_Dynamic,
_DynamicPool,
_DynamicRule,
_NsValue,
_TargetValue,
) )
from octodns.record.dynamic import _Dynamic, _DynamicPool, _DynamicRule
from octodns.record.target import _TargetValue
from octodns.zone import Zone from octodns.zone import Zone
from helpers import DynamicProvider, GeoProvider, SimpleProvider from helpers import DynamicProvider, GeoProvider, SimpleProvider
@ -64,13 +62,13 @@ class TestRecord(TestCase):
with self.assertRaises(RecordException) as ctx: with self.assertRaises(RecordException) as ctx:
Record.register_type(None, 'A') Record.register_type(None, 'A')
self.assertEqual( self.assertEqual(
'Type "A" already registered by octodns.record.ARecord',
'Type "A" already registered by octodns.record.ipaddress.ARecord',
str(ctx.exception), str(ctx.exception),
) )
class AaRecord(ValuesMixin, Record): class AaRecord(ValuesMixin, Record):
_type = 'AA' _type = 'AA'
_value_type = _NsValue
_value_type = NsValue
self.assertTrue('AA' not in Record.registered_types()) self.assertTrue('AA' not in Record.registered_types())
@ -1514,7 +1512,7 @@ class TestRecord(TestCase):
'1.2.word.4', '1.2.word.4',
'1.2.3.4', '1.2.3.4',
): ):
self.assertEqual(s, _NsValue.parse_rdata_text(s))
self.assertEqual(s, NsValue.parse_rdata_text(s))
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'}) a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})


Loading…
Cancel
Save