Browse Source

Add support for SVCB and HTTPS records

These records are defined in [RFC 9460](https://datatracker.ietf.org/doc/html/rfc9460)
and are used for "Service Binding" and provide clients with all
connection information for network services.

Modern browsers already query for these records and several larger
websites have [adopted usage of HTTPS
records](https://blog.apnic.net/2023/12/18/use-of-https-resource-records/)
already.
pull/1176/head
Pieter Lexis 2 years ago
parent
commit
f02f94e4ad
No known key found for this signature in database GPG Key ID: B6ED640F21BF69E3
3 changed files with 849 additions and 0 deletions
  1. +251
    -0
      octodns/record/svcb.py
  2. +587
    -0
      tests/test_octodns_record_svcb.py
  3. +11
    -0
      tests/zones/unit.tests.tst

+ 251
- 0
octodns/record/svcb.py View File

@ -0,0 +1,251 @@
#
# This file describes the SVCB and HTTPS records as defined in RFC 9460
# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02
#
from base64 import b64decode
from binascii import Error as binascii_error
from ipaddress import AddressValueError, IPv4Address, IPv6Address
from fqdn import FQDN
from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .chunked import _ChunkedValue
from .rr import RrParseError
SUPPORTED_PARAMS = {}
def validate_svcparam_port(svcparamvalue):
reasons = []
try:
port = int(svcparamvalue)
if 0 < port > 65535:
reasons.append(f'port {port} is not a valid number')
except ValueError:
reasons.append('port is not a number')
return reasons
def validate_svcparam_alpn(svcparamvalue):
reasons = []
alpns = svcparamvalue.split(',')
for alpn in alpns:
reasons += _ChunkedValue.validate(alpn, 'SVCB')
return reasons
def validate_svcparam_iphint(ip_version, svcparamvalue):
reasons = []
addresses = svcparamvalue.split(',')
for address in addresses:
try:
if ip_version == 4:
IPv4Address(address)
if ip_version == 6:
IPv6Address(address)
except AddressValueError:
reasons.append(
f'ip{ip_version}hint "{address}" is not a valid IPv{ip_version} address'
)
return reasons
def validate_svcparam_ip4hint(svcparamvalue):
return validate_svcparam_iphint(4, svcparamvalue)
def validate_svcparam_ip6hint(svcparamvalue):
return validate_svcparam_iphint(6, svcparamvalue)
def validate_svcparam_mandatory(svcparamvalue):
reasons = []
mandatories = svcparamvalue.split(',')
for mandatory in mandatories:
if (
mandatory not in SUPPORTED_PARAMS.keys()
and not mandatory.startswith('key')
):
reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory')
if mandatory.startswith('key'):
reasons += validate_svckey_number(mandatory)
return reasons
def validate_svcparam_ech(svcparamvalue):
try:
b64decode(svcparamvalue, validate=True)
except binascii_error:
return ['ech SvcParam is invalid Base64']
def validate_svckey_number(paramkey):
try:
paramkeynum = int(paramkey[3:])
if 7 < paramkeynum > 65535:
return [f'SvcParam key "{paramkey}" has wrong key number']
except ValueError:
return [f'SvcParam key "{paramkey}" has wrong format']
return []
SUPPORTED_PARAMS = {
'no-default-alpn': {'has_arg': False},
'alpn': {'validate': validate_svcparam_alpn},
'port': {'validate': validate_svcparam_port},
'ipv4hint': {'validate': validate_svcparam_ip4hint},
'ipv6hint': {'validate': validate_svcparam_ip6hint},
'mandatory': {'validate': validate_svcparam_mandatory},
'ech': {'validate': validate_svcparam_ech},
}
class SvcbValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(cls, value):
try:
# XXX: these are called SvcPriority, TargetName, and SvcParams in RFC 9460 section 2.
# Should we mirror these names, or are priority, target and params good enough?
# XXX: Should we split params into the specific ParamKeys and ParamValues?
(priority, target, *params) = value.split(' ')
except ValueError:
raise RrParseError()
try:
priority = int(priority)
except ValueError:
pass
target = unquote(target)
return {'priority': priority, 'target': target, 'params': params}
@classmethod
def validate(cls, data, _):
reasons = []
for value in data:
priority = -1
if 'priority' not in value:
reasons.append('missing priority')
else:
try:
priority = int(value.get('priority', 0))
if priority < 0 or priority > 65535:
reasons.append(f'invalid priority ' f'"{priority}"')
except ValueError:
reasons.append(
f'invalid priority ' f'"{value["priority"]}"'
)
if 'target' not in value or value['target'] == '':
reasons.append('missing target')
else:
target = str(value.get('target', ''))
target = idna_encode(target)
if not target.endswith('.'):
reasons.append(f'SVCB value "{target}" missing trailing .')
if target != '.' and not FQDN(target).is_valid:
reasons.append(
f'Invalid SVCB target "{target}" is not a valid FQDN.'
)
if 'params' in value:
params = value.get('params', list())
if priority == 0 and len(params) != 0:
reasons.append('params set on AliasMode SVCB record')
for param in params:
# XXX: Should we test for keys existing when set in 'mandatory'?
paramkey, *paramvalue = param.split('=')
if paramkey.startswith('key'):
reasons += validate_svckey_number(paramkey)
continue
if (
paramkey not in SUPPORTED_PARAMS.keys()
and not paramkey.startswith('key')
):
reasons.append(f'Unknown SvcParam {paramkey}')
continue
if SUPPORTED_PARAMS[paramkey].get('has_arg', True):
reasons += SUPPORTED_PARAMS[paramkey]['validate'](
paramvalue[0]
)
if (
not SUPPORTED_PARAMS[paramkey].get('has_arg', True)
and len(paramvalue) != 0
):
reasons.append(
f'SvcParam {paramkey} has value when it should not'
)
return reasons
@classmethod
def process(cls, values):
return [cls(v) for v in values]
def __init__(self, value):
super().__init__(
{
'priority': int(value['priority']),
'target': idna_encode(value['target']),
'params': value.get('params', list()),
}
)
@property
def priority(self):
return self['priority']
@priority.setter
def priority(self, value):
self['priority'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def params(self):
return self['params']
@params.setter
def params(self, value):
self['params'] = value
@property
def rdata_text(self):
params = ''
if len(self.params) != 0:
params = f' {" ".join(self.params)}'
return f'{self.priority} {self.target}{params}'
def __hash__(self):
return hash(self.__repr__())
def _equality_tuple(self):
return (self.priority, self.target, self.params)
def __repr__(self):
params = ''
if len(self.params) != 0:
params = f' {" ".join(self.params)}'
return f"'{self.priority} {self.target}{params}'"
class SvcbRecord(ValuesMixin, Record):
_type = 'SVCB'
_value_type = SvcbValue
class HttpsRecord(ValuesMixin, Record):
_type = 'HTTPS'
_value_type = SvcbValue
Record.register_type(SvcbRecord)
Record.register_type(HttpsRecord)

+ 587
- 0
tests/test_octodns_record_svcb.py View File

@ -0,0 +1,587 @@
#
#
#
from unittest import TestCase
from helpers import SimpleProvider
from octodns.record import Record
from octodns.record.exception import ValidationError
from octodns.record.rr import RrParseError
from octodns.record.svcb import SvcbRecord, SvcbValue
from octodns.zone import Zone
class TestRecordSvcb(TestCase):
zone = Zone('unit.tests.', [])
def test_svcb(self):
aliasmode_value = SvcbValue(
{'priority': 0, 'target': 'foo.example.com.'}
)
aliasmode_data = {'ttl': 300, 'value': aliasmode_value}
a = SvcbRecord(self.zone, 'alias', aliasmode_data)
self.assertEqual('alias', a.name)
self.assertEqual('alias.unit.tests.', a.fqdn)
self.assertEqual(300, a.ttl)
self.assertEqual(aliasmode_value['priority'], a.values[0].priority)
self.assertEqual(aliasmode_value['target'], a.values[0].target)
self.assertEqual(aliasmode_value['params'], a.values[0].params)
self.assertEqual(aliasmode_data, a.data)
servicemode_values = [
SvcbValue(
{
'priority': 1,
'target': 'foo.example.com.',
'params': 'port=8002',
}
),
SvcbValue(
{
'priority': 2,
'target': 'foo.example.net.',
'params': 'port=8080',
}
),
]
servicemode_data = {'ttl': 300, 'values': servicemode_values}
b = SvcbRecord(self.zone, 'service', servicemode_data)
self.assertEqual('service', b.name)
self.assertEqual('service.unit.tests.', b.fqdn)
self.assertEqual(300, b.ttl)
self.assertEqual(
servicemode_values[0]['priority'], b.values[0].priority
)
self.assertEqual(servicemode_values[0]['target'], b.values[0].target)
self.assertEqual(servicemode_values[0]['params'], b.values[0].params)
self.assertEqual(
servicemode_values[1]['priority'], b.values[1].priority
)
self.assertEqual(servicemode_values[1]['target'], b.values[1].target)
self.assertEqual(servicemode_values[1]['params'], b.values[1].params)
self.assertEqual(servicemode_data, b.data)
target = SimpleProvider()
# No changes with self
self.assertFalse(b.changes(b, target))
# Diff in priority causes change
other = SvcbRecord(
self.zone, 'service2', {'ttl': 30, 'values': servicemode_values}
)
other.values[0].priority = 22
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in target causes change
other.values[0].priority = b.values[0].priority
other.values[0].target = 'blabla.example.com'
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# Diff in params causes change
other.values[0].target = b.values[0].target
other.values[0].params = 'port=8888'
change = b.changes(other, target)
self.assertEqual(change.existing, b)
self.assertEqual(change.new, other)
# __repr__ doesn't blow up
a.__repr__()
b.__repr__()
def test_svcb_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SvcbValue.parse_rdata_text('nope')
# priority not int
self.assertEqual(
{'priority': 'one', 'target': 'foo.example.com', 'params': list()},
SvcbValue.parse_rdata_text('one foo.example.com'),
)
# valid with params
self.assertEqual(
{
'priority': 1,
'target': 'svcb.unit.tests.',
'params': ['port=8080'],
},
SvcbValue.parse_rdata_text('1 svcb.unit.tests. port=8080'),
)
# quoted target
self.assertEqual(
{'priority': 1, 'target': 'svcb.unit.tests.', 'params': list()},
SvcbValue.parse_rdata_text('1 "svcb.unit.tests."'),
)
zone = Zone('unit.tests.', [])
a = SvcbRecord(
zone,
'svc',
{
'ttl': 32,
'value': {
'priority': 1,
'target': 'svcb.unit.tests.',
'params': ['port=8080'],
},
},
)
self.assertEqual(1, a.values[0].priority)
self.assertEqual('svcb.unit.tests.', a.values[0].target)
self.assertEqual(['port=8080'], a.values[0].params)
# both directions should match
rdata = '1 svcb.unit.tests. port=8080'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
# both directions should match
rdata = '0 svcb.unit.tests.'
record = SvcbRecord(
zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)}
)
self.assertEqual(rdata, record.values[0].rdata_text)
def test_svcb_value(self):
a = SvcbValue({'priority': 0, 'target': 'foo.', 'params': list()})
b = SvcbValue({'priority': 1, 'target': 'foo.', 'params': list()})
c = SvcbValue(
{'priority': 0, 'target': 'foo.', 'params': ['port=8080']}
)
d = SvcbValue(
{
'priority': 0,
'target': 'foo.',
'params': ['alpn=h2,h3', 'port=8080'],
}
)
e = SvcbValue(
{'priority': 0, 'target': 'mmm.', 'params': ['ipv4hint=192.0.2.1']}
)
self.assertEqual(a, a)
self.assertEqual(b, b)
self.assertEqual(c, c)
self.assertEqual(d, d)
self.assertEqual(e, e)
self.assertNotEqual(a, b)
self.assertNotEqual(a, c)
self.assertNotEqual(a, d)
self.assertNotEqual(a, e)
self.assertNotEqual(b, a)
self.assertNotEqual(b, c)
self.assertNotEqual(b, d)
self.assertNotEqual(b, e)
self.assertNotEqual(c, a)
self.assertNotEqual(c, b)
self.assertNotEqual(c, d)
self.assertNotEqual(c, e)
self.assertNotEqual(d, a)
self.assertNotEqual(d, b)
self.assertNotEqual(d, c)
self.assertNotEqual(d, e)
self.assertNotEqual(e, a)
self.assertNotEqual(e, b)
self.assertNotEqual(e, c)
self.assertNotEqual(e, d)
self.assertTrue(a < b)
self.assertTrue(a < c)
self.assertTrue(b > a)
self.assertTrue(b > c)
self.assertTrue(c > a)
self.assertTrue(c < b)
self.assertTrue(a <= b)
self.assertTrue(a <= c)
self.assertTrue(a <= a)
self.assertTrue(a >= a)
self.assertTrue(b >= a)
self.assertTrue(b >= c)
self.assertTrue(b >= b)
self.assertTrue(b <= b)
self.assertTrue(c >= a)
self.assertTrue(c <= b)
self.assertTrue(c >= c)
self.assertTrue(c <= c)
# Hash
values = set()
values.add(a)
self.assertTrue(a in values)
self.assertFalse(b in values)
values.add(b)
self.assertTrue(b in values)
def test_validation(self):
# doesn't blow up
Record.new(
self.zone,
'svcb',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': 'foo.bar.baz.'},
},
)
# Wildcards are fine
Record.new(
self.zone,
'*',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': 'foo.bar.baz.'},
},
)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'target': 'foo.bar.baz.'},
},
)
self.assertEqual(['missing priority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 'foo', 'target': 'foo.bar.baz.'},
},
)
self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons)
# invalid priority (out of range)
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 100000, 'target': 'foo.bar.baz.'},
},
)
self.assertEqual(['invalid priority "100000"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{'type': 'SVCB', 'ttl': 600, 'value': {'priority': 1}},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': 'foo.bar.baz'},
},
)
self.assertEqual(
['SVCB value "foo.bar.baz" missing trailing .'],
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': ''},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': 'bla foo.bar.com.'},
},
)
self.assertEqual(
['Invalid SVCB target "bla foo.bar.com." is not a valid FQDN.'],
ctx.exception.reasons,
)
# target can be root label
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {'priority': 1, 'target': '.'},
},
)
# Params can't be set for AliasMode
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 0,
'target': 'foo.bar.com.',
'params': ['port=8000'],
},
},
)
self.assertEqual(
['params set on AliasMode SVCB record'], ctx.exception.reasons
)
# Unknown param
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['blablabla=222'],
},
},
)
self.assertEqual(['Unknown SvcParam blablabla'], ctx.exception.reasons)
# Port number invalid
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['port=100000'],
},
},
)
self.assertEqual(
['port 100000 is not a valid number'], ctx.exception.reasons
)
# Port number not an int
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['port=foo'],
},
},
)
self.assertEqual(['port is not a number'], ctx.exception.reasons)
# no-default-alpn set
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['no-default-alpn'],
},
},
)
# no-default-alpn has value
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['no-default-alpn=foobar'],
},
},
)
self.assertEqual(
['SvcParam no-default-alpn has value when it should not'],
ctx.exception.reasons,
)
# alpn is broken
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['alpn=h2,😅'],
},
},
)
self.assertEqual(['non ASCII character in "😅"'], ctx.exception.reasons)
# ipv4hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['ipv4hint=192.0.2.0,500.500.30.30'],
},
},
)
self.assertEqual(
['ip4hint "500.500.30.30" is not a valid IPv4 address'],
ctx.exception.reasons,
)
# ipv6hint
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['ipv6hint=2001:db8:43::1,notanip'],
},
},
)
self.assertEqual(
['ip6hint "notanip" is not a valid IPv6 address'],
ctx.exception.reasons,
)
# mandatory
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['mandatory=ipv4hint,unknown,key4444'],
},
},
)
self.assertEqual(
['unsupported SvcParam "unknown" in mandatory'],
ctx.exception.reasons,
)
# ech
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': ['ech=dG90YWxseUZha2VFQ0hPcHRpb24'],
},
},
)
self.assertEqual(
['ech SvcParam is invalid Base64'], ctx.exception.reasons
)
# broken keyNNNN format
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'foo',
{
'type': 'SVCB',
'ttl': 600,
'value': {
'priority': 1,
'target': 'foo.bar.com.',
'params': [
'key100000=foo',
'key3333=bar',
'keyXXX=foo',
],
},
},
)
self.assertEqual(
[
'SvcParam key "key100000" has wrong key number',
'SvcParam key "keyXXX" has wrong format',
],
ctx.exception.reasons,
)

+ 11
- 0
tests/zones/unit.tests.tst View File

@ -55,3 +55,14 @@ aaaa 600 IN AAAA 2601:644:500:e210:62f8:1dff:feb8:947a
; CNAME Records
cname 300 IN CNAME unit.tests.
included 300 IN CNAME unit.tests.
; SVCB and HTTPS records
svcb-alias 300 IN SVCB 0 alias-target.unit.test.
svcb-service 300 IN SVCB 1 . ipv4hint=192.0.2.4 port=9000
svcb-service 300 IN SVCB 2 svcb-target.unit.test. port=9001
https-alias 300 IN HTTPS 0 alias-target.unit.test.
https-service 300 IN HTTPS 1 . ipv4hint=192.0.2.4 port=9000
https-service 300 IN HTTPS 2 svcb-target.unit.test. port=9001

Loading…
Cancel
Save