Browse Source

Merge pull request #1020 from octodns/full-tinydns-support

Full tinydns spec-compliant source implementation
pull/1024/head
Ross McFarland 2 years ago
committed by GitHub
parent
commit
49aca9db10
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 612 additions and 187 deletions
  1. +2
    -0
      CHANGELOG.md
  2. +4
    -0
      octodns/record/base.py
  3. +409
    -172
      octodns/source/tinydns.py
  4. +26
    -0
      octodns/zone.py
  5. +15
    -0
      tests/test_octodns_record.py
  6. +98
    -8
      tests/test_octodns_source_tinydns.py
  7. +25
    -0
      tests/test_octodns_zone.py
  8. +32
    -7
      tests/zones/tinydns/example.com
  9. +1
    -0
      tests/zones/tinydns/other.foo

+ 2
- 0
CHANGELOG.md View File

@ -6,6 +6,8 @@
* octodns-report access --lenient flag to allow running reports with records * octodns-report access --lenient flag to allow running reports with records
sourced from providers with non-compliant record data. sourced from providers with non-compliant record data.
* Correctly handle FQDNs in TinyDNS config files that end with trailing .'s * Correctly handle FQDNs in TinyDNS config files that end with trailing .'s
* Complete rewrite of TinyDnsBaseSource to fully implement the spec and the ipv6
extensions
## v1.0.0.rc0 - 2023-05-16 - First of the ones ## v1.0.0.rc0 - 2023-05-16 - First of the ones


+ 4
- 0
octodns/record/base.py View File

@ -123,6 +123,10 @@ class Record(EqualityTupleMixin):
return records return records
@classmethod
def parse_rdata_texts(cls, rdatas):
return [cls._value_type.parse_rdata_text(r) for r in rdatas]
def __init__(self, zone, name, data, source=None): def __init__(self, zone, name, data, source=None):
self.zone = zone self.zone = zone
if name: if name:


+ 409
- 172
octodns/source/tinydns.py View File

