Browse Source

Merge branch 'master' of https://github.com/github/octodns into AZProvider

pull/84/head
Heesu Hwang 9 years ago
parent
commit
98f7c646dd
20 changed files with 1053 additions and 312 deletions
  1. +4
    -1
      octodns/cmds/dump.py
  2. +3
    -2
      octodns/manager.py
  3. +1
    -1
      octodns/provider/base.py
  4. +5
    -3
      octodns/provider/cloudflare.py
  5. +5
    -3
      octodns/provider/dnsimple.py
  6. +6
    -3
      octodns/provider/dyn.py
  7. +5
    -3
      octodns/provider/ns1.py
  8. +4
    -3
      octodns/provider/powerdns.py
  9. +6
    -3
      octodns/provider/route53.py
  10. +6
    -3
      octodns/provider/yaml.py
  11. +268
    -97
      octodns/record.py
  12. +8
    -0
      octodns/source/base.py
  13. +11
    -8
      octodns/source/tinydns.py
  14. +15
    -3
      octodns/zone.py
  15. +2
    -2
      tests/helpers.py
  16. +3
    -3
      tests/test_octodns_manager.py
  17. +2
    -2
      tests/test_octodns_provider_base.py
  18. +7
    -5
      tests/test_octodns_provider_route53.py
  19. +666
    -166
      tests/test_octodns_record.py
  20. +26
    -1
      tests/test_octodns_zone.py

+ 4
- 1
octodns/cmds/dump.py View File

@ -18,6 +18,9 @@ def main():
parser.add_argument('--output-dir', required=True,
help='The directory into which the results will be '
'written (Note: will overwrite existing files)')
parser.add_argument('--lenient', action='store_true', default=False,
help='Ignore record validations and do a best effort '
'dump')
parser.add_argument('zone', help='Zone to dump')
parser.add_argument('source', nargs='+',
help='Source(s) to pull data from')
@ -25,7 +28,7 @@ def main():
args = parser.parse_args()
manager = Manager(args.config_file)
manager.dump(args.zone, args.output_dir, *args.source)
manager.dump(args.zone, args.output_dir, args.lenient, *args.source)
if __name__ == '__main__':


+ 3
- 2
octodns/manager.py View File

@ -69,6 +69,7 @@ class Manager(object):
manager_config = self.config.get('manager', {})
max_workers = manager_config.get('max_workers', 1) \
if max_workers is None else max_workers
self.log.info('__init__: max_workers=%d', max_workers)
if max_workers > 1:
self._executor = ThreadPoolExecutor(max_workers=max_workers)
else:
@ -322,7 +323,7 @@ class Manager(object):
return zb.changes(za, _AggregateTarget(a + b))
def dump(self, zone, output_dir, source, *sources):
def dump(self, zone, output_dir, lenient, source, *sources):
'''
Dump zone data from the specified source
'''
@ -341,7 +342,7 @@ class Manager(object):
zone = Zone(zone, self.configured_sub_zones(zone))
for source in sources:
source.populate(zone)
source.populate(zone, lenient=lenient)
plan = target.plan(zone)
target.apply(plan)


+ 1
- 1
octodns/provider/base.py View File

@ -104,7 +104,7 @@ class BaseProvider(BaseSource):
self.log.info('plan: desired=%s', desired.name)
existing = Zone(desired.name, desired.sub_zones)
self.populate(existing, target=True)
self.populate(existing, target=True, lenient=True)
# compute the changes at the zone/record level
changes = existing.changes(desired, self)


+ 5
- 3
octodns/provider/cloudflare.py View File

