diff --git a/CHANGELOG.md b/CHANGELOG.md index f727f41..c883a1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## v1.?.? - 2024-??-?? - ??? +* Add support for SVCB and HTTPS records * Allow DS records to be specified for managed sub-zones, same as NS * Fix CAA rdata parsing to allow values with tags diff --git a/octodns/record/svcb.py b/octodns/record/svcb.py new file mode 100644 index 0000000..904c912 --- /dev/null +++ b/octodns/record/svcb.py @@ -0,0 +1,314 @@ +# +# This file describes the SVCB and HTTPS records as defined in RFC 9460 +# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02 +# + +from base64 import b64decode +from binascii import Error as binascii_error +from ipaddress import AddressValueError, IPv4Address, IPv6Address + +from fqdn import FQDN + +from ..equality import EqualityTupleMixin +from ..idna import idna_encode +from .base import Record, ValuesMixin, unquote +from .chunked import _ChunkedValue +from .rr import RrParseError + +SUPPORTED_PARAMS = {} + + +def validate_svcparam_port(svcparamvalue): + reasons = [] + try: + port = int(svcparamvalue) + if 0 < port > 65535: + reasons.append(f'port {port} is not a valid number') + except ValueError: + reasons.append('port is not a number') + return reasons + + +def validate_list(svcparamkey, svcparamvalue): + if not isinstance(svcparamvalue, list): + return [f'{svcparamkey} is not a list'] + return list() + + +def validate_svcparam_alpn(svcparamvalue): + reasons = validate_list('alpn', svcparamvalue) + if len(reasons) != 0: + return reasons + for alpn in svcparamvalue: + reasons += _ChunkedValue.validate(alpn, 'SVCB') + return reasons + + +def validate_svcparam_iphint(ip_version, svcparamvalue): + reasons = validate_list(f'ipv{ip_version}hint', svcparamvalue) + if len(reasons) != 0: + return reasons + for address in svcparamvalue: + try: + if ip_version == 4: + IPv4Address(address) + if ip_version == 6: + IPv6Address(address) + except AddressValueError: + reasons.append( + f'ipv{ip_version}hint "{address}" is not a valid IPv{ip_version} address' + ) + return reasons + + +def validate_svcparam_ipv4hint(svcparamvalue): + return validate_svcparam_iphint(4, svcparamvalue) + + +def validate_svcparam_ipv6hint(svcparamvalue): + return validate_svcparam_iphint(6, svcparamvalue) + + +def validate_svcparam_mandatory(svcparamvalue): + reasons = validate_list('mandatory', svcparamvalue) + if len(reasons) != 0: + return reasons + for mandatory in svcparamvalue: + if ( + mandatory not in SUPPORTED_PARAMS.keys() + and not mandatory.startswith('key') + ): + reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory') + if mandatory.startswith('key'): + reasons += validate_svckey_number(mandatory) + return reasons + + +def validate_svcparam_ech(svcparamvalue): + try: + b64decode(svcparamvalue, validate=True) + except binascii_error: + return ['ech SvcParam is invalid Base64'] + + +def validate_svckey_number(paramkey): + try: + paramkeynum = int(paramkey[3:]) + if 7 < paramkeynum > 65535: + return [f'SvcParam key "{paramkey}" has wrong key number'] + except ValueError: + return [f'SvcParam key "{paramkey}" has wrong format'] + return [] + + +def parse_rdata_text_svcparamvalue_list(svcparamvalue): + return svcparamvalue.split(',') + + +def svcparamkeysort(svcparamkey): + if svcparamkey.startswith('key'): + return int(svcparamkey[3:]) + return SUPPORTED_PARAMS[svcparamkey]['key_num'] + + +# cc https://datatracker.ietf.org/doc/html/rfc9460#keys +SUPPORTED_PARAMS = { + 'no-default-alpn': {'key_num': 2, 'has_arg': False}, + 'alpn': { + 'key_num': 1, + 'validate': validate_svcparam_alpn, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'port': {'key_num': 3, 'validate': validate_svcparam_port}, + 'ipv4hint': { + 'key_num': 4, + 'validate': validate_svcparam_ipv4hint, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'ipv6hint': { + 'key_num': 6, + 'validate': validate_svcparam_ipv6hint, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'mandatory': { + 'key_num': 0, + 'validate': validate_svcparam_mandatory, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'ech': {'key_num': 5, 'validate': validate_svcparam_ech}, +} + + +class SvcbValue(EqualityTupleMixin, dict): + + @classmethod + def parse_rdata_text(cls, value): + try: + (svcpriority, targetname, *svcparams) = value.split(' ') + except ValueError: + raise RrParseError() + try: + svcpriority = int(svcpriority) + except ValueError: + pass + targetname = unquote(targetname) + params = dict() + for svcparam in svcparams: + paramkey, *paramvalue = svcparam.split('=') + if paramkey in params.keys(): + raise RrParseError(f'{paramkey} is specified twice') + if len(paramvalue) != 0: + params[paramkey] = paramvalue[0] + parse_rdata_text = SUPPORTED_PARAMS.get(paramkey, {}).get( + 'parse_rdata_text', None + ) + if parse_rdata_text is not None: + params[paramkey] = parse_rdata_text(paramvalue[0]) + else: + params[paramkey] = None + return { + 'svcpriority': svcpriority, + 'targetname': targetname, + 'svcparams': params, + } + + @classmethod + def validate(cls, data, _): + reasons = [] + for value in data: + svcpriority = -1 + if 'svcpriority' not in value: + reasons.append('missing svcpriority') + else: + try: + svcpriority = int(value.get('svcpriority', 0)) + if svcpriority < 0 or svcpriority > 65535: + reasons.append(f'invalid priority ' f'"{svcpriority}"') + except ValueError: + reasons.append(f'invalid priority "{value["svcpriority"]}"') + + if 'targetname' not in value or value['targetname'] == '': + reasons.append('missing targetname') + else: + targetname = str(value.get('targetname', '')) + targetname = idna_encode(targetname) + if not targetname.endswith('.'): + reasons.append( + f'SVCB value "{targetname}" missing trailing .' + ) + if targetname != '.' and not FQDN(targetname).is_valid: + reasons.append( + f'Invalid SVCB target "{targetname}" is not a valid FQDN.' + ) + + if 'svcparams' in value: + svcparams = value.get('svcparams', dict()) + if svcpriority == 0 and len(svcparams) != 0: + reasons.append('svcparams set on AliasMode SVCB record') + for svcparamkey, svcparamvalue in svcparams.items(): + # XXX: Should we test for keys existing when set in 'mandatory'? + if svcparamkey.startswith('key'): + reasons += validate_svckey_number(svcparamkey) + continue + if ( + svcparamkey not in SUPPORTED_PARAMS.keys() + and not svcparamkey.startswith('key') + ): + reasons.append(f'Unknown SvcParam {svcparamkey}') + continue + if SUPPORTED_PARAMS[svcparamkey].get('has_arg', True): + reasons += SUPPORTED_PARAMS[svcparamkey]['validate']( + svcparamvalue + ) + if ( + not SUPPORTED_PARAMS[svcparamkey].get('has_arg', True) + and svcparamvalue is not None + ): + reasons.append( + f'SvcParam {svcparamkey} has value when it should not' + ) + + return reasons + + @classmethod + def process(cls, values): + return [cls(v) for v in values] + + def __init__(self, value): + super().__init__( + { + 'svcpriority': int(value['svcpriority']), + 'targetname': idna_encode(value['targetname']), + 'svcparams': value.get('svcparams', dict()), + } + ) + + @property + def svcpriority(self): + return self['svcpriority'] + + @svcpriority.setter + def svcpriority(self, value): + self['svcpriority'] = value + + @property + def targetname(self): + return self['targetname'] + + @targetname.setter + def targetname(self, value): + self['targetname'] = value + + @property + def svcparams(self): + return self['svcparams'] + + @svcparams.setter + def svcparams(self, value): + self['svcparams'] = value + + @property + def rdata_text(self): + params = '' + sorted_svcparamkeys = sorted(self.svcparams, key=svcparamkeysort) + for svcparamkey in sorted_svcparamkeys: + params += f' {svcparamkey}' + svcparamvalue = self.svcparams.get(svcparamkey, None) + if svcparamvalue is not None: + if isinstance(svcparamvalue, list): + params += f'={",".join(svcparamvalue)}' + else: + params += f'={svcparamvalue}' + return f'{self.svcpriority} {self.targetname}{params}' + + def __hash__(self): + return hash(self.__repr__()) + + def _equality_tuple(self): + params = set() + for svcparamkey, svcparamvalue in self.svcparams.items(): + if svcparamvalue is not None: + if isinstance(svcparamvalue, list): + params.add(f'{svcparamkey}={",".join(svcparamvalue)}') + else: + params.add(f'{svcparamkey}={svcparamvalue}') + else: + params.add(f'{svcparamkey}') + return (self.svcpriority, self.targetname, params) + + def __repr__(self): + return f"'{self.rdata_text}'" + + +class SvcbRecord(ValuesMixin, Record): + _type = 'SVCB' + _value_type = SvcbValue + + +class HttpsRecord(ValuesMixin, Record): + _type = 'HTTPS' + _value_type = SvcbValue + + +Record.register_type(SvcbRecord) +Record.register_type(HttpsRecord) diff --git a/tests/test_octodns_record_svcb.py b/tests/test_octodns_record_svcb.py new file mode 100644 index 0000000..6566f16 --- /dev/null +++ b/tests/test_octodns_record_svcb.py @@ -0,0 +1,668 @@ +# +# +# + +from unittest import TestCase + +from helpers import SimpleProvider + +from octodns.record import Record +from octodns.record.exception import ValidationError +from octodns.record.rr import RrParseError +from octodns.record.svcb import SvcbRecord, SvcbValue +from octodns.zone import Zone + + +class TestRecordSvcb(TestCase): + zone = Zone('unit.tests.', []) + + def test_svcb(self): + aliasmode_value = SvcbValue( + {'svcpriority': 0, 'targetname': 'foo.example.com.'} + ) + aliasmode_data = {'ttl': 300, 'value': aliasmode_value} + a = SvcbRecord(self.zone, 'alias', aliasmode_data) + self.assertEqual('alias', a.name) + self.assertEqual('alias.unit.tests.', a.fqdn) + self.assertEqual(300, a.ttl) + self.assertEqual( + aliasmode_value['svcpriority'], a.values[0].svcpriority + ) + self.assertEqual(aliasmode_value['targetname'], a.values[0].targetname) + self.assertEqual(aliasmode_value['svcparams'], a.values[0].svcparams) + self.assertEqual(aliasmode_data, a.data) + + servicemode_values = [ + SvcbValue( + { + 'svcpriority': 1, + 'targetname': 'foo.example.com.', + 'svcparams': {'port': 8002}, + } + ), + SvcbValue( + { + 'svcpriority': 2, + 'targetname': 'foo.example.net.', + 'svcparams': {'port': 8080}, + } + ), + ] + servicemode_data = {'ttl': 300, 'values': servicemode_values} + b = SvcbRecord(self.zone, 'service', servicemode_data) + self.assertEqual('service', b.name) + self.assertEqual('service.unit.tests.', b.fqdn) + self.assertEqual(300, b.ttl) + self.assertEqual( + servicemode_values[0]['svcpriority'], b.values[0].svcpriority + ) + self.assertEqual( + servicemode_values[0]['targetname'], b.values[0].targetname + ) + self.assertEqual( + servicemode_values[0]['svcparams'], b.values[0].svcparams + ) + self.assertEqual( + servicemode_values[1]['svcpriority'], b.values[1].svcpriority + ) + self.assertEqual( + servicemode_values[1]['targetname'], b.values[1].targetname + ) + self.assertEqual( + servicemode_values[1]['svcparams'], b.values[1].svcparams + ) + self.assertEqual(servicemode_data, b.data) + + target = SimpleProvider() + # No changes with self + self.assertFalse(b.changes(b, target)) + # Diff in priority causes change + other = SvcbRecord( + self.zone, 'service2', {'ttl': 30, 'values': servicemode_values} + ) + other.values[0].svcpriority = 22 + change = b.changes(other, target) + self.assertEqual(change.existing, b) + self.assertEqual(change.new, other) + # Diff in target causes change + other.values[0].svcpriority = b.values[0].svcpriority + other.values[0].targetname = 'blabla.example.com.' + change = b.changes(other, target) + self.assertEqual(change.existing, b) + self.assertEqual(change.new, other) + # Diff in params causes change + other.values[0].targetname = b.values[0].targetname + other.values[0].svcparams = {'port': '8888'} + change = b.changes(other, target) + self.assertEqual(change.existing, b) + self.assertEqual(change.new, other) + + # __repr__ doesn't blow up + a.__repr__() + b.__repr__() + + def test_svcb_value_rdata_text(self): + # empty string won't parse + with self.assertRaises(RrParseError): + SvcbValue.parse_rdata_text('') + + # single word won't parse + with self.assertRaises(RrParseError): + SvcbValue.parse_rdata_text('nope') + + # Double keys are not allowed + with self.assertRaises(RrParseError): + SvcbValue.parse_rdata_text('1 foo.example.com port=8080 port=8084') + + # priority not int + self.assertEqual( + { + 'svcpriority': 'one', + 'targetname': 'foo.example.com', + 'svcparams': dict(), + }, + SvcbValue.parse_rdata_text('one foo.example.com'), + ) + + # valid with params + self.assertEqual( + { + 'svcpriority': 1, + 'targetname': 'svcb.unit.tests.', + 'svcparams': {'port': '8080', 'no-default-alpn': None}, + }, + SvcbValue.parse_rdata_text( + '1 svcb.unit.tests. port=8080 no-default-alpn' + ), + ) + + # quoted target + self.assertEqual( + { + 'svcpriority': 1, + 'targetname': 'svcb.unit.tests.', + 'svcparams': dict(), + }, + SvcbValue.parse_rdata_text('1 "svcb.unit.tests."'), + ) + + zone = Zone('unit.tests.', []) + a = SvcbRecord( + zone, + 'svc', + { + 'ttl': 32, + 'value': { + 'svcpriority': 1, + 'targetname': 'svcb.unit.tests.', + 'svcparams': {'port': '8080'}, + }, + }, + ) + self.assertEqual(1, a.values[0].svcpriority) + self.assertEqual('svcb.unit.tests.', a.values[0].targetname) + self.assertEqual({'port': '8080'}, a.values[0].svcparams) + + # both directions should match + rdata = '1 svcb.unit.tests. no-default-alpn port=8080 ipv4hint=192.0.2.2,192.0.2.53 key3333=foobar' + record = SvcbRecord( + zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)} + ) + self.assertEqual(rdata, record.values[0].rdata_text) + + # both directions should match + rdata = '0 svcb.unit.tests.' + record = SvcbRecord( + zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)} + ) + self.assertEqual(rdata, record.values[0].rdata_text) + + def test_svcb_value(self): + a = SvcbValue( + {'svcpriority': 0, 'targetname': 'foo.', 'svcparams': dict()} + ) + b = SvcbValue( + {'svcpriority': 1, 'targetname': 'foo.', 'svcparams': dict()} + ) + c = SvcbValue( + { + 'svcpriority': 0, + 'targetname': 'foo.', + 'svcparams': {'port': 8080, 'no-default-alpn': None}, + } + ) + d = SvcbValue( + { + 'svcpriority': 0, + 'targetname': 'foo.', + 'svcparams': {'alpn': ['h2', 'h3'], 'port': 8080}, + } + ) + e = SvcbValue( + { + 'svcpriority': 0, + 'targetname': 'mmm.', + 'svcparams': {'ipv4hint': ['192.0.2.1']}, + } + ) + + self.assertEqual(a, a) + self.assertEqual(b, b) + self.assertEqual(c, c) + self.assertEqual(d, d) + self.assertEqual(e, e) + + self.assertNotEqual(a, b) + self.assertNotEqual(a, c) + self.assertNotEqual(a, d) + self.assertNotEqual(a, e) + self.assertNotEqual(b, a) + self.assertNotEqual(b, c) + self.assertNotEqual(b, d) + self.assertNotEqual(b, e) + self.assertNotEqual(c, a) + self.assertNotEqual(c, b) + self.assertNotEqual(c, d) + self.assertNotEqual(c, e) + self.assertNotEqual(d, a) + self.assertNotEqual(d, b) + self.assertNotEqual(d, c) + self.assertNotEqual(d, e) + self.assertNotEqual(e, a) + self.assertNotEqual(e, b) + self.assertNotEqual(e, c) + self.assertNotEqual(e, d) + + self.assertTrue(a < b) + self.assertTrue(a < c) + + self.assertTrue(b > a) + self.assertTrue(b > c) + + self.assertTrue(c > a) + self.assertTrue(c < b) + + self.assertTrue(a <= b) + self.assertTrue(a <= c) + self.assertTrue(a <= a) + self.assertTrue(a >= a) + + self.assertTrue(b >= a) + self.assertTrue(b >= c) + self.assertTrue(b >= b) + self.assertTrue(b <= b) + + self.assertTrue(c >= a) + self.assertTrue(c <= b) + self.assertTrue(c >= c) + self.assertTrue(c <= c) + + # Hash + values = set() + values.add(a) + self.assertTrue(a in values) + self.assertFalse(b in values) + values.add(b) + self.assertTrue(b in values) + + def test_validation(self): + # doesn't blow up + Record.new( + self.zone, + 'svcb', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'}, + }, + ) + + # Wildcards are fine + Record.new( + self.zone, + '*', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz.'}, + }, + ) + + # missing priority + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'targetname': 'foo.bar.baz.'}, + }, + ) + self.assertEqual(['missing svcpriority'], ctx.exception.reasons) + + # invalid priority + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 'foo', + 'targetname': 'foo.bar.baz.', + }, + }, + ) + self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons) + + # invalid priority (out of range) + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 100000, + 'targetname': 'foo.bar.baz.', + }, + }, + ) + self.assertEqual(['invalid priority "100000"'], ctx.exception.reasons) + + # missing target + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + {'type': 'SVCB', 'ttl': 600, 'value': {'svcpriority': 1}}, + ) + self.assertEqual(['missing targetname'], ctx.exception.reasons) + + # invalid target + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'svcpriority': 1, 'targetname': 'foo.bar.baz'}, + }, + ) + self.assertEqual( + ['SVCB value "foo.bar.baz" missing trailing .'], + ctx.exception.reasons, + ) + + # falsey target + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'svcpriority': 1, 'targetname': ''}, + }, + ) + self.assertEqual(['missing targetname'], ctx.exception.reasons) + + # target must be a valid FQDN + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'bla foo.bar.com.', + }, + }, + ) + self.assertEqual( + ['Invalid SVCB target "bla foo.bar.com." is not a valid FQDN.'], + ctx.exception.reasons, + ) + + # target can be root label + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': {'svcpriority': 1, 'targetname': '.'}, + }, + ) + + # Params can't be set for AliasMode + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 0, + 'targetname': 'foo.bar.com.', + 'svcparams': {'port': '8000'}, + }, + }, + ) + self.assertEqual( + ['svcparams set on AliasMode SVCB record'], ctx.exception.reasons + ) + + # Unknown param + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'blablabla': '222'}, + }, + }, + ) + self.assertEqual(['Unknown SvcParam blablabla'], ctx.exception.reasons) + + # Port number invalid + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'port': 100000}, + }, + }, + ) + self.assertEqual( + ['port 100000 is not a valid number'], ctx.exception.reasons + ) + + # Port number not an int + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'port': 'foo'}, + }, + }, + ) + self.assertEqual(['port is not a number'], ctx.exception.reasons) + + # no-default-alpn set + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'no-default-alpn': None}, + }, + }, + ) + + # no-default-alpn has value + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'no-default-alpn': 'foobar'}, + }, + }, + ) + self.assertEqual( + ['SvcParam no-default-alpn has value when it should not'], + ctx.exception.reasons, + ) + + # alpn is broken + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'alpn': ['h2', '😅']}, + }, + }, + ) + self.assertEqual(['non ASCII character in "😅"'], ctx.exception.reasons) + + # svcbvaluelist that is not a list + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'ipv4hint': '192.0.2.1,192.0.2.2', + 'ipv6hint': '2001:db8::1', + 'mandatory': 'ipv6hint', + 'alpn': 'h2,h3', + }, + }, + }, + ) + self.assertEqual( + [ + 'ipv4hint is not a list', + 'ipv6hint is not a list', + 'mandatory is not a list', + 'alpn is not a list', + ], + ctx.exception.reasons, + ) + + # ipv4hint + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'ipv4hint': ['192.0.2.0', '500.500.30.30'] + }, + }, + }, + ) + self.assertEqual( + ['ipv4hint "500.500.30.30" is not a valid IPv4 address'], + ctx.exception.reasons, + ) + + # ipv6hint + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'ipv6hint': ['2001:db8:43::1', 'notanip'] + }, + }, + }, + ) + self.assertEqual( + ['ipv6hint "notanip" is not a valid IPv6 address'], + ctx.exception.reasons, + ) + + # mandatory + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'mandatory': ['ipv4hint', 'unknown', 'key4444'] + }, + }, + }, + ) + self.assertEqual( + ['unsupported SvcParam "unknown" in mandatory'], + ctx.exception.reasons, + ) + + # ech + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': {'ech': ' dG90YWxseUZha2VFQ0hPcHRpb24'}, + }, + }, + ) + self.assertEqual( + ['ech SvcParam is invalid Base64'], ctx.exception.reasons + ) + + # broken keyNNNN format + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'key100000': 'foo', + 'key3333': 'bar', + 'keyXXX': 'foo', + }, + }, + }, + ) + self.assertEqual( + [ + 'SvcParam key "key100000" has wrong key number', + 'SvcParam key "keyXXX" has wrong format', + ], + ctx.exception.reasons, + ) diff --git a/tests/zones/unit.tests.tst b/tests/zones/unit.tests.tst index 82549ea..5c08baf 100644 --- a/tests/zones/unit.tests.tst +++ b/tests/zones/unit.tests.tst @@ -55,3 +55,14 @@ aaaa 600 IN AAAA 2601:644:500:e210:62f8:1dff:feb8:947a ; CNAME Records cname 300 IN CNAME unit.tests. included 300 IN CNAME unit.tests. + +; SVCB and HTTPS records +svcb-alias 300 IN SVCB 0 alias-target.unit.test. + +svcb-service 300 IN SVCB 1 . ipv4hint=192.0.2.4 port=9000 +svcb-service 300 IN SVCB 2 svcb-target.unit.test. port=9001 + +https-alias 300 IN HTTPS 0 alias-target.unit.test. + +https-service 300 IN HTTPS 1 . ipv4hint=192.0.2.4 port=9000 +https-service 300 IN HTTPS 2 svcb-target.unit.test. port=9001