@ -3,7 +3,6 @@
# #
import logging import logging
import re
import textwrap import textwrap
from collections import defaultdict from collections import defaultdict
from ipaddress import ip_address from ipaddress import ip_address
@ -11,212 +10,445 @@ from os import listdir
from os.path import join from os.path import join
from ..record import Record from ..record import Record
from ..zone import DuplicateRecordException, SubzoneRecordException
from .base import BaseSource from .base import BaseSource
def _unique(values):
try:
# this will work if they're simple strings
return list(set(values))
except TypeError:
pass
# if they're dictionaries it's a bit more involved since dict's aren't
# hashable, based on https://stackoverflow.com/a/38521207
return [dict(s) for s in set(frozenset(v.items()) for v in values)]
class TinyDnsBaseSource(BaseSource): class TinyDnsBaseSource(BaseSource):
SUPPORTS_GEO = False SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False SUPPORTS_DYNAMIC = False
SUPPORTS = set(('A', 'CNAME', 'MX', 'NS', 'TXT', 'AAAA'))
split_re = re.compile(r':+')
def __init__(self, id, default_ttl=3600): def __init__(self, id, default_ttl=3600):
super().__init__(id) super().__init__(id)
self.default_ttl = default_ttl self.default_ttl = default_ttl
def _data_for_A(self, _type, records):
@property
def SUPPORTS(self):
# All record types, including those registered by 3rd party modules
return set(Record.registered_types().keys())
def _ttl_for(self, lines, index):
# see if we can find a ttl on any of the lines, first one wins
for line in lines:
try:
return int(line[index])
except IndexError:
pass
# and if we don't use the default
return self.default_ttl
def _records_for_at(self, zone, name, lines, arpa=False):
# @fqdn:ip:x:dist:ttl:timestamp:lo
# MX (and optional A)
if arpa:
# no arpa
return []
if not zone.owns('MX', name):
# if name doesn't live under our zone there's nothing for us to do
return
ttl = self._ttl_for(lines, 4)
values = [] values = []
for record in records:
if record[0] != '0.0.0.0':
values.append(record[0])
if len(values) == 0:
for line in lines:
mx = line[2]
# if there's a . in the mx we hit a special case and use it as-is
if '.' not in mx:
# otherwise we treat it as the MX hostnam and construct the rest
mx = f'{mx}.mx.{zone.name}'
elif mx[-1] != '.':
mx = f'{mx}.'
# default distance is 0
try:
dist = line[3] or 0
except IndexError:
dist = 0
# if we have an IP then we need to create an A for the MX
ip = line[1]
if ip and zone.owns('A', mx):
yield 'A', mx, ttl, [ip]
values.append({'preference': dist, 'exchange': mx})
yield 'MX', name, ttl, values
def _records_for_C(self, zone, name, lines, arpa=False):
# Cfqdn:p:ttl:timestamp:lo
# CNAME
if arpa:
# no arpa
return []
if not zone.owns('CNAME', name):
# if name doesn't live under our zone there's nothing for us to do
return return
try:
ttl = records[0][1]
except IndexError:
ttl = self.default_ttl
return {'ttl': ttl, 'type': _type, 'values': values}
def _data_for_AAAA(self, _type, records):
value = lines[0][1]
if value[-1] != '.':
value = f'{value}.'
ttl = self._ttl_for(lines, 2)
yield 'CNAME', name, ttl, [value]
def _records_for_caret(self, zone, name, lines, arpa=False):
# ^fqdn:p:ttl:timestamp:lo
# PTR, line may be a A/AAAA or straight PTR
if not arpa:
# we only operate on arpa
return []
names = defaultdict(list)
for line in lines:
if line[0].endswith('in-addr.arpa') or line[0].endswith(
'ip6.arpa.'
):
# it's a straight PTR record, already in in-addr.arpa format,
# 2nd item is the name it points to
name = line[0]
value = line[1]
else:
# it's not a PTR we need to build up the PTR data from what
# we're given
value = line[0]
addr = line[1]
if '.' not in addr:
addr = u':'.join(textwrap.wrap(line[1], 4))
addr = ip_address(addr)
name = addr.reverse_pointer
if value[-1] != '.':
value = f'{value}.'
names[name].append(value)
ttl = self._ttl_for(lines, 2)
for name, values in names.items():
if zone.owns('PTR', name):
yield 'PTR', name, ttl, values
def _records_for_equal(self, zone, name, lines, arpa=False):
# =fqdn:ip:ttl:timestamp:lo
# A (arpa False) & PTR (arpa True)
if arpa:
yield from self._records_for_caret(zone, name, lines, arpa)
else:
yield from self._records_for_plus(zone, name, lines, arpa)
def _records_for_dot(self, zone, name, lines, arpa=False):
# .fqdn:ip:x:ttl:timestamp:lo
# NS (and optional A)
if arpa:
# no arpa
return []
if not zone.owns('NS', name):
# if name doesn't live under our zone there's nothing for us to do
return
ttl = self._ttl_for(lines, 3)
values = [] values = []
for record in records:
for line in lines:
ns = line[2]
# if there's a . in the ns we hit a special case and use it as-is
if '.' not in ns:
# otherwise we treat it as the NS hostnam and construct the rest
ns = f'{ns}.ns.{zone.name}'
elif ns[-1] != '.':
ns = f'{ns}.'
# if we have an IP then we need to create an A for the MX
ip = line[1]
if ip and zone.owns('A', ns):
yield 'A', ns, ttl, [ip]
values.append(ns)
yield 'NS', name, ttl, values
_records_for_amp = _records_for_dot
def _records_for_plus(self, zone, name, lines, arpa=False):
# +fqdn:ip:ttl:timestamp:lo
# A
if arpa:
# no arpa
return []
if not zone.owns('A', name):
# if name doesn't live under our zone there's nothing for us to do
return
# collect our ip(s)
ips = [l[1] for l in lines if l[1] != '0.0.0.0']
if not ips:
# we didn't find any value ips so nothing to do
return []
ttl = self._ttl_for(lines, 2)
yield 'A', name, ttl, ips
def _records_for_quote(self, zone, name, lines, arpa=False):
# 'fqdn:s:ttl:timestamp:lo
# TXT
if arpa:
# no arpa
return []
if not zone.owns('TXT', name):
# if name doesn't live under our zone there's nothing for us to do
return
# collect our ip(s)
values = [
l[1].encode('latin1').decode('unicode-escape').replace(";", "\\;")
for l in lines
]
ttl = self._ttl_for(lines, 2)
yield 'TXT', name, ttl, values
def _records_for_three(self, zone, name, lines, arpa=False):
# 3fqdn:ip:ttl:timestamp:lo
# AAAA
if arpa:
# no arpa
return []
if not zone.owns('AAAA', name):
# if name doesn't live under our zone there's nothing for us to do
return
# collect our ip(s)
ips = []
for line in lines:
# TinyDNS files have the ipv6 address written in full, but with the # TinyDNS files have the ipv6 address written in full, but with the
# colons removed. This inserts a colon every 4th character to make # colons removed. This inserts a colon every 4th character to make
# the address correct. # the address correct.
values.append(u":".join(textwrap.wrap(record[0], 4)))
try:
ttl = records[0][1]
except IndexError:
ttl = self.default_ttl
return {'ttl': ttl, 'type': _type, 'values': values}
def _data_for_TXT(self, _type, records):
values = []
ips.append(u':'.join(textwrap.wrap(line[1], 4)))
ttl = self._ttl_for(lines, 2)
yield 'AAAA', name, ttl, ips
def _records_for_S(self, zone, name, lines, arpa=False):
# Sfqdn:ip:x:port:priority:weight:ttl:timestamp:lo
# SRV
if arpa:
# no arpa
return []
if not zone.owns('SRV', name):
# if name doesn't live under our zone there's nothing for us to do
return
ttl = self._ttl_for(lines, 6)
for record in records:
new_value = (
record[0]
.encode('latin1')
.decode('unicode-escape')
.replace(";", "\\;")
values = []
for line in lines:
target = line[2]
# if there's a . in the mx we hit a special case and use it as-is
if '.' not in target:
# otherwise we treat it as the MX hostnam and construct the rest
target = f'{target}.srv.{zone.name}'
elif target[-1] != '.':
target = f'{target}.'
# if we have an IP then we need to create an A for the SRV
# has to be present, but can be empty
ip = line[1]
if ip and zone.owns('A', target):
yield 'A', target, ttl, [ip]
# required
port = int(line[3])
# optional, default 0
try:
priority = int(line[4] or 0)
except IndexError:
priority = 0
# optional, default 0
try:
weight = int(line[5] or 0)
except IndexError:
weight = 0
values.append(
{
'priority': priority,
'weight': weight,
'port': port,
'target': target,
}
) )
values.append(new_value)
try:
ttl = records[0][1]
except IndexError:
ttl = self.default_ttl
return {'ttl': ttl, 'type': _type, 'values': values}
def _data_for_CNAME(self, _type, records):
first = records[0]
try:
ttl = first[1]
except IndexError:
ttl = self.default_ttl
return {'ttl': ttl, 'type': _type, 'value': f'{first[0]}.'}
def _data_for_MX(self, _type, records):
try:
ttl = records[0][2]
except IndexError:
ttl = self.default_ttl
return {
'ttl': ttl,
'type': _type,
'values': [
{'preference': r[1], 'exchange': f'{r[0]}.'} for r in records
],
}
def _data_for_NS(self, _type, records):
try:
ttl = records[0][1]
except IndexError:
ttl = self.default_ttl
return {
'ttl': ttl,
'type': _type,
'values': [f'{r[0]}.' for r in records],
}
def populate(self, zone, target=False, lenient=False):
self.log.debug(
'populate: name=%s, target=%s, lenient=%s',
zone.name,
target,
lenient,
)
yield 'SRV', name, ttl, values
before = len(zone.records)
def _records_for_colon(self, zone, name, lines, arpa=False):
# :fqdn:n:rdata:ttl:timestamp:lo
# ANY
if zone.name.endswith('in-addr.arpa.'):
self._populate_in_addr_arpa(zone, lenient)
else:
self._populate_normal(zone, lenient)
if arpa:
# no arpa
return []
self.log.info(
'populate: found %s records', len(zone.records) - before
)
if not zone.owns('SRV', name):
# if name doesn't live under our zone there's nothing for us to do
return
# group by lines by the record type
types = defaultdict(list)
for line in lines:
types[line[1].upper()].append(line)
classes = Record.registered_types()
for _type, lines in types.items():
_class = classes.get(_type, None)
if not _class:
self.log.info(
'_records_for_colon: unrecognized type %s, %s', _type, line
)
continue
def _populate_normal(self, zone, lenient):
type_map = {
'=': 'A',
'^': None,
'.': 'NS',
'C': 'CNAME',
'+': 'A',
'@': 'MX',
'\'': 'TXT',
'3': 'AAAA',
'6': 'AAAA',
}
name_re = re.compile(fr'((?P<name>.+)\.)?{zone.name[:-1]}\.?$')
ttl = self._ttl_for(lines, 3)
rdatas = [l[2] for l in lines]
yield _type, name, ttl, _class.parse_rdata_texts(rdatas)
def _records_for_six(self, zone, name, lines, arpa=False):
# 6fqdn:ip:ttl:timestamp:lo
# AAAA (arpa False) & PTR (arpa True)
if arpa:
yield from self._records_for_caret(zone, name, lines, arpa)
else:
yield from self._records_for_three(zone, name, lines, arpa)
SYMBOL_MAP = {
'=': _records_for_equal, # A
'^': _records_for_caret, # PTR
'.': _records_for_dot, # NS
'C': _records_for_C, # CNAME
'+': _records_for_plus, # A
'@': _records_for_at, # MX
'&': _records_for_amp, # NS
'\'': _records_for_quote, # TXT
'3': _records_for_three, # AAAA
'S': _records_for_S, # SRV
':': _records_for_colon, # arbitrary
'6': _records_for_six, # AAAA
}
def _process_lines(self, zone, lines):
data = defaultdict(lambda: defaultdict(list)) data = defaultdict(lambda: defaultdict(list))
for line in self._lines():
_type = line[0]
if _type not in type_map:
# Something we don't care about
continue
_type = type_map[_type]
if not _type:
continue
for line in lines:
symbol = line[0]
# Skip type, remove trailing comments, and omit newline # Skip type, remove trailing comments, and omit newline
line = line[1:].split('#', 1)[0] line = line[1:].split('#', 1)[0]
# Split on :'s including :: and strip leading/trailing ws # Split on :'s including :: and strip leading/trailing ws
line = [p.strip() for p in self.split_re.split(line)]
match = name_re.match(line[0])
if not match:
continue
name = zone.hostname_from_fqdn(line[0])
data[name][_type].append(line[1:])
for name, types in data.items():
for _type, d in types.items():
data_for = getattr(self, f'_data_for_{_type}')
data = data_for(_type, d)
if data:
record = Record.new(
zone, name, data, source=self, lenient=lenient
)
try:
zone.add_record(record, lenient=lenient)
except SubzoneRecordException:
self.log.debug(
'_populate_normal: skipping subzone record=%s',
record,
)
def _populate_in_addr_arpa(self, zone, lenient):
name_re = re.compile(fr'(?P<name>.+)\.{zone.name[:-1]}\.?$')
for line in self._lines():
_type = line[0]
# We're only interested in = (A+PTR), and ^ (PTR) records
if _type not in ('=', '^'):
line = [p.strip() for p in line.split(':')]
data[symbol][line[0]].append(line)
return data
def _process_symbols(self, zone, symbols, arpa):
types = defaultdict(lambda: defaultdict(list))
ttls = defaultdict(dict)
for symbol, names in symbols.items():
records_for = self.SYMBOL_MAP.get(symbol, None)
if not records_for:
# Something we don't care about
self.log.info(
'skipping type %s, not supported/interested', symbol
)
continue continue
# Skip type, remove trailing comments, and omit newline
line = line[1:].split('#', 1)[0]
# Split on :'s including :: and strip leading/trailing ws
line = [p.strip() for p in self.split_re.split(line)]
for name, lines in names.items():
for _type, name, ttl, values in records_for(
self, zone, name, lines, arpa=arpa
):
# remove the zone name
name = zone.hostname_from_fqdn(name)
types[_type][name].extend(values)
# first non-default wins, if we never see anything we'll
# just use the default below
if ttl != self.default_ttl:
ttls[_type][name] = ttl
if line[0].endswith('in-addr.arpa'):
# since it's already in in-addr.arpa format
match = name_re.match(line[0])
value = line[1]
else:
addr = ip_address(line[1])
match = name_re.match(addr.reverse_pointer)
value = line[0]
return types, ttls
if match:
try:
ttl = line[2]
except IndexError:
ttl = self.default_ttl
if value[-1] != '.':
value = f'{value}.'
name = match.group('name')
record = Record.new(
zone,
name,
{'ttl': ttl, 'type': 'PTR', 'value': value},
source=self,
lenient=lenient,
)
try:
zone.add_record(record, lenient=lenient)
except DuplicateRecordException:
self.log.warning(
f'Duplicate PTR record for {addr}, skipping'
)
def populate(self, zone, target=False, lenient=False):
self.log.debug(
'populate: name=%s, target=%s, lenient=%s',
zone.name,
target,
lenient,
)
before = len(zone.records)
# This is complicate b/c the mapping between tinydns line types (called
# symbols here) is not one to one with (octoDNS) records. Some lines
# create multiple types of records and multiple lines are often combined
# to make a single record (with multiple values.) Sometimes both happen.
# To deal with this we'll do things in 3 stages:
# first group lines by their symbol and name
symbols = self._process_lines(zone, self._lines())
# then work through those to group values by their _type and name
zone_name = zone.name
arpa = zone_name.endswith('in-addr.arpa.') or zone_name.endswith(
'ip6.arpa.'
)
types, ttls = self._process_symbols(zone, symbols, arpa)
# now we finally have all the values for each (soon to be) record
# collected together, turn them into their coresponding record and add
# it to the zone
for _type, names in types.items():
for name, values in names.items():
data = {
'ttl': ttls[_type].get(name, self.default_ttl),
'type': _type,
}
if len(values) > 1:
data['values'] = _unique(values)
else:
data['value'] = values[0]
record = Record.new(zone, name, data, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info(
'populate: found %s records', len(zone.records) - before
)
class TinyDnsFileSource(TinyDnsBaseSource): class TinyDnsFileSource(TinyDnsBaseSource):
@ -232,6 +464,11 @@ class TinyDnsFileSource(TinyDnsBaseSource):
default_ttl: 3600 default_ttl: 3600
NOTE: timestamps & lo fields are ignored if present. NOTE: timestamps & lo fields are ignored if present.
The source intends to conform to and fully support the official spec,
https://cr.yp.to/djbdns/tinydns-data.html and the common patch/extensions to
support IPv6 and a few other record types,
https://docs.bytemark.co.uk/article/tinydns-format/.
''' '''
def __init__(self, id, directory, default_ttl=3600): def __init__(self, id, directory, default_ttl=3600):


+ 26
- 0
octodns/zone.py View File

@ -75,6 +75,32 @@ class Zone(object):
# it has utf8 chars # it has utf8 chars
return self._utf8_name_re.sub('', fqdn) return self._utf8_name_re.sub('', fqdn)
def owns(self, _type, fqdn):
if fqdn[-1] != '.':
fqdn = f'{fqdn}.'
# if we exactly match the zone name we own it
if fqdn == self.name:
return True
# if we don't end with the zone's name on a boundary we aren't owned
if not fqdn.endswith(f'.{self.name}'):
return False
hostname = self.hostname_from_fqdn(fqdn)
if hostname in self.sub_zones:
# if our hostname matches a sub-zone exactly we have to be a NS
# record
return _type == 'NS'
for sub_zone in self.sub_zones:
if hostname.endswith(f'.{sub_zone}'):
# this belongs under a sub-zone
return False
# otherwise we own it
return True
def add_record(self, record, replace=False, lenient=False): def add_record(self, record, replace=False, lenient=False):
if self._origin: if self._origin:
self.hydrate() self.hydrate()


+ 15
- 0
tests/test_octodns_record.py View File

@ -8,6 +8,7 @@ from octodns.idna import idna_encode
from octodns.record import ( from octodns.record import (
AliasRecord, AliasRecord,
ARecord, ARecord,
CnameRecord,
Create, Create,
Delete, Delete,
MxValue, MxValue,
@ -176,6 +177,20 @@ class TestRecord(TestCase):
# make sure there's nothing extra # make sure there's nothing extra
self.assertEqual(5, len(records)) self.assertEqual(5, len(records))
def test_parse_rdata_texts(self):
self.assertEqual(['2.3.4.5'], ARecord.parse_rdata_texts(['2.3.4.5']))
self.assertEqual(
['2.3.4.6', '3.4.5.7'],
ARecord.parse_rdata_texts(['2.3.4.6', '3.4.5.7']),
)
self.assertEqual(
['some.target.'], CnameRecord.parse_rdata_texts(['some.target.'])
)
self.assertEqual(
['some.target.', 'other.target.'],
CnameRecord.parse_rdata_texts(['some.target.', 'other.target.']),
)
def test_values_mixin_data(self): def test_values_mixin_data(self):
# no values, no value or values in data # no values, no value or values in data
a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []}) a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []})


