Browse Source

Merge remote-tracking branch 'origin/main' into chunked-variation-handling

chunked-variation-handling
Ross McFarland 2 years ago
parent
commit
9f8ac99682
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
12 changed files with 1099 additions and 25 deletions
  1. +7
    -0
      CHANGELOG.md
  2. +1
    -1
      octodns/__init__.py
  3. +18
    -19
      octodns/manager.py
  4. +17
    -0
      octodns/provider/plan.py
  5. +6
    -0
      octodns/record/__init__.py
  6. +11
    -2
      octodns/record/change.py
  7. +19
    -0
      octodns/record/https.py
  8. +308
    -0
      octodns/record/svcb.py
  9. +10
    -0
      tests/config/dynamic-config.yaml
  10. +23
    -3
      tests/test_octodns_plan.py
  11. +668
    -0
      tests/test_octodns_record_svcb.py
  12. +11
    -0
      tests/zones/unit.tests.tst

+ 7
- 0
CHANGELOG.md View File

@ -1,5 +1,12 @@
## v1.?.? - 2024-??-?? - ???
* Improved handling of present, but empty/None config file values.
* Add PlanJson plan_output support
* Include `record_type` in Change data
## v1.8.0 - 2024-06-10 - Set the records straight
* Add support for SVCB and HTTPS records
* Allow DS records to be specified for managed sub-zones, same as NS
* Fix CAA rdata parsing to allow values with tags
* Improve TXT (and SPF) record handling of unexpected whitespace


+ 1
- 1
octodns/__init__.py View File

@ -1,4 +1,4 @@
'OctoDNS: DNS as code - Tools for managing DNS across multiple providers'
# TODO: remove __VERSION__ w/2.x
__version__ = __VERSION__ = '1.7.0'
__version__ = __VERSION__ = '1.8.0'

+ 18
- 19
octodns/manager.py View File

@ -110,7 +110,7 @@ class Manager(object):
zones = self.config['zones']
self.config['zones'] = self._config_zones(zones)
manager_config = self.config.get('manager', {})
manager_config = self.config.get('manager') or {}
self._executor = self._config_executor(manager_config, max_workers)
self.include_meta = self._config_include_meta(
manager_config, include_meta
@ -122,17 +122,19 @@ class Manager(object):
# add our hard-coded environ handler first so that other secret
# providers can pull in env variables w/it
self.secret_handlers = {'env': EnvironSecrets('env')}
secret_handlers_config = self.config.get('secret_handlers', {})
secret_handlers_config = self.config.get('secret_handlers') or {}
self.secret_handlers.update(
self._config_secret_handlers(secret_handlers_config)
)
self.auto_arpa = self._config_auto_arpa(manager_config, auto_arpa)
self.global_processors = manager_config.get('processors', [])
self.global_processors = manager_config.get('processors') or []
self.log.info('__init__: global_processors=%s', self.global_processors)
self.global_post_processors = manager_config.get('post_processors', [])
self.global_post_processors = (
manager_config.get('post_processors') or []
)
self.log.info(
'__init__: global_post_processors=%s', self.global_post_processors
)
@ -140,7 +142,7 @@ class Manager(object):
providers_config = self.config['providers']
self.providers = self._config_providers(providers_config)
processors_config = self.config.get('processors', {})
processors_config = self.config.get('processors') or {}
self.processors = self._config_processors(processors_config)
if self.auto_arpa:
@ -168,15 +170,12 @@ class Manager(object):
self.processors[meta.id] = meta
self.global_post_processors.append(meta.id)
plan_outputs_config = manager_config.get(
'plan_outputs',
{
'_logger': {
'class': 'octodns.provider.plan.PlanLogger',
'level': 'info',
}
},
)
plan_outputs_config = manager_config.get('plan_outputs') or {
'_logger': {
'class': 'octodns.provider.plan.PlanLogger',
'level': 'info',
}
}
self.plan_outputs = self._config_plan_outputs(plan_outputs_config)
def _config_zones(self, zones):
@ -199,7 +198,7 @@ class Manager(object):
def _config_executor(self, manager_config, max_workers=None):
max_workers = (
manager_config.get('max_workers', 1)
manager_config.get('max_workers') or 1
if max_workers is None
else max_workers
)
@ -546,7 +545,7 @@ class Manager(object):
def _get_sources(self, decoded_zone_name, config, eligible_sources):
try:
sources = config['sources']
sources = config['sources'] or []
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name} is missing sources'
@ -691,7 +690,7 @@ class Manager(object):
)
try:
targets = config['targets']
targets = config['targets'] or []
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name} is missing targets'
@ -699,7 +698,7 @@ class Manager(object):
processors = (
self.global_processors
+ config.get('processors', [])
+ (config.get('processors') or [])
+ self.global_post_processors
)
self.log.info('sync: processors=%s', processors)
@ -1032,7 +1031,7 @@ class Manager(object):
source.populate(zone, lenient=lenient)
# check that processors are in order if any are specified
processors = config.get('processors', [])
processors = config.get('processors') or []
try:
# same as above, but for processors this time
for processor in processors:


