Browse Source

Implmement Record.from_rrs and plug it together with parse_rdata_text

pull/930/head
Ross McFarland 3 years ago
parent
commit
0228e6822e
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
3 changed files with 159 additions and 233 deletions
  1. +86
    -30
      octodns/record/__init__.py
  2. +15
    -145
      octodns/source/axfr.py
  3. +58
    -58
      tests/test_octodns_record.py

+ 86
- 30
octodns/record/__init__.py View File

@ -2,6 +2,7 @@
#
#
from collections import defaultdict
from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address
from logging import getLogger
import re
@ -84,6 +85,17 @@ class ValidationError(RecordException):
self.reasons = reasons
class Rr(object):
def __init__(self, name, _type, ttl, rdata):
self.name = name
self._type = _type
self.ttl = ttl
self.rdata = rdata
def __repr__(self):
return f'Rr<{self.name}, {self._type}, {self.ttl}, {self.rdata}'
class Record(EqualityTupleMixin):
log = getLogger('Record')
@ -171,6 +183,32 @@ class Record(EqualityTupleMixin):
pass
return reasons
@classmethod
def from_rrs(cls, zone, rrs, lenient=False):
from pprint import pprint
pprint({'zone': zone, 'rrs': rrs, 'lenient': lenient})
grouped = defaultdict(list)
for rr in rrs:
grouped[(rr.name, rr._type)].append(rr)
pprint({'grouped': grouped})
records = []
for _, rrs in sorted(grouped.items()):
rr = rrs[0]
name = zone.hostname_from_fqdn(rr.name)
_class = cls._CLASSES[rr._type]
pprint({'rr': rr, 'name': name, 'class': _class})
data = _class.data_from_rrs(rrs)
pprint({'data': data})
record = Record.new(zone, name, data, lenient=lenient)
records.append(record)
pprint({'records': records})
return records
def __init__(self, zone, name, data, source=None):
self.zone = zone
if name:
@ -342,6 +380,15 @@ class ValuesMixin(object):
return reasons
@classmethod
def data_from_rrs(cls, rrs):
values = [cls._value_type.parse_rdata_text(rr.rdata) for rr in rrs]
from pprint import pprint
pprint({'values': values})
rr = rrs[0]
return {'ttl': rr.ttl, 'type': rr._type, 'values': values}
def __init__(self, zone, name, data, source=None):
super(ValuesMixin, self).__init__(zone, name, data, source=source)
try:
@ -438,6 +485,15 @@ class ValueMixin(object):
)
return reasons
@classmethod
def data_from_rrs(cls, rrs):
rr = rrs[0]
return {
'ttl': rr.ttl,
'type': rr._type,
'value': cls._value_type.parse_rdata_text(rr.rdata),
}
def __init__(self, zone, name, data, source=None):
super(ValueMixin, self).__init__(zone, name, data, source=source)
self.value = self._value_type.process(data['value'])
@ -791,12 +847,12 @@ class _DynamicMixin(object):
class _TargetValue(str):
@classmethod
def parse_rr_text(self, value):
def parse_rdata_text(self, value):
return value
@classmethod
def validate(cls, data, _type):
# no need to call parse_rr_text since it's a noop
# no need to call parse_rdata_text since it's a noop
reasons = []
if data == '':
reasons.append('empty value')
@ -812,7 +868,7 @@ class _TargetValue(str):
@classmethod
def process(cls, value):
# no need to call parse_rr_text since it's a noop
# no need to call parse_rdata_text since it's a noop
if value:
return cls(value.lower())
return None
@ -832,7 +888,7 @@ class DnameValue(_TargetValue):
class _IpAddress(str):
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
return value
@classmethod
@ -843,7 +899,7 @@ class _IpAddress(str):
return ['missing value(s)']
reasons = []
for value in data:
# no need to call parse_rr_text here as it's a noop
# no need to call parse_rdata_text here as it's a noop
if value == '':
reasons.append('empty value')
elif value is None:
@ -866,7 +922,7 @@ class _IpAddress(str):
return [cls(v) if v != '' else '' for v in values]
def __new__(cls, v):
# no need to call parse_rr_text here as it's a noop
# no need to call parse_rdata_text here as it's a noop
v = str(cls._address_type(v))
return super().__new__(cls, v)
@ -925,7 +981,7 @@ class CaaValue(EqualityTupleMixin, dict):
# https://tools.ietf.org/html/rfc6844#page-5
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
try:
flags, tag, value = value.split(' ')
except ValueError:
@ -945,7 +1001,7 @@ class CaaValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -969,7 +1025,7 @@ class CaaValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'flags': int(value.get('flags', 0)),
@ -1056,7 +1112,7 @@ class LocValue(EqualityTupleMixin, dict):
# https://www.rfc-editor.org/rfc/rfc1876.html
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
try:
value = value.replace('m', '')
(
@ -1157,7 +1213,7 @@ class LocValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -1235,7 +1291,7 @@ class LocValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'lat_degrees': int(value['lat_degrees']),
@ -1412,7 +1468,7 @@ Record.register_type(LocRecord)
class MxValue(EqualityTupleMixin, dict):
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
try:
preference, exchange = value.split(' ')
except ValueError:
@ -1432,7 +1488,7 @@ class MxValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -1469,7 +1525,7 @@ class MxValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
# RFC1035 says preference, half the providers use priority
try:
preference = value['preference']
@ -1530,7 +1586,7 @@ class NaptrValue(EqualityTupleMixin, dict):
VALID_FLAGS = ('S', 'A', 'U', 'P')
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
try:
(
order,
@ -1565,7 +1621,7 @@ class NaptrValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -1602,7 +1658,7 @@ class NaptrValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'order': int(value['order']),
@ -1703,7 +1759,7 @@ Record.register_type(NaptrRecord)
class _NsValue(str):
@classmethod
def parse_rr_text(cls, value):
def parse_rdata_text(cls, value):
return value
@classmethod
@ -1714,7 +1770,7 @@ class _NsValue(str):
data = (data,)
reasons = []
for value in data:
# no need to consider parse_rr_text as it's a noop
# no need to consider parse_rdata_text as it's a noop
if not FQDN(str(value), allow_underscores=True).is_valid:
reasons.append(
f'Invalid NS value "{value}" is not a valid FQDN.'
@ -1781,7 +1837,7 @@ class SshfpValue(EqualityTupleMixin, dict):
VALID_FINGERPRINT_TYPES = (1, 2)
@classmethod
def parse_rr_text(self, value):
def parse_rdata_text(self, value):
try:
algorithm, fingerprint_type, fingerprint = value.split(' ')
except ValueError:
@ -1809,7 +1865,7 @@ class SshfpValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -1844,7 +1900,7 @@ class SshfpValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'algorithm': int(value['algorithm']),
@ -1939,7 +1995,7 @@ class _ChunkedValue(str):
data = (data,)
reasons = []
for value in data:
# no need to try parse_rr_text here as it's a noop
# no need to try parse_rdata_text here as it's a noop
if cls._unescaped_semicolon_re.search(value):
reasons.append(f'unescaped ; in "{value}"')
return reasons
@ -1968,7 +2024,7 @@ Record.register_type(SpfRecord)
class SrvValue(EqualityTupleMixin, dict):
@classmethod
def parse_rr_text(self, value):
def parse_rdata_text(self, value):
try:
priority, weight, port, target = value.split(' ')
except ValueError:
@ -2001,7 +2057,7 @@ class SrvValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -2046,7 +2102,7 @@ class SrvValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'priority': int(value['priority']),
@ -2121,7 +2177,7 @@ Record.register_type(SrvRecord)
class TlsaValue(EqualityTupleMixin, dict):
@classmethod
def parse_rr_text(self, value):
def parse_rdata_text(self, value):
try:
(
certificate_usage,
@ -2159,7 +2215,7 @@ class TlsaValue(EqualityTupleMixin, dict):
if isinstance(value, str):
# it's hopefully RR formatted, give parsing a try
try:
value = cls.parse_rr_text(value)
value = cls.parse_rdata_text(value)
except RrParseError as e:
reasons.append(str(e))
# not a dict so no point in continuing
@ -2208,7 +2264,7 @@ class TlsaValue(EqualityTupleMixin, dict):
def __init__(self, value):
if isinstance(value, str):
value = self.parse_rr_text(value)
value = self.parse_rdata_text(value)
super().__init__(
{
'certificate_usage': int(value.get('certificate_usage', 0)),


+ 15
- 145
octodns/source/axfr.py View File

@ -9,12 +9,11 @@ import dns.rdatatype
from dns.exception import DNSException
from collections import defaultdict
from os import listdir
from os.path import join
import logging
from ..record import Record
from ..record import Record, Rr
from .base import BaseSource
@ -42,110 +41,6 @@ class AxfrBaseSource(BaseSource):
def __init__(self, id):
super(AxfrBaseSource, self).__init__(id)
def _data_for_multiple(self, _type, records):
return {
'ttl': records[0]['ttl'],
'type': _type,
'values': [r['value'] for r in records],
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
_data_for_PTR = _data_for_multiple
def _data_for_CAA(self, _type, records):
values = []
for record in records:
flags, tag, value = record['value'].split(' ', 2)
values.append(
{'flags': flags, 'tag': tag, 'value': value.replace('"', '')}
)
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_LOC(self, _type, records):
values = []
for record in records:
(
lat_degrees,
lat_minutes,
lat_seconds,
lat_direction,
long_degrees,
long_minutes,
long_seconds,
long_direction,
altitude,
size,
precision_horz,
precision_vert,
) = (record['value'].replace('m', '').split(' ', 11))
values.append(
{
'lat_degrees': lat_degrees,
'lat_minutes': lat_minutes,
'lat_seconds': lat_seconds,
'lat_direction': lat_direction,
'long_degrees': long_degrees,
'long_minutes': long_minutes,
'long_seconds': long_seconds,
'long_direction': long_direction,
'altitude': altitude,
'size': size,
'precision_horz': precision_horz,
'precision_vert': precision_vert,
}
)
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_MX(self, _type, records):
values = []
for record in records:
preference, exchange = record['value'].split(' ', 1)
values.append({'preference': preference, 'exchange': exchange})
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_TXT(self, _type, records):
values = [value['value'].replace(';', '\\;') for value in records]
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
_data_for_SPF = _data_for_TXT
def _data_for_single(self, _type, records):
record = records[0]
return {'ttl': record['ttl'], 'type': _type, 'value': record['value']}
_data_for_CNAME = _data_for_single
def _data_for_SRV(self, _type, records):
values = []
for record in records:
priority, weight, port, target = record['value'].split(' ', 3)
values.append(
{
'priority': priority,
'weight': weight,
'port': port,
'target': target,
}
)
return {'type': _type, 'ttl': records[0]['ttl'], 'values': values}
def _data_for_SSHFP(self, _type, records):
values = []
for record in records:
algorithm, fingerprint_type, fingerprint = record['value'].split(
' ', 2
)
values.append(
{
'algorithm': algorithm,
'fingerprint_type': fingerprint_type,
'fingerprint': fingerprint,
}
)
return {'type': _type, 'ttl': records[0]['ttl'], 'values': values}
def populate(self, zone, target=False, lenient=False):
self.log.debug(
'populate: name=%s, target=%s, lenient=%s',
@ -154,26 +49,10 @@ class AxfrBaseSource(BaseSource):
lenient,
)
values = defaultdict(lambda: defaultdict(list))
for record in self.zone_records(zone):
_type = record['type']
if _type not in self.SUPPORTS:
continue
name = zone.hostname_from_fqdn(record['name'])
values[name][record['type']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, f'_data_for_{_type}')
record = Record.new(
zone,
name,
data_for(_type, records),
source=self,
lenient=lenient,
)
zone.add_record(record, lenient=lenient)
rrs = self.zone_records(zone)
for record in Record.from_rrs(zone, rrs, lenient=lenient):
zone.add_record(record, lenient=lenient)
self.log.info(
'populate: found %s records', len(zone.records) - before
@ -220,14 +99,8 @@ class AxfrSource(AxfrBaseSource):
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
records.append(
{
"name": name.to_text(),
"ttl": ttl,
"type": rdtype,
"value": rdata.to_text(),
}
)
if rdtype in self.SUPPORTS:
records.append(Rr(name.to_text(), rdtype, ttl, rdata.to_text()))
return records
@ -304,20 +177,17 @@ class ZoneFileSource(AxfrBaseSource):
if zone.name not in self._zone_records:
try:
z = self._load_zone_file(zone.name)
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
except ZoneFileSourceNotFound:
return []
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
if rdtype in self.SUPPORTS:
records.append(
{
"name": name.to_text(),
"ttl": ttl,
"type": rdtype,
"value": rdata.to_text(),
}
Rr(name.to_text(), rdtype, ttl, rdata.to_text())
)
self._zone_records[zone.name] = records
except ZoneFileSourceNotFound:
return []
self._zone_records[zone.name] = records
return self._zone_records[zone.name]

+ 58
- 58
tests/test_octodns_record.py View File

@ -235,10 +235,10 @@ class TestRecord(TestCase):
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, Ipv4Address.parse_rr_text(s))
self.assertEqual(s, Ipv4Address.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rr_text
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'})
@ -426,10 +426,10 @@ class TestRecord(TestCase):
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _TargetValue.parse_rr_text(s))
self.assertEqual(s, _TargetValue.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rr_text
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
@ -498,33 +498,33 @@ class TestRecord(TestCase):
def test_caa_value_rr_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rr_text('')
CaaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rr_text('nope')
CaaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rr_text('0 tag')
CaaValue.parse_rdata_text('0 tag')
# 4th word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rr_text('1 tag value another')
CaaValue.parse_rdata_text('1 tag value another')
# flags not an int, will parse
self.assertEqual(
{'flags': 'one', 'tag': 'tag', 'value': 'value'},
CaaValue.parse_rr_text('one tag value'),
CaaValue.parse_rdata_text('one tag value'),
)
# valid
self.assertEqual(
{'flags': 0, 'tag': 'tag', 'value': '99148c81'},
CaaValue.parse_rr_text('0 tag 99148c81'),
CaaValue.parse_rdata_text('0 tag 99148c81'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = CaaRecord.validate(
'caa', 'caa.unit.tests.', {'ttl': 32, 'value': ''}
@ -539,7 +539,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = CaaRecord(zone, 'caa', {'ttl': 32, 'value': '0 tag 99148c81'})
self.assertEqual(0, a.values[0].flags)
@ -667,7 +667,7 @@ class TestRecord(TestCase):
for i in tuple(range(0, 12)) + (13,):
s = ''.join(['word'] * i)
with self.assertRaises(RrParseError):
LocValue.parse_rr_text(s)
LocValue.parse_rdata_text(s)
# type conversions are best effort
self.assertEqual(
@ -685,7 +685,7 @@ class TestRecord(TestCase):
'precision_vert': 'nine',
'size': 'seven',
},
LocValue.parse_rr_text(
LocValue.parse_rdata_text(
'zero one two S three four five W six seven eight nine'
),
)
@ -707,10 +707,10 @@ class TestRecord(TestCase):
'precision_vert': 9.9,
'size': 7.7,
},
LocValue.parse_rr_text(s),
LocValue.parse_rdata_text(s),
)
# make sure validate is using parse_rr_text when passed string values
# make sure validate is using parse_rdata_text when passed string values
reasons = LocRecord.validate(
'loc', 'loc.unit.tests', {'ttl': 42, 'value': ''}
)
@ -720,7 +720,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = LocRecord(zone, 'mx', {'ttl': 32, 'value': s})
self.assertEqual(0, a.values[0].lat_degrees)
@ -792,29 +792,29 @@ class TestRecord(TestCase):
# empty string won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rr_text('')
MxValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rr_text('nope')
MxValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rr_text('10 mx.unit.tests. another')
MxValue.parse_rdata_text('10 mx.unit.tests. another')
# preference not an int
self.assertEqual(
{'preference': 'abc', 'exchange': 'mx.unit.tests.'},
MxValue.parse_rr_text('abc mx.unit.tests.'),
MxValue.parse_rdata_text('abc mx.unit.tests.'),
)
# valid
self.assertEqual(
{'preference': 10, 'exchange': 'mx.unit.tests.'},
MxValue.parse_rr_text('10 mx.unit.tests.'),
MxValue.parse_rdata_text('10 mx.unit.tests.'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = MxRecord.validate(
'mx', 'mx.unit.tests.', {'ttl': 32, 'value': ''}
@ -829,7 +829,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = MxRecord(zone, 'mx', {'ttl': 32, 'value': '10 mail.unit.tests.'})
self.assertEqual(10, a.values[0].preference)
@ -1152,7 +1152,7 @@ class TestRecord(TestCase):
'one two three four five six seven',
):
with self.assertRaises(RrParseError):
NaptrValue.parse_rr_text(v)
NaptrValue.parse_rdata_text(v)
# we don't care if the types of things are correct when parsing rr text
self.assertEqual(
@ -1164,7 +1164,7 @@ class TestRecord(TestCase):
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rr_text('one two three four five six'),
NaptrValue.parse_rdata_text('one two three four five six'),
)
# order and preference will be converted to int's when possible
@ -1177,10 +1177,10 @@ class TestRecord(TestCase):
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rr_text('1 2 three four five six'),
NaptrValue.parse_rdata_text('1 2 three four five six'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = NaptrRecord.validate(
'naptr', 'naptr.unit.tests.', {'ttl': 32, 'value': ''}
@ -1197,7 +1197,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
s = '1 2 S service regexp replacement'
a = NaptrRecord(zone, 'naptr', {'ttl': 32, 'value': s})
@ -1238,10 +1238,10 @@ class TestRecord(TestCase):
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _NsValue.parse_rr_text(s))
self.assertEqual(s, _NsValue.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rr_text
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
@ -1319,15 +1319,15 @@ class TestRecord(TestCase):
# empty string won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rr_text('')
SshfpValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rr_text('nope')
SshfpValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rr_text('0 1 00479b27 another')
SshfpValue.parse_rdata_text('0 1 00479b27 another')
# algorithm and fingerprint_type not ints
self.assertEqual(
@ -1336,16 +1336,16 @@ class TestRecord(TestCase):
'fingerprint_type': 'two',
'fingerprint': '00479b27',
},
SshfpValue.parse_rr_text('one two 00479b27'),
SshfpValue.parse_rdata_text('one two 00479b27'),
)
# valid
self.assertEqual(
{'algorithm': 1, 'fingerprint_type': 2, 'fingerprint': '00479b27'},
SshfpValue.parse_rr_text('1 2 00479b27'),
SshfpValue.parse_rdata_text('1 2 00479b27'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = SshfpRecord.validate(
'sshfp', 'sshfp.unit.tests.', {'ttl': 32, 'value': ''}
@ -1360,7 +1360,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = SshfpRecord(zone, 'sshfp', {'ttl': 32, 'value': '1 2 00479b27'})
self.assertEqual(1, a.values[0].algorithm)
@ -1386,10 +1386,10 @@ class TestRecord(TestCase):
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _ChunkedValue.parse_rr_text(s))
self.assertEqual(s, _ChunkedValue.parse_rdata_text(s))
# since we're a noop there's no need/way to check whether validate or
# __init__ call parse_rr_text
# __init__ call parse_rdata_text
zone = Zone('unit.tests.', [])
a = SpfRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
@ -1463,23 +1463,23 @@ class TestRecord(TestCase):
# empty string won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rr_text('')
SrvValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rr_text('nope')
SrvValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rr_text('1 2')
SrvValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rr_text('1 2 3')
SrvValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rr_text('1 2 3 4 5')
SrvValue.parse_rdata_text('1 2 3 4 5')
# priority weight and port not ints
self.assertEqual(
@ -1489,7 +1489,7 @@ class TestRecord(TestCase):
'port': 'three',
'target': 'srv.unit.tests.',
},
SrvValue.parse_rr_text('one two three srv.unit.tests.'),
SrvValue.parse_rdata_text('one two three srv.unit.tests.'),
)
# valid
@ -1500,10 +1500,10 @@ class TestRecord(TestCase):
'port': 3,
'target': 'srv.unit.tests.',
},
SrvValue.parse_rr_text('1 2 3 srv.unit.tests.'),
SrvValue.parse_rdata_text('1 2 3 srv.unit.tests.'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = SrvRecord.validate(
'_srv._tcp', '_srv._tcp.unit.tests.', {'ttl': 32, 'value': ''}
@ -1516,7 +1516,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = SrvRecord(
zone, '_srv._tcp', {'ttl': 32, 'value': '1 2 3 srv.unit.tests.'}
@ -1631,23 +1631,23 @@ class TestRecord(TestCase):
# empty string won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rr_text('')
TlsaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rr_text('nope')
TlsaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rr_text('1 2')
TlsaValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rr_text('1 2 3')
TlsaValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rr_text('1 2 3 abcd another')
TlsaValue.parse_rdata_text('1 2 3 abcd another')
# non-ints
self.assertEqual(
@ -1657,7 +1657,7 @@ class TestRecord(TestCase):
'matching_type': 'three',
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rr_text('one two three abcd'),
TlsaValue.parse_rdata_text('one two three abcd'),
)
# valid
@ -1668,10 +1668,10 @@ class TestRecord(TestCase):
'matching_type': 3,
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rr_text('1 2 3 abcd'),
TlsaValue.parse_rdata_text('1 2 3 abcd'),
)
# make sure that validate is using parse_rr_text when passed string
# make sure that validate is using parse_rdata_text when passed string
# value(s)
reasons = TlsaRecord.validate(
'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': ''}
@ -1682,7 +1682,7 @@ class TestRecord(TestCase):
)
self.assertFalse(reasons)
# make sure that the cstor is using parse_rr_text
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = TlsaRecord(zone, 'tlsa', {'ttl': 32, 'value': '2 1 0 abcd'})
self.assertEqual(2, a.values[0].certificate_usage)


Loading…
Cancel
Save