From b53993e9885456a02ddd539509f0be341aed9da3 Mon Sep 17 00:00:00 2001 From: Pieter Lexis Date: Tue, 4 Jun 2024 22:31:32 +0200 Subject: [PATCH] SVCB: force scvparamvalue lists to be lists --- octodns/record/svcb.py | 78 +++++++++++++++++++++++-------- tests/test_octodns_record_svcb.py | 54 +++++++++++++++++---- 2 files changed, 104 insertions(+), 28 deletions(-) diff --git a/octodns/record/svcb.py b/octodns/record/svcb.py index 3ec3a39..c7bcfb0 100644 --- a/octodns/record/svcb.py +++ b/octodns/record/svcb.py @@ -29,18 +29,26 @@ def validate_svcparam_port(svcparamvalue): return reasons +def validate_list(svcparamkey, svcparamvalue): + if not isinstance(svcparamvalue, list): + return [f'{svcparamkey} is not a list'] + return list() + + def validate_svcparam_alpn(svcparamvalue): - reasons = [] - alpns = svcparamvalue.split(',') - for alpn in alpns: + reasons = validate_list('alpn', svcparamvalue) + if len(reasons) != 0: + return reasons + for alpn in svcparamvalue: reasons += _ChunkedValue.validate(alpn, 'SVCB') return reasons def validate_svcparam_iphint(ip_version, svcparamvalue): - reasons = [] - addresses = svcparamvalue.split(',') - for address in addresses: + reasons = validate_list(f'ipv{ip_version}hint', svcparamvalue) + if len(reasons) != 0: + return reasons + for address in svcparamvalue: try: if ip_version == 4: IPv4Address(address) @@ -48,23 +56,24 @@ def validate_svcparam_iphint(ip_version, svcparamvalue): IPv6Address(address) except AddressValueError: reasons.append( - f'ip{ip_version}hint "{address}" is not a valid IPv{ip_version} address' + f'ipv{ip_version}hint "{address}" is not a valid IPv{ip_version} address' ) return reasons -def validate_svcparam_ip4hint(svcparamvalue): +def validate_svcparam_ipv4hint(svcparamvalue): return validate_svcparam_iphint(4, svcparamvalue) -def validate_svcparam_ip6hint(svcparamvalue): +def validate_svcparam_ipv6hint(svcparamvalue): return validate_svcparam_iphint(6, svcparamvalue) def validate_svcparam_mandatory(svcparamvalue): - reasons = [] - mandatories = svcparamvalue.split(',') - for mandatory in mandatories: + reasons = validate_list('mandatory', svcparamvalue) + if len(reasons) != 0: + return reasons + for mandatory in svcparamvalue: if ( mandatory not in SUPPORTED_PARAMS.keys() and not mandatory.startswith('key') @@ -92,14 +101,30 @@ def validate_svckey_number(paramkey): return [] +def parse_rdata_text_svcparamvalue_list(svcparamvalue): + return svcparamvalue.split(',') + + # cc https://datatracker.ietf.org/doc/html/rfc9460#keys SUPPORTED_PARAMS = { 'no-default-alpn': {'has_arg': False}, - 'alpn': {'validate': validate_svcparam_alpn}, + 'alpn': { + 'validate': validate_svcparam_alpn, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, 'port': {'validate': validate_svcparam_port}, - 'ipv4hint': {'validate': validate_svcparam_ip4hint}, - 'ipv6hint': {'validate': validate_svcparam_ip6hint}, - 'mandatory': {'validate': validate_svcparam_mandatory}, + 'ipv4hint': { + 'validate': validate_svcparam_ipv4hint, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'ipv6hint': { + 'validate': validate_svcparam_ipv6hint, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, + 'mandatory': { + 'validate': validate_svcparam_mandatory, + 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, + }, 'ech': {'validate': validate_svcparam_ech}, } @@ -109,7 +134,6 @@ class SvcbValue(EqualityTupleMixin, dict): @classmethod def parse_rdata_text(cls, value): try: - # XXX: Should we split params into the specific ParamKeys and ParamValues? (svcpriority, targetname, *svcparams) = value.split(' ') except ValueError: raise RrParseError() @@ -125,6 +149,15 @@ class SvcbValue(EqualityTupleMixin, dict): raise RrParseError(f'{paramkey} is specified twice') if len(paramvalue) != 0: params[paramkey] = paramvalue[0] + if ( + SUPPORTED_PARAMS.get(paramkey, {}).get( + 'parse_rdata_text', None + ) + is not None + ): + params[paramkey] = SUPPORTED_PARAMS[paramkey][ + 'parse_rdata_text' + ](paramvalue[0]) continue params[paramkey] = None return { @@ -231,10 +264,14 @@ class SvcbValue(EqualityTupleMixin, dict): @property def rdata_text(self): params = '' + # XXX: sort params by key number for svcparamkey, svcparamvalue in self.svcparams.items(): params += f' {svcparamkey}' if svcparamvalue is not None: - params += f'={svcparamvalue}' + if isinstance(svcparamvalue, list): + params += f'={",".join(svcparamvalue)}' + else: + params += f'={svcparamvalue}' return f'{self.svcpriority} {self.targetname}{params}' def __hash__(self): @@ -244,7 +281,10 @@ class SvcbValue(EqualityTupleMixin, dict): params = set() for svcparamkey, svcparamvalue in self.svcparams.items(): if svcparamvalue is not None: - params.add(f'{svcparamkey}={svcparamvalue}') + if isinstance(svcparamvalue, list): + params.add(f'{svcparamkey}={",".join(svcparamvalue)}') + else: + params.add(f'{svcparamkey}={svcparamvalue}') else: params.add(f'{svcparamkey}') return (self.svcpriority, self.targetname, params) diff --git a/tests/test_octodns_record_svcb.py b/tests/test_octodns_record_svcb.py index 767d3f0..51c25ec 100644 --- a/tests/test_octodns_record_svcb.py +++ b/tests/test_octodns_record_svcb.py @@ -164,7 +164,7 @@ class TestRecordSvcb(TestCase): self.assertEqual({'port': '8080'}, a.values[0].svcparams) # both directions should match - rdata = '1 svcb.unit.tests. port=8080 no-default-alpn' + rdata = '1 svcb.unit.tests. port=8080 no-default-alpn ipv4hint=192.0.2.2,192.0.2.53' record = SvcbRecord( zone, 'svc', {'ttl': 32, 'value': SvcbValue.parse_rdata_text(rdata)} ) @@ -195,14 +195,14 @@ class TestRecordSvcb(TestCase): { 'svcpriority': 0, 'targetname': 'foo.', - 'svcparams': {'alpn': 'h2,h3', 'port': 8080}, + 'svcparams': {'alpn': ['h2', 'h3'], 'port': 8080}, } ) e = SvcbValue( { 'svcpriority': 0, 'targetname': 'mmm.', - 'svcparams': {'ipv4hint': '192.0.2.1'}, + 'svcparams': {'ipv4hint': ['192.0.2.1']}, } ) @@ -519,12 +519,42 @@ class TestRecordSvcb(TestCase): 'value': { 'svcpriority': 1, 'targetname': 'foo.bar.com.', - 'svcparams': {'alpn': 'h2,😅'}, + 'svcparams': {'alpn': ['h2', '😅']}, }, }, ) self.assertEqual(['non ASCII character in "😅"'], ctx.exception.reasons) + # svcbvaluelist that is not a list + with self.assertRaises(ValidationError) as ctx: + Record.new( + self.zone, + 'foo', + { + 'type': 'SVCB', + 'ttl': 600, + 'value': { + 'svcpriority': 1, + 'targetname': 'foo.bar.com.', + 'svcparams': { + 'ipv4hint': '192.0.2.1,192.0.2.2', + 'ipv6hint': '2001:db8::1', + 'mandatory': 'ipv6hint', + 'alpn': 'h2,h3', + }, + }, + }, + ) + self.assertEqual( + [ + 'ipv4hint is not a list', + 'ipv6hint is not a list', + 'mandatory is not a list', + 'alpn is not a list', + ], + ctx.exception.reasons, + ) + # ipv4hint with self.assertRaises(ValidationError) as ctx: Record.new( @@ -536,12 +566,14 @@ class TestRecordSvcb(TestCase): 'value': { 'svcpriority': 1, 'targetname': 'foo.bar.com.', - 'svcparams': {'ipv4hint': '192.0.2.0,500.500.30.30'}, + 'svcparams': { + 'ipv4hint': ['192.0.2.0', '500.500.30.30'] + }, }, }, ) self.assertEqual( - ['ip4hint "500.500.30.30" is not a valid IPv4 address'], + ['ipv4hint "500.500.30.30" is not a valid IPv4 address'], ctx.exception.reasons, ) @@ -556,12 +588,14 @@ class TestRecordSvcb(TestCase): 'value': { 'svcpriority': 1, 'targetname': 'foo.bar.com.', - 'svcparams': {'ipv6hint': '2001:db8:43::1,notanip'}, + 'svcparams': { + 'ipv6hint': ['2001:db8:43::1', 'notanip'] + }, }, }, ) self.assertEqual( - ['ip6hint "notanip" is not a valid IPv6 address'], + ['ipv6hint "notanip" is not a valid IPv6 address'], ctx.exception.reasons, ) @@ -576,7 +610,9 @@ class TestRecordSvcb(TestCase): 'value': { 'svcpriority': 1, 'targetname': 'foo.bar.com.', - 'svcparams': {'mandatory': 'ipv4hint,unknown,key4444'}, + 'svcparams': { + 'mandatory': ['ipv4hint', 'unknown', 'key4444'] + }, }, }, )