+ 17
- 0
octodns/provider/plan.py View File

@ -2,7 +2,9 @@
#
#
from collections import defaultdict
from io import StringIO
from json import dumps
from logging import DEBUG, ERROR, INFO, WARN, getLogger
from sys import stdout
@ -220,6 +222,21 @@ def _value_stringifier(record, sep):
return sep.join(values)
class PlanJson(_PlanOutput):
def __init__(self, name, indent=None, sort_keys=True):
super().__init__(name)
self.indent = indent
self.sort_keys = sort_keys
def run(self, plans, fh=stdout, *args, **kwargs):
data = defaultdict(dict)
for target, plan in plans:
data[target.id][plan.desired.name] = plan.data
fh.write(dumps(data, indent=self.indent, sort_keys=self.sort_keys))
fh.write('\n')
class PlanMarkdown(_PlanOutput):
def run(self, plans, fh=stdout, *args, **kwargs):
if plans:


+ 6
- 0
octodns/record/__init__.py View File

@ -14,6 +14,7 @@ from .dname import DnameRecord, DnameValue
from .ds import DsRecord, DsValue
from .exception import RecordException, ValidationError
from .geo import GeoCodes, GeoValue
from .https import HttpsRecord, HttpsValue
from .loc import LocRecord, LocValue
from .mx import MxRecord, MxValue
from .naptr import NaptrRecord, NaptrValue
@ -23,6 +24,7 @@ from .rr import Rr, RrParseError
from .spf import SpfRecord
from .srv import SrvRecord, SrvValue
from .sshfp import SshfpRecord, SshfpValue
from .svcb import SvcbRecord, SvcbValue
from .tlsa import TlsaRecord, TlsaValue
from .txt import TxtRecord, TxtValue
from .urlfwd import UrlfwdRecord, UrlfwdValue
@ -45,6 +47,8 @@ DsRecord
DsValue
GeoCodes
GeoValue
HttpsRecord
HttpsValue
Ipv4Address
Ipv4Value
Ipv6Address
@ -68,6 +72,8 @@ SrvRecord
SrvValue
SshfpRecord
SshfpValue
SvcbRecord
SvcbValue
TlsaRecord
TlsaValue
TxtRecord


+ 11
- 2
octodns/record/change.py View File

@ -27,7 +27,11 @@ class Create(Change):
@property
def data(self):
return {'type': 'create', 'new': self.new.data}
return {
'type': 'create',
'new': self.new.data,
'record_type': self.new._type,
}
def __repr__(self, leader=''):
source = self.new.source.id if self.new.source else ''
@ -43,6 +47,7 @@ class Update(Change):
'type': 'update',
'existing': self.existing.data,
'new': self.new.data,
'record_type': self.new._type,
}
# Leader is just to allow us to work around heven eating leading whitespace
@ -65,7 +70,11 @@ class Delete(Change):
@property
def data(self):
return {'type': 'delete', 'existing': self.existing.data}
return {
'type': 'delete',
'existing': self.existing.data,
'record_type': self.existing._type,
}
def __repr__(self, leader=''):
return f'Delete {self.existing}'

+ 19
- 0
octodns/record/https.py View File