@ -154,8 +154,9 @@ class CloudflareProvider(BaseProvider):
return self._zone_records[zone.name]
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
records = self.zone_records(zone)
@ -171,7 +172,8 @@ class CloudflareProvider(BaseProvider):
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
data = data_for(_type, records)
record = Record.new(zone, name, data, source=self)
record = Record.new(zone, name, data, source=self,
lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 5
- 3
octodns/provider/dnsimple.py View File

@ -234,8 +234,9 @@ class DnsimpleProvider(BaseProvider):
return self._zone_records[zone.name]
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
values = defaultdict(lambda: defaultdict(list))
for record in self.zone_records(zone):
@ -252,7 +253,8 @@ class DnsimpleProvider(BaseProvider):
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 6
- 3
octodns/provider/dyn.py View File

@ -338,8 +338,10 @@ class DynProvider(BaseProvider):
return td_records
def populate(self, zone, target=False):
self.log.info('populate: zone=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
self._check_dyn_sess()
@ -364,7 +366,8 @@ class DynProvider(BaseProvider):
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
data = data_for(_type, records)
record = Record.new(zone, name, data, source=self)
record = Record.new(zone, name, data, source=self,
lenient=lenient)
if record not in td_records:
zone.add_record(record)


+ 5
- 3
octodns/provider/ns1.py View File

@ -111,8 +111,9 @@ class Ns1Provider(BaseProvider):
'values': values,
}
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
try:
nsone_zone = self._client.loadZone(zone.name[:-1])
@ -127,7 +128,8 @@ class Ns1Provider(BaseProvider):
_type = record['type']
data_for = getattr(self, '_data_for_{}'.format(_type))
name = zone.hostname_from_fqdn(record['domain'])
record = Record.new(zone, name, data_for(_type, record))
record = Record.new(zone, name, data_for(_type, record),
source=self, lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 4
- 3
octodns/provider/powerdns.py View File

@ -146,8 +146,9 @@ class PowerDnsBaseProvider(BaseProvider):
'ttl': rrset['ttl']
}
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
resp = None
try:
@ -177,7 +178,7 @@ class PowerDnsBaseProvider(BaseProvider):
data_for = getattr(self, '_data_for_{}'.format(_type))
record_name = zone.hostname_from_fqdn(rrset['name'])
record = Record.new(zone, record_name, data_for(rrset),
source=self)
source=self, lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 6
- 3
octodns/provider/route53.py View File

@ -418,8 +418,10 @@ class Route53Provider(BaseProvider):
return self._r53_rrsets[zone_id]
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
zone_id = self._get_zone_id(zone.name)
@ -449,7 +451,8 @@ class Route53Provider(BaseProvider):
data['geo'] = geo
else:
data = data[0]
record = Record.new(zone, name, data, source=self)
record = Record.new(zone, name, data, source=self,
lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 6
- 3
octodns/provider/yaml.py View File

@ -45,8 +45,10 @@ class YamlProvider(BaseProvider):
self.default_ttl = default_ttl
self.enforce_order = enforce_order
def populate(self, zone, target=False):
self.log.debug('populate: zone=%s, target=%s', zone.name, target)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
if target:
# When acting as a target we ignore any existing records so that we
# create a completely new copy
@ -63,7 +65,8 @@ class YamlProvider(BaseProvider):
for d in data:
if 'ttl' not in d:
d['ttl'] = self.default_ttl
record = Record.new(zone, name, d, source=self)
record = Record.new(zone, name, d, source=self,
lenient=lenient)
zone.add_record(record)
self.log.info('populate: found %s records',


+ 268
- 97
octodns/record.py View File

@ -54,21 +54,30 @@ class Delete(Change):
return 'Delete {}'.format(self.existing)
_unescaped_semicolon_re = re.compile(r'\w;')
class ValidationError(Exception):
@classmethod
def build_message(cls, fqdn, reasons):
return 'Invalid record {}\n - {}'.format(fqdn, '\n - '.join(reasons))
def __init__(self, fqdn, reasons):
super(Exception, self).__init__(self.build_message(fqdn, reasons))
self.fqdn = fqdn
self.reasons = reasons
class Record(object):
log = getLogger('Record')
@classmethod
def new(cls, zone, name, data, source=None):
def new(cls, zone, name, data, source=None, lenient=False):
fqdn = '{}.{}'.format(name, zone.name) if name else zone.name
try:
_type = data['type']
except KeyError:
fqdn = '{}.{}'.format(name, zone.name) if name else zone.name
raise Exception('Invalid record {}, missing type'.format(fqdn))
try:
_type = {
_class = {
'A': ARecord,
'AAAA': AaaaRecord,
'ALIAS': AliasRecord,
@ -98,7 +107,24 @@ class Record(object):
}[_type]
except KeyError:
raise Exception('Unknown record type: "{}"'.format(_type))
return _type(zone, name, data, source=source)
reasons = _class.validate(name, data)
if reasons:
if lenient:
cls.log.warn(ValidationError.build_message(fqdn, reasons))
else:
raise ValidationError(fqdn, reasons)
return _class(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = []
try:
ttl = int(data['ttl'])
if ttl < 0:
reasons.append('invalid ttl')
except KeyError:
reasons.append('missing ttl')
return reasons
def __init__(self, zone, name, data, source=None):
self.log.debug('__init__: zone.name=%s, type=%11s, name=%s', zone.name,
@ -106,11 +132,8 @@ class Record(object):
self.zone = zone
# force everything lower-case just to be safe
self.name = str(name).lower() if name else name
try:
self.ttl = int(data['ttl'])
except KeyError:
raise Exception('Invalid record {}, missing ttl'.format(self.fqdn))
self.source = source
self.ttl = int(data['ttl'])
octodns = data.get('octodns', {})
self.ignored = octodns.get('ignored', False)
@ -154,11 +177,17 @@ class GeoValue(object):
geo_re = re.compile(r'^(?P<continent_code>\w\w)(-(?P<country_code>\w\w)'
r'(-(?P<subdivision_code>\w\w))?)?$')
def __init__(self, geo, values):
match = self.geo_re.match(geo)
@classmethod
def _validate_geo(cls, code):
reasons = []
match = cls.geo_re.match(code)
if not match:
raise Exception('Invalid geo "{}"'.format(geo))
reasons.append('invalid geo "{}"'.format(code))
return reasons
def __init__(self, geo, values):
self.code = geo
match = self.geo_re.match(geo)
self.continent_code = match.group('continent_code')
self.country_code = match.group('country_code')
self.subdivision_code = match.group('subdivision_code')
@ -185,16 +214,29 @@ class GeoValue(object):
class _ValuesMixin(object):
def __init__(self, zone, name, data, source=None):
super(_ValuesMixin, self).__init__(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = super(_ValuesMixin, cls).validate(name, data)
values = []
try:
values = data['values']
except KeyError:
try:
values = [data['value']]
except KeyError:
raise Exception('Invalid record {}, missing value(s)'
.format(self.fqdn))
reasons.append('missing value(s)')
for value in values:
reasons.extend(cls._validate_value(value))
return reasons
def __init__(self, zone, name, data, source=None):
super(_ValuesMixin, self).__init__(zone, name, data, source=source)
try:
values = data['values']
except KeyError:
values = [data['value']]
self.values = sorted(self._process_values(values))
def changes(self, other, target):
@ -224,6 +266,21 @@ class _GeoMixin(_ValuesMixin):
Must be included before `Record`.
'''
@classmethod
def validate(cls, name, data):
reasons = super(_GeoMixin, cls).validate(name, data)
try:
geo = dict(data['geo'])
# TODO: validate legal codes
for code, values in geo.items():
reasons.extend(GeoValue._validate_geo(code))
for value in values:
reasons.extend(cls._validate_value(value))
except KeyError:
pass
return reasons
# TODO: support 'value' as well
# TODO: move away from "data" hash to strict params, it's kind of leaking
# the yaml implementation into here and then forcing it back out into
# non-yaml providers during input
@ -233,9 +290,8 @@ class _GeoMixin(_ValuesMixin):
self.geo = dict(data['geo'])
except KeyError:
self.geo = {}
for k, vs in self.geo.items():
vs = sorted(self._process_values(vs))
self.geo[k] = GeoValue(k, vs)
for code, values in self.geo.items():
self.geo[code] = GeoValue(code, values)
def _data(self):
ret = super(_GeoMixin, self)._data()
@ -264,41 +320,52 @@ class _GeoMixin(_ValuesMixin):
class ARecord(_GeoMixin, Record):
_type = 'A'
@classmethod
def _validate_value(self, value):
reasons = []
try:
IPv4Address(unicode(value))
except Exception:
reasons.append('invalid ip address "{}"'.format(value))
return reasons
def _process_values(self, values):
for ip in values:
try:
IPv4Address(unicode(ip))
except Exception:
raise Exception('Invalid record {}, value {} not a valid ip'
.format(self.fqdn, ip))
return values
class AaaaRecord(_GeoMixin, Record):
_type = 'AAAA'
@classmethod
def _validate_value(self, value):
reasons = []
try:
IPv6Address(unicode(value))
except Exception:
reasons.append('invalid ip address "{}"'.format(value))
return reasons
def _process_values(self, values):
ret = []
for ip in values:
try:
IPv6Address(unicode(ip))
ret.append(ip.lower())
except Exception:
raise Exception('Invalid record {}, value {} not a valid ip'
.format(self.fqdn, ip))
return ret
return values
class _ValueMixin(object):
def __init__(self, zone, name, data, source=None):
super(_ValueMixin, self).__init__(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = super(_ValueMixin, cls).validate(name, data)
value = None
try:
value = data['value']
except KeyError:
raise Exception('Invalid record {}, missing value'
.format(self.fqdn))
self.value = self._process_value(value)
reasons.append('missing value')
if value:
reasons.extend(cls._validate_value(value))
return reasons
def __init__(self, zone, name, data, source=None):
super(_ValueMixin, self).__init__(zone, name, data, source=source)
self.value = self._process_value(data['value'])
def changes(self, other, target):
if self.value != other.value:
@ -319,25 +386,50 @@ class _ValueMixin(object):
class AliasRecord(_ValueMixin, Record):
_type = 'ALIAS'
def _process_value(self, value):
@classmethod
def _validate_value(self, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class CnameRecord(_ValueMixin, Record):
_type = 'CNAME'
def _process_value(self, value):
@classmethod
def validate(cls, name, data):
reasons = []
if name == '':
reasons.append('root CNAME not allowed')
reasons.extend(super(CnameRecord, cls).validate(name, data))
return reasons
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
return value.lower()
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class MxValue(object):
@classmethod
def _validate_value(cls, value):
reasons = []
if 'priority' not in value:
reasons.append('missing priority')
if 'value' not in value:
reasons.append('missing value')
return reasons
def __init__(self, value):
# TODO: rename preference
self.priority = int(value['priority'])
@ -363,19 +455,38 @@ class MxValue(object):
class MxRecord(_ValuesMixin, Record):
_type = 'MX'
@classmethod
def _validate_value(cls, value):
return MxValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(MxValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [MxValue(v) for v in values]
class NaptrValue(object):
@classmethod
def _validate_value(cls, data):
reasons = []
try:
int(data['order'])
except KeyError:
reasons.append('missing order')
except ValueError:
reasons.append('invalid order "{}"'.format(data['order']))
try:
int(data['preference'])
except KeyError:
reasons.append('missing preference')
except ValueError:
reasons.append('invalid preference "{}"'
.format(data['preference']))
# TODO: validate field data
for k in ('flags', 'service', 'regexp', 'replacement'):
if k not in data:
reasons.append('missing {}'.format(k))
return reasons
def __init__(self, value):
self.order = int(value['order'])
self.preference = int(value['preference'])
@ -420,42 +531,65 @@ class NaptrValue(object):
class NaptrRecord(_ValuesMixin, Record):
_type = 'NAPTR'
@classmethod
def _validate_value(cls, value):
return NaptrValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(NaptrValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [NaptrValue(v) for v in values]
class NsRecord(_ValuesMixin, Record):
_type = 'NS'
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
reasons.append('missing trailing .')
return reasons
def _process_values(self, values):
ret = []
for ns in values:
if not ns.endswith('.'):
raise Exception('Invalid record {}, value {} missing '
'trailing .'.format(self.fqdn, ns))
ret.append(ns.lower())
return ret
return values
class PtrRecord(_ValueMixin, Record):
_type = 'PTR'
def _process_value(self, value):
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
return value.lower()
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class SshfpValue(object):
@classmethod
def _validate_value(cls, value):
reasons = []
# TODO: validate algorithm and fingerprint_type values
try:
int(value['algorithm'])
except KeyError:
reasons.append('missing algorithm')
except ValueError:
reasons.append('invalid algorithm "{}"'.format(value['algorithm']))
try:
int(value['fingerprint_type'])
except KeyError:
reasons.append('missing fingerprint_type')
except ValueError:
reasons.append('invalid fingerprint_type "{}"'
.format(value['fingerprint_type']))
if 'fingerprint' not in value:
reasons.append('missing fingerprint')
return reasons
def __init__(self, value):
self.algorithm = int(value['algorithm'])
self.fingerprint_type = int(value['fingerprint_type'])
@ -484,26 +618,61 @@ class SshfpValue(object):
class SshfpRecord(_ValuesMixin, Record):
_type = 'SSHFP'
@classmethod
def _validate_value(cls, value):
return SshfpValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(SshfpValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [SshfpValue(v) for v in values]
_unescaped_semicolon_re = re.compile(r'\w;')
class SpfRecord(_ValuesMixin, Record):
_type = 'SPF'
@classmethod
def _validate_value(cls, value):
if _unescaped_semicolon_re.search(value):
return ['unescaped ;']
return []
def _process_values(self, values):
return values
class SrvValue(object):
@classmethod
def _validate_value(self, value):
reasons = []
# TODO: validate algorithm and fingerprint_type values
try:
int(value['priority'])
except KeyError:
reasons.append('missing priority')
except ValueError:
reasons.append('invalid priority "{}"'.format(value['priority']))
try:
int(value['weight'])
except KeyError:
reasons.append('missing weight')
except ValueError:
reasons.append('invalid weight "{}"'.format(value['weight']))
try:
int(value['port'])
except KeyError:
reasons.append('missing port')
except ValueError:
reasons.append('invalid port "{}"'.format(value['port']))
try:
if not value['target'].endswith('.'):
reasons.append('missing trailing .')
except KeyError:
reasons.append('missing target')
return reasons
def __init__(self, value):
self.priority = int(value['priority'])
self.weight = int(value['weight'])
@ -537,28 +706,30 @@ class SrvRecord(_ValuesMixin, Record):
_type = 'SRV'
_name_re = re.compile(r'^_[^\.]+\.[^\.]+')
def __init__(self, zone, name, data, source=None):
if not self._name_re.match(name):
raise Exception('Invalid name {}.{}'.format(name, zone.name))
super(SrvRecord, self).__init__(zone, name, data, source)
@classmethod
def validate(cls, name, data):
reasons = []
if not cls._name_re.match(name):
reasons.append('invalid name')
reasons.extend(super(SrvRecord, cls).validate(name, data))
return reasons
@classmethod
def _validate_value(cls, value):
return SrvValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(SrvValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [SrvValue(v) for v in values]
class TxtRecord(_ValuesMixin, Record):
_type = 'TXT'
@classmethod
def _validate_value(cls, value):
if _unescaped_semicolon_re.search(value):
return ['unescaped ;']
return []
def _process_values(self, values):
for value in values:
if _unescaped_semicolon_re.search(value):
raise Exception('Invalid record {}, unescaped ;'
.format(self.fqdn))
return values

+ 8
- 0
octodns/source/base.py View File

@ -23,6 +23,14 @@ class BaseSource(object):
def populate(self, zone, target=False):
'''
Loads all zones the provider knows about
When `target` is True the populate call is being made to load the
current state of the provider.
When `lenient` is True the populate call may skip record validation and
do a "best effort" load of data. That will allow through some common,
but not best practices stuff that we otherwise would reject. E.g. no
trailing . or mising escapes for ;.
'''
raise NotImplementedError('Abstract base class, populate method '
'missing')


+ 11
- 8
octodns/source/tinydns.py View File

@ -81,19 +81,21 @@ class TinyDnsBaseSource(BaseSource):
'values': ['{}.'.format(r[0]) for r in records]
}
def populate(self, zone, target=False):
self.log.debug('populate: zone=%s', zone.name)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
if zone.name.endswith('in-addr.arpa.'):
self._populate_in_addr_arpa(zone)
self._populate_in_addr_arpa(zone, lenient)
else:
self._populate_normal(zone)
self._populate_normal(zone, lenient)
self.log.info('populate: found %s records',
len(zone.records) - before)
def _populate_normal(self, zone):
def _populate_normal(self, zone, lenient):
type_map = {
'=': 'A',
'^': None,
@ -129,14 +131,15 @@ class TinyDnsBaseSource(BaseSource):
data_for = getattr(self, '_data_for_{}'.format(_type))
data = data_for(_type, d)
if data:
record = Record.new(zone, name, data, source=self)
record = Record.new(zone, name, data, source=self,
lenient=lenient)
try:
zone.add_record(record)
except SubzoneRecordException:
self.log.debug('_populate_normal: skipping subzone '
'record=%s', record)
def _populate_in_addr_arpa(self, zone):
def _populate_in_addr_arpa(self, zone, lenient):
name_re = re.compile('(?P<name>.+)\.{}$'.format(zone.name[:-1]))
for line in self._lines():
@ -170,7 +173,7 @@ class TinyDnsBaseSource(BaseSource):
'ttl': ttl,
'type': 'PTR',
'value': value
}, source=self)
}, source=self, lenient=lenient)
try:
zone.add_record(record)
except DuplicateRecordException:


+ 15
- 3
octodns/zone.py View File

@ -19,6 +19,10 @@ class DuplicateRecordException(Exception):
pass
class InvalidNodeException(Exception):
pass
def _is_eligible(record):
# Should this record be considered when computing changes
# We ignore all top-level NS records
@ -59,9 +63,17 @@ class Zone(object):
raise SubzoneRecordException('Record {} a managed sub-zone '
'and not of type NS'
.format(record.fqdn))
if record in self.records:
raise DuplicateRecordException('Duplicate record {}, type {}'
.format(record.fqdn, record._type))
for existing in self.records:
if record == existing:
raise DuplicateRecordException('Duplicate record {}, type {}'
.format(record.fqdn,
record._type))
elif name == existing.name and (record._type == 'CNAME' or
existing._type == 'CNAME'):
raise InvalidNodeException('Invalid state, CNAME at {} '
'cannot coexist with other records'
.format(record.fqdn))
self.records.add(record)
def changes(self, desired, target):


+ 2
- 2
tests/helpers.py View File

@ -22,7 +22,7 @@ class SimpleProvider(object):
def __init__(self, id='test'):
pass
def populate(self, zone, source=True):
def populate(self, zone, source=False, lenient=False):
pass
def supports(self, record):
@ -38,7 +38,7 @@ class GeoProvider(object):
def __init__(self, id='test'):
pass
def populate(self, zone, source=True):
def populate(self, zone, source=False, lenient=False):
pass
def supports(self, record):


+ 3
- 3
tests/test_octodns_manager.py View File

@ -195,15 +195,15 @@ class TestManager(TestCase):
manager = Manager(get_config_filename('simple.yaml'))
with self.assertRaises(Exception) as ctx:
manager.dump('unit.tests.', tmpdir.dirname, 'nope')
manager.dump('unit.tests.', tmpdir.dirname, False, 'nope')
self.assertEquals('Unknown source: nope', ctx.exception.message)
manager.dump('unit.tests.', tmpdir.dirname, 'in')
manager.dump('unit.tests.', tmpdir.dirname, False, 'in')
# make sure this fails with an IOError and not a KeyError when
# tyring to find sub zones
with self.assertRaises(IOError):
manager.dump('unknown.zone.', tmpdir.dirname, 'in')
manager.dump('unknown.zone.', tmpdir.dirname, False, 'in')
def test_validate_configs(self):
Manager(get_config_filename('simple-validate.yaml')).validate_configs()


+ 2
- 2
tests/test_octodns_provider_base.py View File

@ -24,7 +24,7 @@ class HelperProvider(BaseProvider):
self.apply_disabled = apply_disabled
self.include_change_callback = include_change_callback
def populate(self, zone, target=False):
def populate(self, zone, target=False, lenient=False):
pass
def _include_change(self, change):
@ -72,7 +72,7 @@ class TestBaseProvider(TestCase):
class HasPopulate(HasSupports):
def populate(self, zone, target=False):
def populate(self, zone, target=False, lenient=False):
zone.add_record(Record.new(zone, '', {
'ttl': 60,
'type': 'A',


+ 7
- 5
tests/test_octodns_provider_route53.py View File

@ -370,7 +370,7 @@ class TestRoute53Provider(TestCase):
stubber.assert_no_pending_responses()
# Delete by monkey patching in a populate that includes an extra record
def add_extra_populate(existing, target):
def add_extra_populate(existing, target, lenient):
for record in self.expected.records:
existing.records.add(record)
record = Record.new(existing, 'extra',
@ -406,7 +406,7 @@ class TestRoute53Provider(TestCase):
# Update by monkey patching in a populate that modifies the A record
# with geos
def mod_geo_populate(existing, target):
def mod_geo_populate(existing, target, lenient):
for record in self.expected.records:
if record._type != 'A' or not record.geo:
existing.records.add(record)
@ -502,7 +502,7 @@ class TestRoute53Provider(TestCase):
# Update converting to non-geo by monkey patching in a populate that
# modifies the A record with geos
def mod_add_geo_populate(existing, target):
def mod_add_geo_populate(existing, target, lenient):
for record in self.expected.records:
if record._type != 'A' or record.geo:
existing.records.add(record)
@ -1260,8 +1260,10 @@ class TestRoute53Records(TestCase):
False)
self.assertEquals(c, c)
d = _Route53Record(None, Record.new(existing, '',
{'ttl': 42, 'type': 'CNAME',
'value': 'foo.bar.'}),
{'ttl': 42, 'type': 'MX',
'value': {
'priority': 10,
'value': 'foo.bar.'}}),
False)
self.assertEquals(d, d)


+ 666
- 166
tests/test_octodns_record.py View File

@ -9,7 +9,8 @@ from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, AliasRecord, CnameRecord, \
Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, NsRecord, \
PtrRecord, Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update
Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update, \
ValidationError
from octodns.zone import Zone
from helpers import GeoProvider, SimpleProvider
@ -42,15 +43,6 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing ttl
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, None, {'value': '1.1.1.1'})
self.assertTrue('missing ttl' in ctx.exception.message)
# missing values & value
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# top-level
data = {'ttl': 30, 'value': '4.2.3.4'}
self.assertEquals(self.zone.name, ARecord(self.zone, '', data).fqdn)
@ -104,20 +96,6 @@ class TestRecord(TestCase):
DummyRecord().__repr__()
def test_invalid_a(self):
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'a', {
'ttl': 30,
'values': ['1.2.3.4', 'bar'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_geo(self):
geo_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'AF': ['1.1.1.1'],
@ -157,19 +135,6 @@ class TestRecord(TestCase):
# Geo provider does consider lack of geo diffs to be changes
self.assertTrue(geo.changes(other, geo_target))
# invalid geo code
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'geo', {'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'abc': ['1.1.1.1']}})
self.assertEquals('Invalid geo "abc"', ctx.exception.message)
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'geo', {'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'NA-US': ['1.1.1']}})
self.assertTrue('not a valid ip' in ctx.exception.message)
# __repr__ doesn't blow up
geo.__repr__()
@ -187,30 +152,12 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing values & value
with self.assertRaises(Exception) as ctx:
_type(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
def test_aaaa(self):
a_values = ['2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b',
'2001:0db8:3c4d:0015:0000:0000:1a2f:1a3b']
b_value = '2001:0db8:3c4d:0015:0000:0000:1a2f:1a4b'
self.assertMultipleValues(AaaaRecord, a_values, b_value)
with self.assertRaises(Exception) as ctx:
AaaaRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
AaaaRecord(self.zone, 'a', {
'ttl': 30,
'values': [b_value, 'bar'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def assertSingleValue(self, _type, a_value, b_value):
a_data = {'ttl': 30, 'value': a_value}
a = _type(self.zone, 'a', a_data)
@ -225,11 +172,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value, b.value)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
_type(self.zone, None, {'ttl': 42})
self.assertTrue('missing value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -251,15 +193,6 @@ class TestRecord(TestCase):
self.assertEquals(a_data['value'], a.value)
self.assertEquals(a_data, a.data)
# missing value
with self.assertRaises(Exception) as ctx:
AliasRecord(self.zone, None, {'ttl': 0})
self.assertTrue('missing value' in ctx.exception.message)
# bad name
with self.assertRaises(Exception) as ctx:
AliasRecord(self.zone, None, {'ttl': 0, 'value': 'www.unit.tests'})
self.assertTrue('missing trailing .' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -277,19 +210,6 @@ class TestRecord(TestCase):
self.assertSingleValue(CnameRecord, 'target.foo.com.',
'other.foo.com.')
with self.assertRaises(Exception) as ctx:
CnameRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
CnameRecord(self.zone, 'a', {
'ttl': 30,
'values': ['foo.com.', 'bar.com'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_mx(self):
a_values = [{
'priority': 10,
@ -319,15 +239,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['value'], b.values[0].value)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
MxRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
MxRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -387,15 +298,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value[k], getattr(b.values[0], k))
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
NaptrRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
NaptrRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -538,33 +440,6 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing values & value
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, 'a', {
'ttl': 30,
'values': ['foo.com.', 'bar.com'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_ptr(self):
self.assertSingleValue(PtrRecord, 'foo.bar.com.', 'other.bar.com.')
with self.assertRaises(Exception) as ctx:
PtrRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_sshfp(self):
a_values = [{
'algorithm': 10,
@ -599,15 +474,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['fingerprint'], b.values[0].fingerprint)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
SshfpRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
SshfpRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -677,21 +543,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['target'], b.values[0].target)
self.assertEquals(b_data, b.data)
# invalid name
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, 'bad', {'ttl': 42})
self.assertEquals('Invalid name bad.unit.tests.',
ctx.exception.message)
# missing value
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, '_missing._tcp', {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, '_missing._udp', {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@ -729,21 +580,6 @@ class TestRecord(TestCase):
b_value = 'b other'
self.assertMultipleValues(TxtRecord, a_values, b_value)
Record.new(self.zone, 'txt', {
'ttl': 44,
'type': 'TXT',
'value': 'escaped\; foo',
})
with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'txt', {
'ttl': 44,
'type': 'TXT',
'value': 'un-escaped; foo',
})
self.assertEquals('Invalid record txt.unit.tests., unescaped ;',
ctx.exception.message)
def test_record_new(self):
txt = Record.new(self.zone, 'txt', {
'ttl': 44,
@ -794,3 +630,667 @@ class TestRecord(TestCase):
self.assertEquals('CA', geo.subdivision_code)
self.assertEquals(values, geo.values)
self.assertEquals(['NA-US', 'NA'], list(geo.parents))
class TestRecordValidation(TestCase):
zone = Zone('unit.tests.', [])
def test_base(self):
# no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'value': '1.2.3.4',
})
self.assertEquals(['missing ttl'], ctx.exception.reasons)
# invalid ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': -1,
'value': '1.2.3.4',
})
self.assertEquals('www.unit.tests.', ctx.exception.fqdn)
self.assertEquals(['invalid ttl'], ctx.exception.reasons)
# no exception if we're in lenient mode
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': -1,
'value': '1.2.3.4',
}, lenient=True)
# __init__ may still blow up, even if validation is lenient
with self.assertRaises(KeyError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': -1,
}, lenient=True)
self.assertEquals(('value',), ctx.exception.args)
def test_A_and_values_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': [
'1.2.3.4',
'1.2.3.5',
]
})
# missing value(s)
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing value(s) & ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
})
self.assertEquals(['missing ttl', 'missing value(s)'],
ctx.exception.reasons)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'value': 'hello'
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
# invalid ip addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': ['hello', 'goodbye']
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
# invalid & valid ip addresses, no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'values': ['1.2.3.4', 'hello', '5.6.7.8']
})
self.assertEquals([
'missing ttl',
'invalid ip address "hello"',
], ctx.exception.reasons)
def test_geo(self):
Record.new(self.zone, '', {
'geo': {
'NA': ['1.2.3.5'],
'NA-US': ['1.2.3.5', '1.2.3.6']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'NA': ['hello'],
'NA-US': ['1.2.3.5', '1.2.3.6']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
# invalid geo code
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'XYZ': ['1.2.3.4'],
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals(['invalid geo "XYZ"'], ctx.exception.reasons)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'NA': ['hello'],
'NA-US': ['1.2.3.5', 'goodbye']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
def test_AAAA(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
})
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'values': [
'2601:644:500:e210:62f8:1dff:feb8:947a',
'2601:644:500:e210:62f8:1dff:feb8:947b',
]
})
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': 'hello'
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': '1.2.3.4'
})
self.assertEquals(['invalid ip address "1.2.3.4"'],
ctx.exception.reasons)
# invalid ip addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'values': ['hello', 'goodbye']
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
def test_ALIAS_and_value_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': 'foo.bar.com.',
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': 'foo.bar.com',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_CNAME(self):
# doesn't blow up
Record.new(self.zone, 'www', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com.',
})
# root cname is a no-no
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com.',
})
self.assertEquals(['root CNAME not allowed'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_MX(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'priority': 10,
'value': 'foo.bar.com.'
}
})
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'value': 'foo.bar.com.'
}
})
self.assertEquals(['missing priority'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'priority': 10,
}
})
self.assertEquals(['missing value'], ctx.exception.reasons)
def test_NXPTR(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': {
'order': 10,
'preference': 20,
'flags': 'f',
'service': 'srv',
'regexp': '.*',
'replacement': '.'
}
})
# missing X priority
value = {
'order': 10,
'preference': 20,
'flags': 'f',
'service': 'srv',
'regexp': '.*',
'replacement': '.'
}
for k in ('order', 'preference', 'flags', 'service', 'regexp',
'replacement'):
v = dict(value)
del v[k]
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['missing {}'.format(k)], ctx.exception.reasons)
# non-int order
v = dict(value)
v['order'] = 'boo'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['invalid order "boo"'], ctx.exception.reasons)
# non-int preference
v = dict(value)
v['preference'] = 'who'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['invalid preference "who"'], ctx.exception.reasons)
def test_NS(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
'values': [
'foo.bar.com.',
'1.2.3.4.'
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
'value': 'foo.bar',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_PTR(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
'value': 'foo.bar.com.',
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
'value': 'foo.bar',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_SSHFP(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
# missing algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['missing algorithm'], ctx.exception.reasons)
# invalid algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 'nope',
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['invalid algorithm "nope"'], ctx.exception.reasons)
# missing fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['missing fingerprint_type'], ctx.exception.reasons)
# invalid fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 'yeeah',
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['invalid fingerprint_type "yeeah"'],
ctx.exception.reasons)
# missing fingerprint
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 1,
}
})
self.assertEquals(['missing fingerprint'], ctx.exception.reasons)
def test_SPF(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
'values': [
'v=spf1 ip4:192.168.0.1/16-all',
'v=spf1 ip4:10.1.2.1/24-all',
'this has some\; semi-colons\; in it',
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)
def test_SRV(self):
# doesn't blow up
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
# invalid name
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'neup', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid name'], ctx.exception.reasons)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing priority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 'foo',
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid priority "foo"'], ctx.exception.reasons)
# missing weight
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing weight'], ctx.exception.reasons)
# invalid weight
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 'foo',
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid weight "foo"'], ctx.exception.reasons)
# missing port
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing port'], ctx.exception.reasons)
# invalid port
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 'foo',
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid port "foo"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
}
})
self.assertEquals(['missing target'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz'
}
})
self.assertEquals(['missing trailing .'],
ctx.exception.reasons)
def test_TXT(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
'this has some\; semi-colons\; in it',
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)

+ 26
- 1
tests/test_octodns_zone.py View File

@ -8,7 +8,8 @@ from __future__ import absolute_import, division, print_function, \
from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, Create, Delete, Record, Update
from octodns.zone import DuplicateRecordException, SubzoneRecordException, Zone
from octodns.zone import DuplicateRecordException, InvalidNodeException, \
SubzoneRecordException, Zone
from helpers import SimpleProvider
@ -205,3 +206,27 @@ class TestZone(TestCase):
self.assertTrue(zone_missing.changes(zone_normal, provider))
self.assertFalse(zone_missing.changes(zone_ignored, provider))
def test_cname_coexisting(self):
zone = Zone('unit.tests.', [])
a = Record.new(zone, 'www', {
'ttl': 60,
'type': 'A',
'value': '9.9.9.9',
})
cname = Record.new(zone, 'www', {
'ttl': 60,
'type': 'CNAME',
'value': 'foo.bar.com.',
})
# add cname to a
zone.add_record(a)
with self.assertRaises(InvalidNodeException):
zone.add_record(cname)
# add a to cname
zone = Zone('unit.tests.', [])
zone.add_record(cname)
with self.assertRaises(InvalidNodeException):
zone.add_record(a)

Loading…
Cancel
Save