|
|
|
@ -29,18 +29,26 @@ def validate_svcparam_port(svcparamvalue): |
|
|
|
return reasons |
|
|
|
|
|
|
|
|
|
|
|
def validate_list(svcparamkey, svcparamvalue): |
|
|
|
if not isinstance(svcparamvalue, list): |
|
|
|
return [f'{svcparamkey} is not a list'] |
|
|
|
return list() |
|
|
|
|
|
|
|
|
|
|
|
def validate_svcparam_alpn(svcparamvalue): |
|
|
|
reasons = [] |
|
|
|
alpns = svcparamvalue.split(',') |
|
|
|
for alpn in alpns: |
|
|
|
reasons = validate_list('alpn', svcparamvalue) |
|
|
|
if len(reasons) != 0: |
|
|
|
return reasons |
|
|
|
for alpn in svcparamvalue: |
|
|
|
reasons += _ChunkedValue.validate(alpn, 'SVCB') |
|
|
|
return reasons |
|
|
|
|
|
|
|
|
|
|
|
def validate_svcparam_iphint(ip_version, svcparamvalue): |
|
|
|
reasons = [] |
|
|
|
addresses = svcparamvalue.split(',') |
|
|
|
for address in addresses: |
|
|
|
reasons = validate_list(f'ipv{ip_version}hint', svcparamvalue) |
|
|
|
if len(reasons) != 0: |
|
|
|
return reasons |
|
|
|
for address in svcparamvalue: |
|
|
|
try: |
|
|
|
if ip_version == 4: |
|
|
|
IPv4Address(address) |
|
|
|
@ -48,23 +56,24 @@ def validate_svcparam_iphint(ip_version, svcparamvalue): |
|
|
|
IPv6Address(address) |
|
|
|
except AddressValueError: |
|
|
|
reasons.append( |
|
|
|
f'ip{ip_version}hint "{address}" is not a valid IPv{ip_version} address' |
|
|
|
f'ipv{ip_version}hint "{address}" is not a valid IPv{ip_version} address' |
|
|
|
) |
|
|
|
return reasons |
|
|
|
|
|
|
|
|
|
|
|
def validate_svcparam_ip4hint(svcparamvalue): |
|
|
|
def validate_svcparam_ipv4hint(svcparamvalue): |
|
|
|
return validate_svcparam_iphint(4, svcparamvalue) |
|
|
|
|
|
|
|
|
|
|
|
def validate_svcparam_ip6hint(svcparamvalue): |
|
|
|
def validate_svcparam_ipv6hint(svcparamvalue): |
|
|
|
return validate_svcparam_iphint(6, svcparamvalue) |
|
|
|
|
|
|
|
|
|
|
|
def validate_svcparam_mandatory(svcparamvalue): |
|
|
|
reasons = [] |
|
|
|
mandatories = svcparamvalue.split(',') |
|
|
|
for mandatory in mandatories: |
|
|
|
reasons = validate_list('mandatory', svcparamvalue) |
|
|
|
if len(reasons) != 0: |
|
|
|
return reasons |
|
|
|
for mandatory in svcparamvalue: |
|
|
|
if ( |
|
|
|
mandatory not in SUPPORTED_PARAMS.keys() |
|
|
|
and not mandatory.startswith('key') |
|
|
|
@ -92,14 +101,30 @@ def validate_svckey_number(paramkey): |
|
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
def parse_rdata_text_svcparamvalue_list(svcparamvalue): |
|
|
|
return svcparamvalue.split(',') |
|
|
|
|
|
|
|
|
|
|
|
# cc https://datatracker.ietf.org/doc/html/rfc9460#keys |
|
|
|
SUPPORTED_PARAMS = { |
|
|
|
'no-default-alpn': {'has_arg': False}, |
|
|
|
'alpn': {'validate': validate_svcparam_alpn}, |
|
|
|
'alpn': { |
|
|
|
'validate': validate_svcparam_alpn, |
|
|
|
'parse_rdata_text': parse_rdata_text_svcparamvalue_list, |
|
|
|
}, |
|
|
|
'port': {'validate': validate_svcparam_port}, |
|
|
|
'ipv4hint': {'validate': validate_svcparam_ip4hint}, |
|
|
|
'ipv6hint': {'validate': validate_svcparam_ip6hint}, |
|
|
|
'mandatory': {'validate': validate_svcparam_mandatory}, |
|
|
|
'ipv4hint': { |
|
|
|
'validate': validate_svcparam_ipv4hint, |
|
|
|
'parse_rdata_text': parse_rdata_text_svcparamvalue_list, |
|
|
|
}, |
|
|
|
'ipv6hint': { |
|
|
|
'validate': validate_svcparam_ipv6hint, |
|
|
|
'parse_rdata_text': parse_rdata_text_svcparamvalue_list, |
|
|
|
}, |
|
|
|
'mandatory': { |
|
|
|
'validate': validate_svcparam_mandatory, |
|
|
|
'parse_rdata_text': parse_rdata_text_svcparamvalue_list, |
|
|
|
}, |
|
|
|
'ech': {'validate': validate_svcparam_ech}, |
|
|
|
} |
|
|
|
|
|
|
|
@ -109,7 +134,6 @@ class SvcbValue(EqualityTupleMixin, dict): |
|
|
|
@classmethod |
|
|
|
def parse_rdata_text(cls, value): |
|
|
|
try: |
|
|
|
# XXX: Should we split params into the specific ParamKeys and ParamValues? |
|
|
|
(svcpriority, targetname, *svcparams) = value.split(' ') |
|
|
|
except ValueError: |
|
|
|
raise RrParseError() |
|
|
|
@ -125,6 +149,15 @@ class SvcbValue(EqualityTupleMixin, dict): |
|
|
|
raise RrParseError(f'{paramkey} is specified twice') |
|
|
|
if len(paramvalue) != 0: |
|
|
|
params[paramkey] = paramvalue[0] |
|
|
|
if ( |
|
|
|
SUPPORTED_PARAMS.get(paramkey, {}).get( |
|
|
|
'parse_rdata_text', None |
|
|
|
) |
|
|
|
is not None |
|
|
|
): |
|
|
|
params[paramkey] = SUPPORTED_PARAMS[paramkey][ |
|
|
|
'parse_rdata_text' |
|
|
|
](paramvalue[0]) |
|
|
|
continue |
|
|
|
params[paramkey] = None |
|
|
|
return { |
|
|
|
@ -231,10 +264,14 @@ class SvcbValue(EqualityTupleMixin, dict): |
|
|
|
@property |
|
|
|
def rdata_text(self): |
|
|
|
params = '' |
|
|
|
# XXX: sort params by key number |
|
|
|
for svcparamkey, svcparamvalue in self.svcparams.items(): |
|
|
|
params += f' {svcparamkey}' |
|
|
|
if svcparamvalue is not None: |
|
|
|
params += f'={svcparamvalue}' |
|
|
|
if isinstance(svcparamvalue, list): |
|
|
|
params += f'={",".join(svcparamvalue)}' |
|
|
|
else: |
|
|
|
params += f'={svcparamvalue}' |
|
|
|
return f'{self.svcpriority} {self.targetname}{params}' |
|
|
|
|
|
|
|
def __hash__(self): |
|
|
|
@ -244,7 +281,10 @@ class SvcbValue(EqualityTupleMixin, dict): |
|
|
|
params = set() |
|
|
|
for svcparamkey, svcparamvalue in self.svcparams.items(): |
|
|
|
if svcparamvalue is not None: |
|
|
|
params.add(f'{svcparamkey}={svcparamvalue}') |
|
|
|
if isinstance(svcparamvalue, list): |
|
|
|
params.add(f'{svcparamkey}={",".join(svcparamvalue)}') |
|
|
|
else: |
|
|
|
params.add(f'{svcparamkey}={svcparamvalue}') |
|
|
|
else: |
|
|
|
params.add(f'{svcparamkey}') |
|
|
|
return (self.svcpriority, self.targetname, params) |
|
|
|
|