@ -0,0 +1,19 @@
#
# This file describes the HTTPS records as defined in RFC 9460
# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02
#
from .base import Record, ValuesMixin
from .svcb import SvcbValue
class HttpsValue(SvcbValue):
pass
class HttpsRecord(ValuesMixin, Record):
_type = 'HTTPS'
_value_type = HttpsValue
Record.register_type(HttpsRecord)

+ 308
- 0
octodns/record/svcb.py View File

@ -0,0 +1,308 @@
#
# This file describes the SVCB records as defined in RFC 9460
# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02
#
from base64 import b64decode
from binascii import Error as binascii_error
from ipaddress import AddressValueError, IPv4Address, IPv6Address
from fqdn import FQDN
from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .chunked import _ChunkedValue
from .rr import RrParseError
SUPPORTED_PARAMS = {}
def validate_svcparam_port(svcparamvalue):
reasons = []
try:
port = int(svcparamvalue)
if 0 < port > 65535:
reasons.append(f'port {port} is not a valid number')
except ValueError:
reasons.append('port is not a number')
return reasons
def validate_list(svcparamkey, svcparamvalue):
if not isinstance(svcparamvalue, list):
return [f'{svcparamkey} is not a list']
return list()
def validate_svcparam_alpn(svcparamvalue):
reasons = validate_list('alpn', svcparamvalue)
if len(reasons) != 0:
return reasons
for alpn in svcparamvalue:
reasons += _ChunkedValue.validate(alpn, 'SVCB')
return reasons
def validate_svcparam_iphint(ip_version, svcparamvalue):
reasons = validate_list(f'ipv{ip_version}hint', svcparamvalue)
if len(reasons) != 0:
return reasons
for address in svcparamvalue:
try:
if ip_version == 4:
IPv4Address(address)
if ip_version == 6:
IPv6Address(address)
except AddressValueError:
reasons.append(
f'ipv{ip_version}hint "{address}" is not a valid IPv{ip_version} address'
)
return reasons
def validate_svcparam_ipv4hint(svcparamvalue):
return validate_svcparam_iphint(4, svcparamvalue)
def validate_svcparam_ipv6hint(svcparamvalue):
return validate_svcparam_iphint(6, svcparamvalue)
def validate_svcparam_mandatory(svcparamvalue):
reasons = validate_list('mandatory', svcparamvalue)
if len(reasons) != 0:
return reasons
for mandatory in svcparamvalue:
if (
mandatory not in SUPPORTED_PARAMS.keys()
and not mandatory.startswith('key')
):
reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory')
if mandatory.startswith('key'):
reasons += validate_svckey_number(mandatory)
return reasons
def validate_svcparam_ech(svcparamvalue):
try:
b64decode(svcparamvalue, validate=True)
except binascii_error:
return ['ech SvcParam is invalid Base64']
def validate_svckey_number(paramkey):
try:
paramkeynum = int(paramkey[3:])
if 7 < paramkeynum > 65535:
return [f'SvcParam key "{paramkey}" has wrong key number']
except ValueError:
return [f'SvcParam key "{paramkey}" has wrong format']
return []
def parse_rdata_text_svcparamvalue_list(svcparamvalue):
return svcparamvalue.split(',')
def svcparamkeysort(svcparamkey):
if svcparamkey.startswith('key'):
return int(svcparamkey[3:])
return SUPPORTED_PARAMS[svcparamkey]['key_num']
# cc https://datatracker.ietf.org/doc/html/rfc9460#keys
SUPPORTED_PARAMS = {
'no-default-alpn': {'key_num': 2, 'has_arg': False},
'alpn': {
'key_num': 1,
'validate': validate_svcparam_alpn,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'port': {'key_num': 3, 'validate': validate_svcparam_port},
'ipv4hint': {
'key_num': 4,
'validate': validate_svcparam_ipv4hint,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'ipv6hint': {
'key_num': 6,
'validate': validate_svcparam_ipv6hint,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'mandatory': {
'key_num': 0,
'validate': validate_svcparam_mandatory,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'ech': {'key_num': 5, 'validate': validate_svcparam_ech},
}
class SvcbValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(cls, value):
try:
(svcpriority, targetname, *svcparams) = value.split(' ')
except ValueError:
raise RrParseError()
try:
svcpriority = int(svcpriority)
except ValueError:
pass
targetname = unquote(targetname)
params = dict()
for svcparam in svcparams:
paramkey, *paramvalue = svcparam.split('=')
if paramkey in params.keys():
raise RrParseError(f'{paramkey} is specified twice')
if len(paramvalue) != 0:
params[paramkey] = paramvalue[0]
parse_rdata_text = SUPPORTED_PARAMS.get(paramkey, {}).get(
'parse_rdata_text', None
)
if parse_rdata_text is not None:
params[paramkey] = parse_rdata_text(paramvalue[0])
else:
params[paramkey] = None
return {
'svcpriority': svcpriority,
'targetname': targetname,
'svcparams': params,
}
@classmethod
def validate(cls, data, _):
reasons = []
for value in data:
svcpriority = -1
if 'svcpriority' not in value:
reasons.append('missing svcpriority')
else:
try:
svcpriority = int(value.get('svcpriority', 0))
if svcpriority < 0 or svcpriority > 65535:
reasons.append(f'invalid priority ' f'"{svcpriority}"')
except ValueError:
reasons.append(f'invalid priority "{value["svcpriority"]}"')
if 'targetname' not in value or value['targetname'] == '':
reasons.append('missing targetname')
else:
targetname = str(value.get('targetname', ''))
targetname = idna_encode(targetname)
if not targetname.endswith('.'):
reasons.append(
f'SVCB value "{targetname}" missing trailing .'
)
if targetname != '.' and not FQDN(targetname).is_valid:
reasons.append(
f'Invalid SVCB target "{targetname}" is not a valid FQDN.'
)
if 'svcparams' in value:
svcparams = value.get('svcparams', dict())
if svcpriority == 0 and len(svcparams) != 0:
reasons.append('svcparams set on AliasMode SVCB record')
for svcparamkey, svcparamvalue in svcparams.items():
# XXX: Should we test for keys existing when set in 'mandatory'?
if svcparamkey.startswith('key'):
reasons += validate_svckey_number(svcparamkey)
continue
if (
svcparamkey not in SUPPORTED_PARAMS.keys()
and not svcparamkey.startswith('key')
):
reasons.append(f'Unknown SvcParam {svcparamkey}')
continue
if SUPPORTED_PARAMS[svcparamkey].get('has_arg', True):
reasons += SUPPORTED_PARAMS[svcparamkey]['validate'](
svcparamvalue
)
if (
not SUPPORTED_PARAMS[svcparamkey].get('has_arg', True)
and svcparamvalue is not None
):
reasons.append(
f'SvcParam {svcparamkey} has value when it should not'
)
return reasons
@classmethod
def process(cls, values):
return [cls(v) for v in values]
def __init__(self, value):
super().__init__(
{
'svcpriority': int(value['svcpriority']),
'targetname': idna_encode(value['targetname']),
'svcparams': value.get('svcparams', dict()),
}
)
@property
def svcpriority(self):
return self['svcpriority']
@svcpriority.setter
def svcpriority(self, value):
self['svcpriority'] = value
@property
def targetname(self):
return self['targetname']
@targetname.setter
def targetname(self, value):
self['targetname'] = value
@property
def svcparams(self):
return self['svcparams']
@svcparams.setter
def svcparams(self, value):
self['svcparams'] = value
@property
def rdata_text(self):
params = ''
sorted_svcparamkeys = sorted(self.svcparams, key=svcparamkeysort)
for svcparamkey in sorted_svcparamkeys:
params += f' {svcparamkey}'
svcparamvalue = self.svcparams.get(svcparamkey, None)
if svcparamvalue is not None:
if isinstance(svcparamvalue, list):
params += f'={",".join(svcparamvalue)}'
else:
params += f'={svcparamvalue}'
return f'{self.svcpriority} {self.targetname}{params}'
def __hash__(self):
return hash(self.__repr__())
def _equality_tuple(self):
params = set()
for svcparamkey, svcparamvalue in self.svcparams.items():
if svcparamvalue is not None:
if isinstance(svcparamvalue, list):
params.add(f'{svcparamkey}={",".join(svcparamvalue)}')
else:
params.add(f'{svcparamkey}={svcparamvalue}')
else:
params.add(f'{svcparamkey}')
return (self.svcpriority, self.targetname, params)
def __repr__(self):
return f"'{self.rdata_text}'"
class SvcbRecord(ValuesMixin, Record):
_type = 'SVCB'
_value_type = SvcbValue
Record.register_type(SvcbRecord)

+ 10
- 0
tests/config/dynamic-config.yaml View File

@ -1,3 +1,11 @@
# Test whether <xyz>=None blows up
manager:
secret_handlers:
processors:
post_processors:
plan_outputs:
providers:
in:
class: octodns.provider.yaml.YamlProvider
@ -15,6 +23,8 @@ zones:
'*.one':
sources:
- in
# does None value blow up
processors:
targets:
- dump


+ 23
- 3
tests/test_octodns_plan.py View File

@ -3,6 +3,7 @@
#
from io import StringIO
from json import loads
from logging import getLogger
from unittest import TestCase
@ -11,6 +12,7 @@ from helpers import SimpleProvider
from octodns.provider.plan import (
Plan,
PlanHtml,
PlanJson,
PlanLogger,
PlanMarkdown,
RootNsChange,
@ -126,6 +128,17 @@ class TestPlanHtml(TestCase):
)
class TestPlanJson(TestCase):
def test_basics(self):
out = StringIO()
PlanJson('json').run(plans, fh=out)
data = loads(out.getvalue())
for key in ('test', 'unit.tests.', 'changes'):
self.assertTrue(key in data)
data = data[key]
self.assertEqual(4, len(data))
class TestPlanMarkdown(TestCase):
log = getLogger('TestPlanMarkdown')
@ -394,18 +407,25 @@ class TestPlanSafety(TestCase):
# we'll test the change .data's here while we're at it since they don't
# have a dedicated test (file)
delete_data = data['changes'][0] # delete
self.assertEqual(['existing', 'type'], sorted(delete_data.keys()))
self.assertEqual(
['existing', 'record_type', 'type'], sorted(delete_data.keys())
)
self.assertEqual('delete', delete_data['type'])
self.assertEqual('A', delete_data['record_type'])
self.assertEqual(delete.existing.data, delete_data['existing'])
create_data = data['changes'][1] # create
self.assertEqual(['new', 'type'], sorted(create_data.keys()))
self.assertEqual(
['new', 'record_type', 'type'], sorted(create_data.keys())
)
self.assertEqual('create', create_data['type'])
self.assertEqual('CNAME', create_data['record_type'])
self.assertEqual(create.new.data, create_data['new'])
update_data = data['changes'][3] # update
self.assertEqual(
['existing', 'new', 'type'], sorted(update_data.keys())
['existing', 'new', 'record_type', 'type'],
sorted(update_data.keys()),
)
self.assertEqual('update', update_data['type'])
self.assertEqual(update.existing.data, update_data['existing'])


+ 668
- 0
tests/test_octodns_record_svcb.py View File

@ -0,0 +1,668 @@
#
#
#
from unittest import TestCase
from helpers import SimpleProvider
from octodns.record import Record
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.record.svcb import SvcbRecord, SvcbValue
from octodns.zone import Zone
class TestRecordSvcb(TestCase):
zone = Zone('unit.tests.', [])
def test_svcb(self):
aliasmode_value = SvcbValue(
{'svcpriority': 0, 'targetname': 'foo.example.com.'}
)
aliasmode_data = {'ttl': 300, 'value': aliasmode_value}
a = SvcbRecord(self.zone, 'alias', aliasmode_data)
self.assertEqual('alias', a.name)
self.assertEqual('alias.unit.tests.', a.fqdn)
self.assertEqual(300, a.ttl)
self.assertEqual(
aliasmode_value['svcpriority'], a.values[0].svcpriority
)
self.assertEqual(aliasmode_value['targetname'], a.values[0].targetname)
self.assertEqual(aliasmode_value['svcparams'], a.values[0].svcparams)
self.assertEqual(aliasmode_data, a.data)
servicemode_values = [
SvcbValue(
{
'svcpriority': 1,
'targetname': 'foo.example.com.',
'svcparams': {'port': 8002},
}
),
SvcbValue(
{
'svcpriority': 2,
'targetname': 'foo.example.net.',
'svcparams': {'port': 8080},
}
),
]
servicemode_data = {'ttl': 300, 'values': servicemode_values}
b = SvcbRecord(self.zone, 'service', servicemode_data)
self.assertEqual('service', b.name)
self.assertEqual('service.unit.tests.', b.fqdn)
self.assertEqual(300, b.ttl)
self.assertEqual(
servicemode_values[0]['svcpriority'], b.values[0].svcpriority
)
self.assertEqual(
servicemode_values[0]['targetname'], b.values[0].targetname
)
self.assertEqual(
servicemode_values[0]['svcparams'], b.values[0].svcparams
)
self.assertEqual(
servicemode_values[1]['svcpriority'], b.values[1].svcpriority
)
self.assertEqual(
servicemode_values[1]['targetname'], b.values[1].targetname
)
self.assertEqual(
servicemode_values[1]['svcparams'], b.values[1].svcparams
)
self.assertEqual(servicemode_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(b.changes(b, target))
# Diff in priority causes change
other = SvcbRecord(
self.zone, 'service2', {'ttl': 30, 'values': servicemode_values}
)
other.values[0].svcpriority = 22
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in target causes change
other.values[0].svcpriority = b.values[0].svcpriority
other.values[0].targetname = 'blabla.example.com.'
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in params causes change
other.values[0].targetname = b.values[0].targetname
other.values[0].svcparams = {'port': '8888'}
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
b.__repr__()
def test_svcb_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('nope')
# Double keys are not allowed
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('1 foo.example.com port=8080 port=8084')
# priority not int
self.assertEqual(
{
'svcpriority': 'one',
'targetname': 'foo.example.com',
'svcparams': dict(),
},
SvcbValue.parse_rdata_text('one foo.example.com'),
)
# valid with params
self.assertEqual(
{
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': {'port': '8080', 'no-default-alpn': None},
},
SvcbValue.parse_rdata_text(
'1 svcb.unit.tests. port=8080 no-default-alpn'
),
)
# quoted target
self.assertEqual(
{
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': dict(),
},
SvcbValue.parse_rdata_text('1 "svcb.unit.tests."'),
)
zone = Zone('unit.tests.', [])
a = SvcbRecord(
zone,
'svc',
{
'ttl': 32,
'value': {
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': {'port': '8080'},
},
},
)
self.assertEqual(1, a.values[0].svcpriority)
self.assertEqual('svcb.unit.tests.', a.values[0].targetname)
self.assertEqual({'port': '8080'}, a.values[0].svcparams)
# both directions should match
rdata = '1 svcb.unit.tests. no-default-alpn port=8080 ipv4hint=192.0.2.2,192.0.2.53 key3333=foobar'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
# both directions should match
rdata = '0 svcb.unit.tests.'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
def test_svcb_value(self):
a = SvcbValue(
{'svcpriority': 0, 'targetname': 'foo.', 'svcparams': dict()}
)
b = SvcbValue(
{'svcpriority': 1, 'targetname': 'foo.', 'svcparams': dict()}
)
c = SvcbValue(
{
'svcpriority': 0,
'targetname': 'foo.',
'svcparams': {'port': 8080, 'no-default-alpn': None},
}
)
d = SvcbValue(
{
'svcpriority': 0,
'targetname': 'foo.',
'svcparams': {'alpn': ['h2', 'h3'], 'port': 8080},
}
)
e = SvcbValue(
{
'svcpriority': 0,
'targetname': 'mmm.',
'svcparams': {'ipv4hint': ['192.0.2.1']},
}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertEqual(e, e)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(a, e)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(b, e)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertNotEqual(c, e)
self.assertNotEqual(d, a)
self.assertNotEqual(d, b)
self.assertNotEqual(d, c)
self.assertNotEqual(d, e)
self.assertNotEqual(e, a)
self.assertNotEqual(e, b)
self.assertNotEqual(e, c)
self.assertNotEqual(e, d)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'svcb',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'},
},
)
# Wildcards are fine
Record.new(
self.zone,
'*',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'},
},
)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'targetname': 'foo.bar.baz.'},
},
)
self.assertEqual(['missing svcpriority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 'foo',
'targetname': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons)
# invalid priority (out of range)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 100000,
'targetname': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid priority "100000"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{'type': 'SVCB', 'ttl': 600, 'value': {'svcpriority': 1}},
)
self.assertEqual(['missing targetname'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz'},
},
)
self.assertEqual(
['SVCB value "foo.bar.baz" missing trailing .'],
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': ''},
},
)
self.assertEqual(['missing targetname'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'bla foo.bar.com.',
},
},
)
self.assertEqual(
['Invalid SVCB target "bla foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)
# target can be root label
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': '.'},
},
)
# Params can't be set for AliasMode
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 0,
'targetname': 'foo.bar.com.',
'svcparams': {'port': '8000'},
},
},
)
self.assertEqual(
['svcparams set on AliasMode SVCB record'], ctx.exception.reasons
)
# Unknown param
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'blablabla': '222'},
},
},
)
self.assertEqual(['Unknown SvcParam blablabla'], ctx.exception.reasons)
# Port number invalid
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'port': 100000},
},
},
)
self.assertEqual(
['port 100000 is not a valid number'], ctx.exception.reasons
)
# Port number not an int
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'port': 'foo'},
},
},
)
self.assertEqual(['port is not a number'], ctx.exception.reasons)
# no-default-alpn set
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'no-default-alpn': None},
},
},
)
# no-default-alpn has value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'no-default-alpn': 'foobar'},
},
},
)
self.assertEqual(
['SvcParam no-default-alpn has value when it should not'],
ctx.exception.reasons,
)
# alpn is broken
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'alpn': ['h2', '😅']},
},
},
)
self.assertEqual(['non ASCII character in "😅"'], ctx.exception.reasons)
# svcbvaluelist that is not a list
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv4hint': '192.0.2.1,192.0.2.2',
'ipv6hint': '2001:db8::1',
'mandatory': 'ipv6hint',
'alpn': 'h2,h3',
},
},
},
)
self.assertEqual(
[
'ipv4hint is not a list',
'ipv6hint is not a list',
'mandatory is not a list',
'alpn is not a list',
],
ctx.exception.reasons,
)
# ipv4hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv4hint': ['192.0.2.0', '500.500.30.30']
},
},
},
)
self.assertEqual(
['ipv4hint "500.500.30.30" is not a valid IPv4 address'],
ctx.exception.reasons,
)
# ipv6hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv6hint': ['2001:db8:43::1', 'notanip']
},
},
},
)
self.assertEqual(
['ipv6hint "notanip" is not a valid IPv6 address'],
ctx.exception.reasons,
)
# mandatory
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'mandatory': ['ipv4hint', 'unknown', 'key4444']
},
},
},
)
self.assertEqual(
['unsupported SvcParam "unknown" in mandatory'],
ctx.exception.reasons,
)
# ech
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'ech': ' dG90YWxseUZha2VFQ0hPcHRpb24'},
},
},
)
self.assertEqual(
['ech SvcParam is invalid Base64'], ctx.exception.reasons
)
# broken keyNNNN format
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'key100000': 'foo',
'key3333': 'bar',
'keyXXX': 'foo',
},
},
},
)
self.assertEqual(
[
'SvcParam key "key100000" has wrong key number',
'SvcParam key "keyXXX" has wrong format',
],
ctx.exception.reasons,
)

+ 11
- 0
tests/zones/unit.tests.tst View File

@ -55,3 +55,14 @@ aaaa 600 IN AAAA 2601:644:500:e210:62f8:1dff:feb8:947a
; CNAME Records
cname 300 IN CNAME unit.tests.
included 300 IN CNAME unit.tests.
; SVCB and HTTPS records
svcb-alias 300 IN SVCB 0 alias-target.unit.test.
svcb-service 300 IN SVCB 1 . ipv4hint=192.0.2.4 port=9000
svcb-service 300 IN SVCB 2 svcb-target.unit.test. port=9001
https-alias 300 IN HTTPS 0 alias-target.unit.test.
https-service 300 IN HTTPS 1 . ipv4hint=192.0.2.4 port=9000
https-service 300 IN HTTPS 2 svcb-target.unit.test. port=9001

Loading…
Cancel
Save