+ 98
- 8
tests/test_octodns_source_tinydns.py View File

@ -17,7 +17,7 @@ class TestTinyDnsFileSource(TestCase):
def test_populate_normal(self): def test_populate_normal(self):
got = Zone('example.com.', []) got = Zone('example.com.', [])
self.source.populate(got) self.source.populate(got)
self.assertEqual(17, len(got.records))
self.assertEqual(30, len(got.records))
expected = Zone('example.com.', []) expected = Zone('example.com.', [])
for name, data in ( for name, data in (
@ -26,8 +26,13 @@ class TestTinyDnsFileSource(TestCase):
'', '',
{ {
'type': 'NS', 'type': 'NS',
'ttl': 3600,
'values': ['ns1.ns.com.', 'ns2.ns.com.'],
'ttl': 31,
'values': [
'a.ns.example.com.',
'b.ns.example.com.',
'ns1.ns.com.',
'ns2.ns.com.',
],
}, },
), ),
( (
@ -43,6 +48,10 @@ class TestTinyDnsFileSource(TestCase):
'cname', 'cname',
{'type': 'CNAME', 'ttl': 3600, 'value': 'www.example.com.'}, {'type': 'CNAME', 'ttl': 3600, 'value': 'www.example.com.'},
), ),
(
'cname2',
{'type': 'CNAME', 'ttl': 48, 'value': 'www2.example.com.'},
),
( (
'some-host-abc123', 'some-host-abc123',
{'type': 'A', 'ttl': 1800, 'value': '10.2.3.7'}, {'type': 'A', 'ttl': 1800, 'value': '10.2.3.7'},
@ -61,7 +70,7 @@ class TestTinyDnsFileSource(TestCase):
'exchange': 'smtp-1-host.example.com.', 'exchange': 'smtp-1-host.example.com.',
}, },
{ {
'preference': 20,
'preference': 0,
'exchange': 'smtp-2-host.example.com.', 'exchange': 'smtp-2-host.example.com.',
}, },
], ],
@ -75,11 +84,11 @@ class TestTinyDnsFileSource(TestCase):
'values': [ 'values': [
{ {
'preference': 30, 'preference': 30,
'exchange': 'smtp-1-host.example.com.',
'exchange': 'smtp-3-host.mx.example.com.',
}, },
{ {
'preference': 40, 'preference': 40,
'exchange': 'smtp-2-host.example.com.',
'exchange': 'smtp-4-host.mx.example.com.',
}, },
], ],
}, },
@ -111,6 +120,83 @@ class TestTinyDnsFileSource(TestCase):
'value': 'v=DKIM1\\; k=rsa\\; p=blah', 'value': 'v=DKIM1\\; k=rsa\\; p=blah',
}, },
), ),
('b.ns', {'type': 'A', 'ttl': 31, 'value': '43.44.45.46'}),
('a.ns', {'type': 'A', 'ttl': 3600, 'value': '42.43.44.45'}),
(
'smtp-3-host.mx',
{'type': 'A', 'ttl': 1800, 'value': '21.22.23.24'},
),
(
'smtp-4-host.mx',
{'type': 'A', 'ttl': 1800, 'value': '22.23.24.25'},
),
('ns5.ns', {'type': 'A', 'ttl': 30, 'value': '14.15.16.17'}),
('ns6.ns', {'type': 'A', 'ttl': 30, 'value': '15.16.17.18'}),
(
'other',
{
'type': 'NS',
'ttl': 30,
'values': ['ns5.ns.example.com.', 'ns6.ns.example.com.'],
},
),
(
'_a._tcp',
{
'type': 'SRV',
'ttl': 43,
'values': [
{
'priority': 0,
'weight': 0,
'port': 8888,
'target': 'target.srv.example.com.',
},
{
'priority': 10,
'weight': 50,
'port': 8080,
'target': 'target.somewhere.else.',
},
],
},
),
('target.srv', {'type': 'A', 'ttl': 43, 'value': '56.57.58.59'}),
(
'_b._tcp',
{
'type': 'SRV',
'ttl': 3600,
'values': [
{
'priority': 0,
'weight': 0,
'port': 9999,
'target': 'target.srv.example.com.',
}
],
},
),
(
'arbitrary-sshfp',
{
'type': 'SSHFP',
'ttl': 45,
'values': [
{
'algorithm': 1,
'fingerprint_type': 2,
'fingerprint': '00479b27',
},
{
'algorithm': 2,
'fingerprint_type': 2,
'fingerprint': '00479a28',
},
],
},
),
('arbitrary-a', {'type': 'A', 'ttl': 3600, 'value': '80.81.82.83'}),
): ):
record = Record.new(expected, name, data) record = Record.new(expected, name, data)
expected.add_record(record) expected.add_record(record)
@ -162,7 +248,10 @@ class TestTinyDnsFileSource(TestCase):
{ {
'type': 'PTR', 'type': 'PTR',
'ttl': 3600, 'ttl': 3600,
'value': 'has-dup-def123.example.com.',
'values': [
'has-dup-def123.example.com.',
'has-dup-def456.example.com.',
],
}, },
), ),
( (
@ -183,4 +272,5 @@ class TestTinyDnsFileSource(TestCase):
def test_ignores_subs(self): def test_ignores_subs(self):
got = Zone('example.com.', ['sub']) got = Zone('example.com.', ['sub'])
self.source.populate(got) self.source.populate(got)
self.assertEqual(16, len(got.records))
# we don't see one www.sub.example.com. record b/c it's in a sub
self.assertEqual(29, len(got.records))

+ 25
- 0
tests/test_octodns_zone.py View File

@ -191,6 +191,31 @@ class TestZone(TestCase):
Zone('space not allowed.', []) Zone('space not allowed.', [])
self.assertTrue('whitespace not allowed' in str(ctx.exception)) self.assertTrue('whitespace not allowed' in str(ctx.exception))
def test_owns(self):
zone = Zone('unit.tests.', set(['sub']))
self.assertTrue(zone.owns('A', 'unit.tests'))
self.assertTrue(zone.owns('A', 'unit.tests.'))
self.assertTrue(zone.owns('A', 'www.unit.tests.'))
self.assertTrue(zone.owns('A', 'www.unit.tests.'))
# we do own our direct sub's delegation NS records
self.assertTrue(zone.owns('NS', 'sub.unit.tests.'))
# we don't own the root of our sub
self.assertFalse(zone.owns('A', 'sub.unit.tests.'))
# of anything under it
self.assertFalse(zone.owns('A', 'www.sub.unit.tests.'))
# including subsequent delegatoin NS records
self.assertFalse(zone.owns('NS', 'below.sub.unit.tests.'))
# edge cases
# we don't own something that ends with our name, but isn't a boundary
self.assertFalse(zone.owns('A', 'foo-unit.tests.'))
# we do something that ends with the sub-zone, but isn't at a boundary
self.assertTrue(zone.owns('A', 'foo-sub.unit.tests.'))
def test_sub_zones(self): def test_sub_zones(self):
# NS for exactly the sub is allowed # NS for exactly the sub is allowed
zone = Zone('unit.tests.', set(['sub', 'barred'])) zone = Zone('unit.tests.', set(['sub', 'barred']))


+ 32
- 7
tests/zones/tinydns/example.com View File

@ -5,6 +5,8 @@
# Multi-value A # Multi-value A
+example.com:10.2.3.4:30 +example.com:10.2.3.4:30
+example.com.:10.2.3.5:30 +example.com.:10.2.3.5:30
# duplicate value should be ignored
+example.com:10.2.3.4
Ccname.other.foo:www.other.foo Ccname.other.foo:www.other.foo
@ -26,21 +28,27 @@ Ccname.other.foo:www.other.foo
# MX # MX
@example.com::smtp-1-host.example.com:10 @example.com::smtp-1-host.example.com:10
@example.com.::smtp-2-host.example.com:20
# MX with ttl
@smtp.example.com::smtp-1-host.example.com:30:1800
@smtp.example.com.::smtp-2-host.example.com:40:1800
@example.com.::smtp-2-host.example.com.
# MX with ttl and ip
@smtp.example.com:21.22.23.24:smtp-3-host:30:1800
@smtp.example.com.:22.23.24.25:smtp-4-host:40:1800
# NS
# NS for sub
.sub.example.com::ns3.ns.com:30 .sub.example.com::ns3.ns.com:30
.sub.example.com.::ns4.ns.com:30
.sub.example.com.::ns4.ns.com.:30
# NS with ip
.other.example.com:14.15.16.17:ns5:30
.other.example.com.:15.16.17.18:ns6:30
# A, under sub # A, under sub
+www.sub.example.com::1.2.3.4
+www.sub.example.com:1.2.3.4
# Top-level NS # Top-level NS
.example.com::ns1.ns.com .example.com::ns1.ns.com
.example.com.::ns2.ns.com .example.com.::ns2.ns.com
# Top-level NS with automatic A
&example.com:42.43.44.45:a
&example.com.:43.44.45.46:b:31
# sub special cases # sub special cases
+a1.blah-asdf.subtest.com:10.2.3.5 +a1.blah-asdf.subtest.com:10.2.3.5
@ -55,3 +63,20 @@ Ccname.other.foo:www.other.foo
6ipv6-6.example.com:2a021348017cd5d0002419fffef35743 6ipv6-6.example.com:2a021348017cd5d0002419fffef35743
'semicolon.example.com:v=DKIM1; k=rsa; p=blah:300 'semicolon.example.com:v=DKIM1; k=rsa; p=blah:300
# SRV
S_a._tcp.example.com:56.57.58.59:target:8888
S_a._tcp.example.com::target.somewhere.else:8080:10:50:43
# will try and re-create an already existing A with the same IP, should be a
# noop
S_b._tcp.example.com:56.57.58.59:target.srv.example.com.:9999
# complete duplicate should be ignored
S_b._tcp.example.com:56.57.58.59:target.srv.example.com.:9999
# arbitrary multi-value non-spec record
:arbitrary-sshfp.example.com:SSHFP:2 2 00479a28
:arbitrary-sshfp.example.com:SSHFP:1 2 00479b27:45
# does not make sense to do an A this way, but it'll work
:arbitrary-a.example.com:a:80.81.82.83
# this should just be inored b/c the type is unknown
:arbitrary-invalid.example.com:invalid:does not matter:99

+ 1
- 0
tests/zones/tinydns/other.foo View File

@ -3,5 +3,6 @@
# CNAME with trailing comment # CNAME with trailing comment
Ccname.example.com:www.example.com # this is a comment Ccname.example.com:www.example.com # this is a comment
Ccname2.example.com:www2.example.com.:48
+www.other.foo:14.2.3.6 +www.other.foo:14.2.3.6

Loading…
Cancel
Save