Browse Source

Merge pull request #1176 from pieterlexis-tomtom/SVCB-HTTPS

Add support for SVCB and HTTPS records
pull/1179/head
Ross McFarland 2 years ago
committed by GitHub
parent
commit
3455a7af44
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
4 changed files with 994 additions and 0 deletions
  1. +1
    -0
      CHANGELOG.md
  2. +314
    -0
      octodns/record/svcb.py
  3. +668
    -0
      tests/test_octodns_record_svcb.py
  4. +11
    -0
      tests/zones/unit.tests.tst

+ 1
- 0
CHANGELOG.md View File

@ -1,5 +1,6 @@
## v1.?.? - 2024-??-?? - ???
* Add support for SVCB and HTTPS records
* Allow DS records to be specified for managed sub-zones, same as NS
* Fix CAA rdata parsing to allow values with tags


+ 314
- 0
octodns/record/svcb.py View File

@ -0,0 +1,314 @@
#
# This file describes the SVCB and HTTPS records as defined in RFC 9460
# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02
#
from base64 import b64decode
from binascii import Error as binascii_error
from ipaddress import AddressValueError, IPv4Address, IPv6Address
from fqdn import FQDN
from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .chunked import _ChunkedValue
from .rr import RrParseError
SUPPORTED_PARAMS = {}
def validate_svcparam_port(svcparamvalue):
reasons = []
try:
port = int(svcparamvalue)
if 0 < port > 65535:
reasons.append(f'port {port} is not a valid number')
except ValueError:
reasons.append('port is not a number')
return reasons
def validate_list(svcparamkey, svcparamvalue):
if not isinstance(svcparamvalue, list):
return [f'{svcparamkey} is not a list']
return list()
def validate_svcparam_alpn(svcparamvalue):
reasons = validate_list('alpn', svcparamvalue)
if len(reasons) != 0:
return reasons
for alpn in svcparamvalue:
reasons += _ChunkedValue.validate(alpn, 'SVCB')
return reasons
def validate_svcparam_iphint(ip_version, svcparamvalue):
reasons = validate_list(f'ipv{ip_version}hint', svcparamvalue)
if len(reasons) != 0:
return reasons
for address in svcparamvalue:
try:
if ip_version == 4:
IPv4Address(address)
if ip_version == 6:
IPv6Address(address)
except AddressValueError:
reasons.append(
f'ipv{ip_version}hint "{address}" is not a valid IPv{ip_version} address'
)
return reasons
def validate_svcparam_ipv4hint(svcparamvalue):
return validate_svcparam_iphint(4, svcparamvalue)
def validate_svcparam_ipv6hint(svcparamvalue):
return validate_svcparam_iphint(6, svcparamvalue)
def validate_svcparam_mandatory(svcparamvalue):
reasons = validate_list('mandatory', svcparamvalue)
if len(reasons) != 0:
return reasons
for mandatory in svcparamvalue:
if (
mandatory not in SUPPORTED_PARAMS.keys()
and not mandatory.startswith('key')
):
reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory')
if mandatory.startswith('key'):
reasons += validate_svckey_number(mandatory)
return reasons
def validate_svcparam_ech(svcparamvalue):
try:
b64decode(svcparamvalue, validate=True)
except binascii_error:
return ['ech SvcParam is invalid Base64']
def validate_svckey_number(paramkey):
try:
paramkeynum = int(paramkey[3:])
if 7 < paramkeynum > 65535:
return [f'SvcParam key "{paramkey}" has wrong key number']
except ValueError:
return [f'SvcParam key "{paramkey}" has wrong format']
return []
def parse_rdata_text_svcparamvalue_list(svcparamvalue):
return svcparamvalue.split(',')
def svcparamkeysort(svcparamkey):
if svcparamkey.startswith('key'):
return int(svcparamkey[3:])
return SUPPORTED_PARAMS[svcparamkey]['key_num']
# cc https://datatracker.ietf.org/doc/html/rfc9460#keys
SUPPORTED_PARAMS = {
'no-default-alpn': {'key_num': 2, 'has_arg': False},
'alpn': {
'key_num': 1,
'validate': validate_svcparam_alpn,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'port': {'key_num': 3, 'validate': validate_svcparam_port},
'ipv4hint': {
'key_num': 4,
'validate': validate_svcparam_ipv4hint,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'ipv6hint': {
'key_num': 6,
'validate': validate_svcparam_ipv6hint,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'mandatory': {
'key_num': 0,
'validate': validate_svcparam_mandatory,
'parse_rdata_text': parse_rdata_text_svcparamvalue_list,
},
'ech': {'key_num': 5, 'validate': validate_svcparam_ech},
}
class SvcbValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(cls, value):
try:
(svcpriority, targetname, *svcparams) = value.split(' ')
except ValueError:
raise RrParseError()
try:
svcpriority = int(svcpriority)
except ValueError:
pass
targetname = unquote(targetname)
params = dict()
for svcparam in svcparams:
paramkey, *paramvalue = svcparam.split('=')
if paramkey in params.keys():
raise RrParseError(f'{paramkey} is specified twice')
if len(paramvalue) != 0:
params[paramkey] = paramvalue[0]
parse_rdata_text = SUPPORTED_PARAMS.get(paramkey, {}).get(
'parse_rdata_text', None
)
if parse_rdata_text is not None:
params[paramkey] = parse_rdata_text(paramvalue[0])
else:
params[paramkey] = None
return {
'svcpriority': svcpriority,
'targetname': targetname,
'svcparams': params,
}
@classmethod
def validate(cls, data, _):
reasons = []
for value in data:
svcpriority = -1
if 'svcpriority' not in value:
reasons.append('missing svcpriority')
else:
try:
svcpriority = int(value.get('svcpriority', 0))
if svcpriority < 0 or svcpriority > 65535:
reasons.append(f'invalid priority ' f'"{svcpriority}"')
except ValueError:
reasons.append(f'invalid priority "{value["svcpriority"]}"')
if 'targetname' not in value or value['targetname'] == '':
reasons.append('missing targetname')
else:
targetname = str(value.get('targetname', ''))
targetname = idna_encode(targetname)
if not targetname.endswith('.'):
reasons.append(
f'SVCB value "{targetname}" missing trailing .'
)
if targetname != '.' and not FQDN(targetname).is_valid:
reasons.append(
f'Invalid SVCB target "{targetname}" is not a valid FQDN.'
)
if 'svcparams' in value:
svcparams = value.get('svcparams', dict())
if svcpriority == 0 and len(svcparams) != 0:
reasons.append('svcparams set on AliasMode SVCB record')
for svcparamkey, svcparamvalue in svcparams.items():
# XXX: Should we test for keys existing when set in 'mandatory'?
if svcparamkey.startswith('key'):
reasons += validate_svckey_number(svcparamkey)
continue
if (
svcparamkey not in SUPPORTED_PARAMS.keys()
and not svcparamkey.startswith('key')
):
reasons.append(f'Unknown SvcParam {svcparamkey}')
continue
if SUPPORTED_PARAMS[svcparamkey].get('has_arg', True):
reasons += SUPPORTED_PARAMS[svcparamkey]['validate'](
svcparamvalue
)
if (
not SUPPORTED_PARAMS[svcparamkey].get('has_arg', True)
and svcparamvalue is not None
):
reasons.append(
f'SvcParam {svcparamkey} has value when it should not'
)
return reasons
@classmethod
def process(cls, values):
return [cls(v) for v in values]
def __init__(self, value):
super().__init__(
{
'svcpriority': int(value['svcpriority']),
'targetname': idna_encode(value['targetname']),
'svcparams': value.get('svcparams', dict()),
}
)
@property
def svcpriority(self):
return self['svcpriority']
@svcpriority.setter
def svcpriority(self, value):
self['svcpriority'] = value
@property
def targetname(self):
return self['targetname']
@targetname.setter
def targetname(self, value):
self['targetname'] = value
@property
def svcparams(self):
return self['svcparams']
@svcparams.setter
def svcparams(self, value):
self['svcparams'] = value
@property
def rdata_text(self):
params = ''
sorted_svcparamkeys = sorted(self.svcparams, key=svcparamkeysort)
for svcparamkey in sorted_svcparamkeys:
params += f' {svcparamkey}'
svcparamvalue = self.svcparams.get(svcparamkey, None)
if svcparamvalue is not None:
if isinstance(svcparamvalue, list):
params += f'={",".join(svcparamvalue)}'
else:
params += f'={svcparamvalue}'
return f'{self.svcpriority} {self.targetname}{params}'
def __hash__(self):
return hash(self.__repr__())
def _equality_tuple(self):
params = set()
for svcparamkey, svcparamvalue in self.svcparams.items():
if svcparamvalue is not None:
if isinstance(svcparamvalue, list):
params.add(f'{svcparamkey}={",".join(svcparamvalue)}')
else:
params.add(f'{svcparamkey}={svcparamvalue}')
else:
params.add(f'{svcparamkey}')
return (self.svcpriority, self.targetname, params)
def __repr__(self):
return f"'{self.rdata_text}'"
class SvcbRecord(ValuesMixin, Record):
_type = 'SVCB'
_value_type = SvcbValue
class HttpsRecord(ValuesMixin, Record):
_type = 'HTTPS'
_value_type = SvcbValue
Record.register_type(SvcbRecord)
Record.register_type(HttpsRecord)

+ 668
- 0
tests/test_octodns_record_svcb.py View File

@ -0,0 +1,668 @@
#
#
#
from unittest import TestCase
from helpers import SimpleProvider
from octodns.record import Record
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.record.svcb import SvcbRecord, SvcbValue
from octodns.zone import Zone
class TestRecordSvcb(TestCase):
zone = Zone('unit.tests.', [])
def test_svcb(self):
aliasmode_value = SvcbValue(
{'svcpriority': 0, 'targetname': 'foo.example.com.'}
)
aliasmode_data = {'ttl': 300, 'value': aliasmode_value}
a = SvcbRecord(self.zone, 'alias', aliasmode_data)
self.assertEqual('alias', a.name)
self.assertEqual('alias.unit.tests.', a.fqdn)
self.assertEqual(300, a.ttl)
self.assertEqual(
aliasmode_value['svcpriority'], a.values[0].svcpriority
)
self.assertEqual(aliasmode_value['targetname'], a.values[0].targetname)
self.assertEqual(aliasmode_value['svcparams'], a.values[0].svcparams)
self.assertEqual(aliasmode_data, a.data)
servicemode_values = [
SvcbValue(
{
'svcpriority': 1,
'targetname': 'foo.example.com.',
'svcparams': {'port': 8002},
}
),
SvcbValue(
{
'svcpriority': 2,
'targetname': 'foo.example.net.',
'svcparams': {'port': 8080},
}
),
]
servicemode_data = {'ttl': 300, 'values': servicemode_values}
b = SvcbRecord(self.zone, 'service', servicemode_data)
self.assertEqual('service', b.name)
self.assertEqual('service.unit.tests.', b.fqdn)
self.assertEqual(300, b.ttl)
self.assertEqual(
servicemode_values[0]['svcpriority'], b.values[0].svcpriority
)
self.assertEqual(
servicemode_values[0]['targetname'], b.values[0].targetname
)
self.assertEqual(
servicemode_values[0]['svcparams'], b.values[0].svcparams
)
self.assertEqual(
servicemode_values[1]['svcpriority'], b.values[1].svcpriority
)
self.assertEqual(
servicemode_values[1]['targetname'], b.values[1].targetname
)
self.assertEqual(
servicemode_values[1]['svcparams'], b.values[1].svcparams
)
self.assertEqual(servicemode_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(b.changes(b, target))
# Diff in priority causes change
other = SvcbRecord(
self.zone, 'service2', {'ttl': 30, 'values': servicemode_values}
)
other.values[0].svcpriority = 22
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in target causes change
other.values[0].svcpriority = b.values[0].svcpriority
other.values[0].targetname = 'blabla.example.com.'
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in params causes change
other.values[0].targetname = b.values[0].targetname
other.values[0].svcparams = {'port': '8888'}
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
b.__repr__()
def test_svcb_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('nope')
# Double keys are not allowed
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('1 foo.example.com port=8080 port=8084')
# priority not int
self.assertEqual(
{
'svcpriority': 'one',
'targetname': 'foo.example.com',
'svcparams': dict(),
},
SvcbValue.parse_rdata_text('one foo.example.com'),
)
# valid with params
self.assertEqual(
{
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': {'port': '8080', 'no-default-alpn': None},
},
SvcbValue.parse_rdata_text(
'1 svcb.unit.tests. port=8080 no-default-alpn'
),
)
# quoted target
self.assertEqual(
{
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': dict(),
},
SvcbValue.parse_rdata_text('1 "svcb.unit.tests."'),
)
zone = Zone('unit.tests.', [])
a = SvcbRecord(
zone,
'svc',
{
'ttl': 32,
'value': {
'svcpriority': 1,
'targetname': 'svcb.unit.tests.',
'svcparams': {'port': '8080'},
},
},
)
self.assertEqual(1, a.values[0].svcpriority)
self.assertEqual('svcb.unit.tests.', a.values[0].targetname)
self.assertEqual({'port': '8080'}, a.values[0].svcparams)
# both directions should match
rdata = '1 svcb.unit.tests. no-default-alpn port=8080 ipv4hint=192.0.2.2,192.0.2.53 key3333=foobar'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
# both directions should match
rdata = '0 svcb.unit.tests.'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
def test_svcb_value(self):
a = SvcbValue(
{'svcpriority': 0, 'targetname': 'foo.', 'svcparams': dict()}
)
b = SvcbValue(
{'svcpriority': 1, 'targetname': 'foo.', 'svcparams': dict()}
)
c = SvcbValue(
{
'svcpriority': 0,
'targetname': 'foo.',
'svcparams': {'port': 8080, 'no-default-alpn': None},
}
)
d = SvcbValue(
{
'svcpriority': 0,
'targetname': 'foo.',
'svcparams': {'alpn': ['h2', 'h3'], 'port': 8080},
}
)
e = SvcbValue(
{
'svcpriority': 0,
'targetname': 'mmm.',
'svcparams': {'ipv4hint': ['192.0.2.1']},
}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertEqual(e, e)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(a, e)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(b, e)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertNotEqual(c, e)
self.assertNotEqual(d, a)
self.assertNotEqual(d, b)
self.assertNotEqual(d, c)
self.assertNotEqual(d, e)
self.assertNotEqual(e, a)
self.assertNotEqual(e, b)
self.assertNotEqual(e, c)
self.assertNotEqual(e, d)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'svcb',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'},
},
)
# Wildcards are fine
Record.new(
self.zone,
'*',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'},
},
)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'targetname': 'foo.bar.baz.'},
},
)
self.assertEqual(['missing svcpriority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 'foo',
'targetname': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons)
# invalid priority (out of range)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 100000,
'targetname': 'foo.bar.baz.',
},
},
)
self.assertEqual(['invalid priority "100000"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{'type': 'SVCB', 'ttl': 600, 'value': {'svcpriority': 1}},
)
self.assertEqual(['missing targetname'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz'},
},
)
self.assertEqual(
['SVCB value "foo.bar.baz" missing trailing .'],
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': ''},
},
)
self.assertEqual(['missing targetname'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'bla foo.bar.com.',
},
},
)
self.assertEqual(
['Invalid SVCB target "bla foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)
# target can be root label
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'svcpriority': 1, 'targetname': '.'},
},
)
# Params can't be set for AliasMode
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 0,
'targetname': 'foo.bar.com.',
'svcparams': {'port': '8000'},
},
},
)
self.assertEqual(
['svcparams set on AliasMode SVCB record'], ctx.exception.reasons
)
# Unknown param
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'blablabla': '222'},
},
},
)
self.assertEqual(['Unknown SvcParam blablabla'], ctx.exception.reasons)
# Port number invalid
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'port': 100000},
},
},
)
self.assertEqual(
['port 100000 is not a valid number'], ctx.exception.reasons
)
# Port number not an int
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'port': 'foo'},
},
},
)
self.assertEqual(['port is not a number'], ctx.exception.reasons)
# no-default-alpn set
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'no-default-alpn': None},
},
},
)
# no-default-alpn has value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'no-default-alpn': 'foobar'},
},
},
)
self.assertEqual(
['SvcParam no-default-alpn has value when it should not'],
ctx.exception.reasons,
)
# alpn is broken
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'alpn': ['h2', '😅']},
},
},
)
self.assertEqual(['non ASCII character in "😅"'], ctx.exception.reasons)
# svcbvaluelist that is not a list
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv4hint': '192.0.2.1,192.0.2.2',
'ipv6hint': '2001:db8::1',
'mandatory': 'ipv6hint',
'alpn': 'h2,h3',
},
},
},
)
self.assertEqual(
[
'ipv4hint is not a list',
'ipv6hint is not a list',
'mandatory is not a list',
'alpn is not a list',
],
ctx.exception.reasons,
)
# ipv4hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv4hint': ['192.0.2.0', '500.500.30.30']
},
},
},
)
self.assertEqual(
['ipv4hint "500.500.30.30" is not a valid IPv4 address'],
ctx.exception.reasons,
)
# ipv6hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'ipv6hint': ['2001:db8:43::1', 'notanip']
},
},
},
)
self.assertEqual(
['ipv6hint "notanip" is not a valid IPv6 address'],
ctx.exception.reasons,
)
# mandatory
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'mandatory': ['ipv4hint', 'unknown', 'key4444']
},
},
},
)
self.assertEqual(
['unsupported SvcParam "unknown" in mandatory'],
ctx.exception.reasons,
)
# ech
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {'ech': ' dG90YWxseUZha2VFQ0hPcHRpb24'},
},
},
)
self.assertEqual(
['ech SvcParam is invalid Base64'], ctx.exception.reasons
)
# broken keyNNNN format
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'svcpriority': 1,
'targetname': 'foo.bar.com.',
'svcparams': {
'key100000': 'foo',
'key3333': 'bar',
'keyXXX': 'foo',
},
},
},
)
self.assertEqual(
[
'SvcParam key "key100000" has wrong key number',
'SvcParam key "keyXXX" has wrong format',
],
ctx.exception.reasons,
)

+ 11
- 0
tests/zones/unit.tests.tst View File

@ -55,3 +55,14 @@ aaaa 600 IN AAAA 2601:644:500:e210:62f8:1dff:feb8:947a
; CNAME Records
cname 300 IN CNAME unit.tests.
included 300 IN CNAME unit.tests.
; SVCB and HTTPS records
svcb-alias 300 IN SVCB 0 alias-target.unit.test.
svcb-service 300 IN SVCB 1 . ipv4hint=192.0.2.4 port=9000
svcb-service 300 IN SVCB 2 svcb-target.unit.test. port=9001
https-alias 300 IN HTTPS 0 alias-target.unit.test.
https-service 300 IN HTTPS 1 . ipv4hint=192.0.2.4 port=9000
https-service 300 IN HTTPS 2 svcb-target.unit.test. port=9001

Loading…
Cancel
Save