Browse Source

Merge branch 'main' into fix-multiple-dynamic-zones

pull/1069/head
Ross McFarland 2 years ago
committed by GitHub
parent
commit
da8ac9dd75
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 238 additions and 140 deletions
  1. +98
    -55
      octodns/record/ds.py
  2. +140
    -85
      tests/test_octodns_record_ds.py

+ 98
- 55
octodns/record/ds.py View File

@ -2,37 +2,40 @@
# #
# #
from logging import getLogger
from ..equality import EqualityTupleMixin from ..equality import EqualityTupleMixin
from .base import Record, ValuesMixin from .base import Record, ValuesMixin
from .rr import RrParseError from .rr import RrParseError
class DsValue(EqualityTupleMixin, dict): class DsValue(EqualityTupleMixin, dict):
# https://www.rfc-editor.org/rfc/rfc4034.html#section-2.1
# https://www.rfc-editor.org/rfc/rfc4034.html#section-5.1
log = getLogger('DsValue')
@classmethod @classmethod
def parse_rdata_text(cls, value): def parse_rdata_text(cls, value):
try: try:
flags, protocol, algorithm, public_key = value.split(' ')
key_tag, algorithm, digest_type, digest = value.split(' ')
except ValueError: except ValueError:
raise RrParseError() raise RrParseError()
try: try:
flags = int(flags)
key_tag = int(key_tag)
except ValueError: except ValueError:
pass pass
try: try:
protocol = int(protocol)
algorithm = int(algorithm)
except ValueError: except ValueError:
pass pass
try: try:
algorithm = int(algorithm)
digest_type = int(digest_type)
except ValueError: except ValueError:
pass pass
return { return {
'flags': flags,
'protocol': protocol,
'key_tag': key_tag,
'algorithm': algorithm, 'algorithm': algorithm,
'public_key': public_key,
'digest_type': digest_type,
'digest': digest,
} }
@classmethod @classmethod
@ -41,26 +44,57 @@ class DsValue(EqualityTupleMixin, dict):
data = (data,) data = (data,)
reasons = [] reasons = []
for value in data: for value in data:
try:
int(value['flags'])
except KeyError:
reasons.append('missing flags')
except ValueError:
reasons.append(f'invalid flags "{value["flags"]}"')
try:
int(value['protocol'])
except KeyError:
reasons.append('missing protocol')
except ValueError:
reasons.append(f'invalid protocol "{value["protocol"]}"')
try:
int(value['algorithm'])
except KeyError:
reasons.append('missing algorithm')
except ValueError:
reasons.append(f'invalid algorithm "{value["algorithm"]}"')
if 'public_key' not in value:
reasons.append('missing public_key')
# we need to validate both "old" style field names and new
# it is safe to assume if public_key or flags are defined then it is "old" style
# A DS record without public_key doesn't make any sense and shouldn't have validated previously
if "public_key" in value or "flags" in value:
cls.log.warning(
'"algorithm", "flags", "public_key", and "protocol" support is DEPRECATED and will be removed in 2.0'
)
try:
int(value['flags'])
except KeyError:
reasons.append('missing flags')
except ValueError:
reasons.append(f'invalid flags "{value["flags"]}"')
try:
int(value['protocol'])
except KeyError:
reasons.append('missing protocol')
except ValueError:
reasons.append(f'invalid protocol "{value["protocol"]}"')
try:
int(value['algorithm'])
except KeyError:
reasons.append('missing algorithm')
except ValueError:
reasons.append(f'invalid algorithm "{value["algorithm"]}"')
if 'public_key' not in value:
reasons.append('missing public_key')
else:
try:
int(value['key_tag'])
except KeyError:
reasons.append('missing key_tag')
except ValueError:
reasons.append(f'invalid key_tag "{value["key_tag"]}"')
try:
int(value['algorithm'])
except KeyError:
reasons.append('missing algorithm')
except ValueError:
reasons.append(f'invalid algorithm "{value["algorithm"]}"')
try:
int(value['digest_type'])
except KeyError:
reasons.append('missing digest_type')
except ValueError:
reasons.append(
f'invalid digest_type "{value["digest_type"]}"'
)
if 'digest' not in value:
reasons.append('missing digest')
return reasons return reasons
@classmethod @classmethod
@ -68,30 +102,31 @@ class DsValue(EqualityTupleMixin, dict):
return [cls(v) for v in values] return [cls(v) for v in values]
def __init__(self, value): def __init__(self, value):
super().__init__(
{
'flags': int(value['flags']),
'protocol': int(value['protocol']),
# we need to instantiate both based on "old" style field names and new
# it is safe to assume if public_key or flags are defined then it is "old" style
if "public_key" in value or "flags" in value:
init = {
'key_tag': int(value['flags']),
'algorithm': int(value['protocol']),
'digest_type': int(value['algorithm']),
'digest': value['public_key'],
}
else:
init = {
'key_tag': int(value['key_tag']),
'algorithm': int(value['algorithm']), 'algorithm': int(value['algorithm']),
'public_key': value['public_key'],
'digest_type': int(value['digest_type']),
'digest': value['digest'],
} }
)
super().__init__(init)
@property @property
def flags(self):
return self['flags']
def key_tag(self):
return self['key_tag']
@flags.setter
def flags(self, value):
self['flags'] = value
@property
def protocol(self):
return self['protocol']
@protocol.setter
def protocol(self, value):
self['protocol'] = value
@key_tag.setter
def key_tag(self, value):
self['key_tag'] = value
@property @property
def algorithm(self): def algorithm(self):
@ -102,12 +137,20 @@ class DsValue(EqualityTupleMixin, dict):
self['algorithm'] = value self['algorithm'] = value
@property @property
def public_key(self):
return self['public_key']
def digest_type(self):
return self['digest_type']
@digest_type.setter
def digest_type(self, value):
self['digest_type'] = value
@property
def digest(self):
return self['digest']
@public_key.setter
def public_key(self, value):
self['public_key'] = value
@digest.setter
def digest(self, value):
self['digest'] = value
@property @property
def data(self): def data(self):
@ -116,15 +159,15 @@ class DsValue(EqualityTupleMixin, dict):
@property @property
def rdata_text(self): def rdata_text(self):
return ( return (
f'{self.flags} {self.protocol} {self.algorithm} {self.public_key}'
f'{self.key_tag} {self.algorithm} {self.digest_type} {self.digest}'
) )
def _equality_tuple(self): def _equality_tuple(self):
return (self.flags, self.protocol, self.algorithm, self.public_key)
return (self.key_tag, self.algorithm, self.digest_type, self.digest)
def __repr__(self): def __repr__(self):
return ( return (
f'{self.flags} {self.protocol} {self.algorithm} {self.public_key}'
f'{self.key_tag} {self.algorithm} {self.digest_type} {self.digest}'
) )


+ 140
- 85
tests/test_octodns_record_ds.py View File

@ -12,52 +12,67 @@ from octodns.zone import Zone
class TestRecordDs(TestCase): class TestRecordDs(TestCase):
def test_ds(self): def test_ds(self):
for a, b in ( for a, b in (
# diff flags
# diff key_tag
( (
{ {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'abcdef0123456',
}, },
{ {
'flags': 1,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
'key_tag': 1,
'algorithm': 1,
'digest_type': 2,
'digest': 'abcdef0123456',
}, },
), ),
# diff protocol
# diff algorithm
( (
{ {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'abcdef0123456',
}, },
{ {
'flags': 0,
'protocol': 2,
'key_tag': 0,
'algorithm': 2, 'algorithm': 2,
'public_key': 'abcdef0123456',
'digest_type': 2,
'digest': 'abcdef0123456',
}, },
), ),
# diff algorithm
# diff digest_type
( (
{ {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'abcdef0123456',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'abcdef0123456',
}, },
{ {
'flags': 0,
'protocol': 1,
'algorithm': 3,
'public_key': 'abcdef0123456',
'key_tag': 0,
'algorithm': 1,
'digest_type': 3,
'digest': 'abcdef0123456',
},
),
# diff digest
(
{
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'abcdef0123456',
},
{
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'bcdef0123456a',
}, },
), ),
# diff public_key
# diff digest with previously used key names
( (
{ {
'flags': 0, 'flags': 0,
@ -66,10 +81,10 @@ class TestRecordDs(TestCase):
'public_key': 'abcdef0123456', 'public_key': 'abcdef0123456',
}, },
{ {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': 'bcdef0123456a',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': 'bcdef0123456a',
}, },
), ),
): ):
@ -104,73 +119,113 @@ class TestRecordDs(TestCase):
# things ints, will parse # things ints, will parse
self.assertEqual( self.assertEqual(
{ {
'flags': 'one',
'protocol': 'two',
'algorithm': 'three',
'public_key': 'key',
'key_tag': 'one',
'algorithm': 'two',
'digest_type': 'three',
'digest': 'key',
}, },
DsValue.parse_rdata_text('one two three key'), DsValue.parse_rdata_text('one two three key'),
) )
# valid # valid
data = { data = {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': '99148c81',
} }
self.assertEqual(data, DsValue.parse_rdata_text('0 1 2 99148c81')) self.assertEqual(data, DsValue.parse_rdata_text('0 1 2 99148c81'))
self.assertEqual([], DsValue.validate(data, 'DS')) self.assertEqual([], DsValue.validate(data, 'DS'))
# missing flags
data = {'protocol': 1, 'algorithm': 2, 'public_key': '99148c81'}
self.assertEqual(['missing flags'], DsValue.validate(data, 'DS'))
# invalid flags
# missing key_tag
data = {'algorithm': 1, 'digest_type': 2, 'digest': '99148c81'}
self.assertEqual(['missing key_tag'], DsValue.validate(data, 'DS'))
# invalid key_tag
data = { data = {
'flags': 'a',
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
'key_tag': 'a',
'algorithm': 1,
'digest_type': 2,
'digest': '99148c81',
} }
self.assertEqual(['invalid flags "a"'], DsValue.validate(data, 'DS'))
# missing protocol
data = {'flags': 1, 'algorithm': 2, 'public_key': '99148c81'}
self.assertEqual(['missing protocol'], DsValue.validate(data, 'DS'))
# invalid protocol
data = {
'flags': 1,
'protocol': 'a',
'algorithm': 2,
'public_key': '99148c81',
}
self.assertEqual(['invalid protocol "a"'], DsValue.validate(data, 'DS'))
self.assertEqual(['invalid key_tag "a"'], DsValue.validate(data, 'DS'))
# missing algorithm # missing algorithm
data = {'flags': 1, 'protocol': 2, 'public_key': '99148c81'}
data = {'key_tag': 1, 'digest_type': 2, 'digest': '99148c81'}
self.assertEqual(['missing algorithm'], DsValue.validate(data, 'DS')) self.assertEqual(['missing algorithm'], DsValue.validate(data, 'DS'))
# invalid algorithm # invalid algorithm
data = { data = {
'flags': 1,
'protocol': 2,
'key_tag': 1,
'algorithm': 'a', 'algorithm': 'a',
'public_key': '99148c81',
'digest_type': 2,
'digest': '99148c81',
} }
self.assertEqual( self.assertEqual(
['invalid algorithm "a"'], DsValue.validate(data, 'DS') ['invalid algorithm "a"'], DsValue.validate(data, 'DS')
) )
# missing digest_type
data = {'key_tag': 1, 'algorithm': 2, 'digest': '99148c81'}
self.assertEqual(['missing digest_type'], DsValue.validate(data, 'DS'))
# invalid digest_type
data = {
'key_tag': 1,
'algorithm': 2,
'digest_type': 'a',
'digest': '99148c81',
}
self.assertEqual(
['invalid digest_type "a"'], DsValue.validate(data, 'DS')
)
# missing public_key (list)
data = {'key_tag': 1, 'algorithm': 2, 'digest_type': 3}
self.assertEqual(['missing digest'], DsValue.validate([data], 'DS'))
# do validations again with old field style
# missing flags (list)
data = {'protocol': 2, 'algorithm': 3, 'public_key': '99148c81'}
self.assertEqual(['missing flags'], DsValue.validate([data], 'DS'))
# missing protocol (list)
data = {'flags': 1, 'algorithm': 3, 'public_key': '99148c81'}
self.assertEqual(['missing protocol'], DsValue.validate([data], 'DS'))
# missing algorithm (list) # missing algorithm (list)
data = {'flags': 1, 'protocol': 2, 'algorithm': 3}
data = {'flags': 1, 'protocol': 2, 'public_key': '99148c81'}
self.assertEqual(['missing algorithm'], DsValue.validate([data], 'DS'))
# missing public_key (list)
data = {'flags': 1, 'algorithm': 3, 'protocol': 2}
self.assertEqual(['missing public_key'], DsValue.validate([data], 'DS')) self.assertEqual(['missing public_key'], DsValue.validate([data], 'DS'))
# missing public_key (list)
data = {'flags': 1, 'algorithm': 3, 'protocol': 2, 'digest': '99148c81'}
self.assertEqual(['missing public_key'], DsValue.validate([data], 'DS'))
# invalid flags, protocol and algorithm
data = {
'flags': 'a',
'protocol': 'a',
'algorithm': 'a',
'public_key': '99148c81',
}
self.assertEqual(
[
'invalid flags "a"',
'invalid protocol "a"',
'invalid algorithm "a"',
],
DsValue.validate(data, 'DS'),
)
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
values = [ values = [
{ {
'flags': 0,
'protocol': 1,
'algorithm': 2,
'public_key': '99148c81',
'key_tag': 0,
'algorithm': 1,
'digest_type': 2,
'digest': '99148c81',
}, },
{ {
'flags': 1, 'flags': 1,
@ -180,26 +235,26 @@ class TestRecordDs(TestCase):
}, },
] ]
a = DsRecord(zone, 'ds', {'ttl': 32, 'values': values}) a = DsRecord(zone, 'ds', {'ttl': 32, 'values': values})
self.assertEqual(0, a.values[0].flags)
a.values[0].flags += 1
self.assertEqual(1, a.values[0].flags)
self.assertEqual(0, a.values[0].key_tag)
a.values[0].key_tag += 1
self.assertEqual(1, a.values[0].key_tag)
self.assertEqual(1, a.values[0].protocol)
a.values[0].protocol += 1
self.assertEqual(2, a.values[0].protocol)
self.assertEqual(2, a.values[0].algorithm)
self.assertEqual(1, a.values[0].algorithm)
a.values[0].algorithm += 1 a.values[0].algorithm += 1
self.assertEqual(3, a.values[0].algorithm)
self.assertEqual(2, a.values[0].algorithm)
self.assertEqual(2, a.values[0].digest_type)
a.values[0].digest_type += 1
self.assertEqual(3, a.values[0].digest_type)
self.assertEqual('99148c81', a.values[0].public_key)
a.values[0].public_key = '99148c42'
self.assertEqual('99148c42', a.values[0].public_key)
self.assertEqual('99148c81', a.values[0].digest)
a.values[0].digest = '99148c42'
self.assertEqual('99148c42', a.values[0].digest)
self.assertEqual(1, a.values[1].flags)
self.assertEqual(2, a.values[1].protocol)
self.assertEqual(3, a.values[1].algorithm)
self.assertEqual('99148c44', a.values[1].public_key)
self.assertEqual(1, a.values[1].key_tag)
self.assertEqual(2, a.values[1].algorithm)
self.assertEqual(3, a.values[1].digest_type)
self.assertEqual('99148c44', a.values[1].digest)
self.assertEqual(DsValue(values[1]), a.values[1].data) self.assertEqual(DsValue(values[1]), a.values[1].data)
self.assertEqual('1 2 3 99148c44', a.values[1].rdata_text) self.assertEqual('1 2 3 99148c44', a.values[1].rdata_text)


Loading…
Cancel
Save