Browse Source

Merge branch 'main' into host-from-fqdn-idna

pull/939/head
Ross McFarland 3 years ago
committed by GitHub
parent
commit
2d0be4147c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 940 additions and 349 deletions
  1. +4
    -0
      CHANGELOG.md
  2. +1
    -1
      CONTRIBUTING.md
  3. +2
    -2
      octodns/cmds/args.py
  4. +2
    -4
      octodns/cmds/report.py
  5. +1
    -1
      octodns/processor/acme.py
  6. +1
    -1
      octodns/processor/ownership.py
  7. +1
    -1
      octodns/provider/base.py
  8. +1
    -1
      octodns/provider/plan.py
  9. +2
    -2
      octodns/provider/yaml.py
  10. +507
    -193
      octodns/record/__init__.py
  11. +6
    -8
      octodns/source/axfr.py
  12. +2
    -4
      octodns/source/envvar.py
  13. +2
    -2
      octodns/source/tinydns.py
  14. +5
    -0
      octodns/yaml.py
  15. +2
    -2
      tests/helpers.py
  16. +2
    -2
      tests/test_octodns_provider_base.py
  17. +399
    -125
      tests/test_octodns_record.py

+ 4
- 0
CHANGELOG.md View File

@ -11,6 +11,8 @@
decoded form. Both forms should be accepted in command line arguments.
Providers may need to be updated to display the decoded form in their logs,
until then they'd display the IDNA version.
* IDNA value support for Record types that hold FQDNs: ALIAS, CNAME, DNAME, PTR,
MX, NS, and SRV.
* Support for configuring global processors that apply to all zones with
`manager.processors`
@ -30,6 +32,8 @@
* Add TtlRestrictionFilter processor for adding ttl restriction/checking
* NameAllowlistFilter & NameRejectlistFilter implementations to support
filtering on record names to include/exclude records from management.
* All Record values are now first class objects. This shouldn't be an externally
visible change, but will enable future improvements.
## v0.9.19 - 2022-08-14 - Subzone handling


+ 1
- 1
CONTRIBUTING.md View File

@ -8,7 +8,7 @@ If you have questions, or you'd like to check with us before embarking on a majo
## How to contribute
This project uses the [GitHub Flow](https://guides.github.com/introduction/flow/). That means that the `master` branch is stable and new development is done in feature branches. Feature branches are merged into the `master` branch via a Pull Request.
This project uses the [GitHub Flow](https://guides.github.com/introduction/flow/). That means that the `main` branch is stable and new development is done in feature branches. Feature branches are merged into the `main` branch via a Pull Request.
0. Fork and clone the repository
0. Configure and install the dependencies: `./script/bootstrap`


+ 2
- 2
octodns/cmds/args.py View File

@ -18,7 +18,7 @@ class ArgumentParser(_Base):
'''
def __init__(self, *args, **kwargs):
super(ArgumentParser, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def parse_args(self, default_log_level=INFO):
version = f'octoDNS {__VERSION__}'
@ -50,7 +50,7 @@ class ArgumentParser(_Base):
'--debug', action='store_true', default=False, help=_help
)
args = super(ArgumentParser, self).parse_args()
args = super().parse_args()
self._setup_logging(args, default_log_level)
return args


+ 2
- 4
octodns/cmds/report.py View File

@ -16,13 +16,11 @@ from octodns.manager import Manager
class AsyncResolver(Resolver):
def __init__(self, num_workers, *args, **kwargs):
super(AsyncResolver, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=num_workers)
def query(self, *args, **kwargs):
return self.executor.submit(
super(AsyncResolver, self).query, *args, **kwargs
)
return self.executor.submit(super().query, *args, **kwargs)
def main():


+ 1
- 1
octodns/processor/acme.py View File

@ -25,7 +25,7 @@ class AcmeMangingProcessor(BaseProcessor):
- acme
...
'''
super(AcmeMangingProcessor, self).__init__(name)
super().__init__(name)
self._owned = set()


+ 1
- 1
octodns/processor/ownership.py View File

@ -15,7 +15,7 @@ from .base import BaseProcessor
# and thus "own" them going forward.
class OwnershipProcessor(BaseProcessor):
def __init__(self, name, txt_name='_owner', txt_value='*octodns*'):
super(OwnershipProcessor, self).__init__(name)
super().__init__(name)
self.txt_name = txt_name
self.txt_value = txt_value
self._txt_values = [txt_value]


+ 1
- 1
octodns/provider/base.py View File

@ -17,7 +17,7 @@ class BaseProvider(BaseSource):
delete_pcent_threshold=Plan.MAX_SAFE_DELETE_PCENT,
strict_supports=False,
):
super(BaseProvider, self).__init__(id)
super().__init__(id)
self.log.debug(
'__init__: id=%s, apply_disabled=%s, '
'update_pcent_threshold=%.2f, '


+ 1
- 1
octodns/provider/plan.py View File

@ -136,7 +136,7 @@ class _PlanOutput(object):
class PlanLogger(_PlanOutput):
def __init__(self, name, level='info'):
super(PlanLogger, self).__init__(name)
super().__init__(name)
try:
self.level = {
'debug': DEBUG,


+ 2
- 2
octodns/provider/yaml.py View File

@ -128,7 +128,7 @@ class YamlProvider(BaseProvider):
enforce_order,
populate_should_replace,
)
super(YamlProvider, self).__init__(id, *args, **kwargs)
super().__init__(id, *args, **kwargs)
self.directory = directory
self.default_ttl = default_ttl
self.enforce_order = enforce_order
@ -311,7 +311,7 @@ class SplitYamlProvider(YamlProvider):
CATCHALL_RECORD_NAMES = ('*', '')
def __init__(self, id, directory, extension='.', *args, **kwargs):
super(SplitYamlProvider, self).__init__(id, directory, *args, **kwargs)
super().__init__(id, directory, *args, **kwargs)
self.extension = extension
def _zone_directory(self, zone):


+ 507
- 193
octodns/record/__init__.py
File diff suppressed because it is too large
View File


+ 6
- 8
octodns/source/axfr.py View File

@ -40,7 +40,7 @@ class AxfrBaseSource(BaseSource):
)
def __init__(self, id):
super(AxfrBaseSource, self).__init__(id)
super().__init__(id)
def _data_for_multiple(self, _type, records):
return {
@ -186,9 +186,7 @@ class AxfrSourceException(Exception):
class AxfrSourceZoneTransferFailed(AxfrSourceException):
def __init__(self):
super(AxfrSourceZoneTransferFailed, self).__init__(
'Unable to Perform Zone Transfer'
)
super().__init__('Unable to Perform Zone Transfer')
class AxfrSource(AxfrBaseSource):
@ -204,7 +202,7 @@ class AxfrSource(AxfrBaseSource):
def __init__(self, id, master):
self.log = logging.getLogger(f'AxfrSource[{id}]')
self.log.debug('__init__: id=%s, master=%s', id, master)
super(AxfrSource, self).__init__(id)
super().__init__(id)
self.master = master
def zone_records(self, zone):
@ -238,12 +236,12 @@ class ZoneFileSourceException(Exception):
class ZoneFileSourceNotFound(ZoneFileSourceException):
def __init__(self):
super(ZoneFileSourceNotFound, self).__init__('Zone file not found')
super().__init__('Zone file not found')
class ZoneFileSourceLoadFailure(ZoneFileSourceException):
def __init__(self, error):
super(ZoneFileSourceLoadFailure, self).__init__(str(error))
super().__init__(str(error))
class ZoneFileSource(AxfrBaseSource):
@ -275,7 +273,7 @@ class ZoneFileSource(AxfrBaseSource):
file_extension,
check_origin,
)
super(ZoneFileSource, self).__init__(id)
super().__init__(id)
self.directory = directory
self.file_extension = file_extension
self.check_origin = check_origin


+ 2
- 4
octodns/source/envvar.py View File

@ -11,9 +11,7 @@ class EnvVarSourceException(Exception):
class EnvironmentVariableNotFoundException(EnvVarSourceException):
def __init__(self, data):
super(EnvironmentVariableNotFoundException, self).__init__(
f'Unknown environment variable {data}'
)
super().__init__(f'Unknown environment variable {data}')
class EnvVarSource(BaseSource):
@ -73,7 +71,7 @@ class EnvVarSource(BaseSource):
name,
ttl,
)
super(EnvVarSource, self).__init__(id)
super().__init__(id)
self.envvar = variable
self.name = name
self.ttl = ttl


+ 2
- 2
octodns/source/tinydns.py View File

@ -23,7 +23,7 @@ class TinyDnsBaseSource(BaseSource):
split_re = re.compile(r':+')
def __init__(self, id, default_ttl=3600):
super(TinyDnsBaseSource, self).__init__(id)
super().__init__(id)
self.default_ttl = default_ttl
def _data_for_A(self, _type, records):
@ -239,7 +239,7 @@ class TinyDnsFileSource(TinyDnsBaseSource):
directory,
default_ttl,
)
super(TinyDnsFileSource, self).__init__(id, default_ttl)
super().__init__(id, default_ttl)
self.directory = directory
self._cache = None


+ 5
- 0
octodns/yaml.py View File

@ -4,6 +4,7 @@
from natsort import natsort_keygen
from yaml import SafeDumper, SafeLoader, load, dump
from yaml.representer import SafeRepresenter
from yaml.constructor import ConstructorError
@ -54,6 +55,10 @@ class SortingDumper(SafeDumper):
SortingDumper.add_representer(dict, SortingDumper._representer)
# This should handle all the record value types which are ultimately either str
# or dict at some point in their inheritance hierarchy
SortingDumper.add_multi_representer(str, SafeRepresenter.represent_str)
SortingDumper.add_multi_representer(dict, SortingDumper._representer)
def safe_dump(data, fh, **options):


+ 2
- 2
tests/helpers.py View File

@ -95,7 +95,7 @@ class TemporaryDirectory(object):
class WantsConfigProcessor(BaseProcessor):
def __init__(self, name, some_config):
super(WantsConfigProcessor, self).__init__(name)
super().__init__(name)
class PlannableProvider(BaseProvider):
@ -106,7 +106,7 @@ class PlannableProvider(BaseProvider):
SUPPORTS = set(('A', 'AAAA', 'TXT'))
def __init__(self, *args, **kwargs):
super(PlannableProvider, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def populate(self, zone, source=False, target=False, lenient=False):
pass


+ 2
- 2
tests/test_octodns_provider_base.py View File

@ -53,7 +53,7 @@ class HelperProvider(BaseProvider):
class TrickyProcessor(BaseProcessor):
def __init__(self, name, add_during_process_target_zone):
super(TrickyProcessor, self).__init__(name)
super().__init__(name)
self.add_during_process_target_zone = add_during_process_target_zone
self.reset()
@ -640,7 +640,7 @@ class TestBaseProvider(TestCase):
def __init__(self, **kwargs):
self.log = MagicMock()
super(MinimalProvider, self).__init__('minimal', **kwargs)
super().__init__('minimal', **kwargs)
normal = MinimalProvider(strict_supports=False)
# Should log and not expect


+ 399
- 125
tests/test_octodns_record.py View File

@ -16,6 +16,7 @@ from octodns.record import (
Create,
Delete,
GeoValue,
Ipv4Address,
LocRecord,
LocValue,
MxRecord,
@ -32,6 +33,7 @@ from octodns.record import (
SrvRecord,
SrvValue,
TlsaRecord,
TlsaValue,
TxtRecord,
Update,
UrlfwdRecord,
@ -93,6 +95,81 @@ class TestRecord(TestCase):
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_utf8_values(self):
zone = Zone('unit.tests.', [])
utf8 = 'гэрбүл.mn.'
encoded = idna_encode(utf8)
# ALIAS
record = Record.new(
zone, '', {'type': 'ALIAS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# CNAME
record = Record.new(
zone, 'cname', {'type': 'CNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# DNAME
record = Record.new(
zone, 'dname', {'type': 'DNAME', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.value)
# MX
record = Record.new(
zone,
'mx',
{
'type': 'MX',
'ttl': 300,
'value': {'preference': 10, 'exchange': utf8},
},
)
self.assertEqual(
MxValue({'preference': 10, 'exchange': encoded}), record.values[0]
)
# NS
record = Record.new(
zone, 'ns', {'type': 'NS', 'ttl': 300, 'value': utf8}
)
self.assertEqual(encoded, record.values[0])
# PTR
another_utf8 = 'niño.mx.'
another_encoded = idna_encode(another_utf8)
record = Record.new(
zone,
'ptr',
{'type': 'PTR', 'ttl': 300, 'values': [utf8, another_utf8]},
)
self.assertEqual([encoded, another_encoded], record.values)
# SRV
record = Record.new(
zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 0,
'weight': 10,
'port': 80,
'target': utf8,
},
},
)
self.assertEqual(
SrvValue(
{'priority': 0, 'weight': 10, 'port': 80, 'target': encoded}
),
record.values[0],
)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@ -386,12 +463,14 @@ class TestRecord(TestCase):
def test_caa(self):
a_values = [
{'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'},
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
},
CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}),
CaaValue(
{
'flags': 128,
'tag': 'iodef',
'value': 'mailto:security@example.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = CaaRecord(self.zone, 'a', a_data)
@ -406,7 +485,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['value'], a.values[1].value)
self.assertEqual(a_data, a.data)
b_value = {'tag': 'iodef', 'value': 'http://iodef.example.com/'}
b_value = CaaValue(
{'tag': 'iodef', 'value': 'http://iodef.example.com/'}
)
b_data = {'ttl': 30, 'value': b_value}
b = CaaRecord(self.zone, 'b', b_data)
self.assertEqual(0, b.values[0].flags)
@ -448,20 +529,22 @@ class TestRecord(TestCase):
def test_loc(self):
a_values = [
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
LocValue(
{
'lat_degrees': 31,
'lat_minutes': 58,
'lat_seconds': 52.1,
'lat_direction': 'S',
'long_degrees': 115,
'long_minutes': 49,
'long_seconds': 11.7,
'long_direction': 'E',
'altitude': 20,
'size': 10,
'precision_horz': 10,
'precision_vert': 2,
}
)
]
a_data = {'ttl': 30, 'values': a_values}
a = LocRecord(self.zone, 'a', a_data)
@ -489,20 +572,22 @@ class TestRecord(TestCase):
a_values[0]['precision_vert'], a.values[0].precision_vert
)
b_value = {
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
b_value = LocValue(
{
'lat_degrees': 32,
'lat_minutes': 7,
'lat_seconds': 19,
'lat_direction': 'S',
'long_degrees': 116,
'long_minutes': 2,
'long_seconds': 25,
'long_direction': 'E',
'altitude': 10,
'size': 1,
'precision_horz': 10000,
'precision_vert': 10,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = LocRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['lat_degrees'], b.values[0].lat_degrees)
@ -540,8 +625,8 @@ class TestRecord(TestCase):
def test_mx(self):
a_values = [
{'preference': 10, 'exchange': 'smtp1.'},
{'priority': 20, 'value': 'smtp2.'},
MxValue({'preference': 10, 'exchange': 'smtp1.'}),
MxValue({'priority': 20, 'value': 'smtp2.'}),
]
a_data = {'ttl': 30, 'values': a_values}
a = MxRecord(self.zone, 'a', a_data)
@ -550,12 +635,12 @@ class TestRecord(TestCase):
self.assertEqual(30, a.ttl)
self.assertEqual(a_values[0]['preference'], a.values[0].preference)
self.assertEqual(a_values[0]['exchange'], a.values[0].exchange)
self.assertEqual(a_values[1]['priority'], a.values[1].preference)
self.assertEqual(a_values[1]['value'], a.values[1].exchange)
a_data['values'][1] = {'preference': 20, 'exchange': 'smtp2.'}
self.assertEqual(a_values[1]['preference'], a.values[1].preference)
self.assertEqual(a_values[1]['exchange'], a.values[1].exchange)
a_data['values'][1] = MxValue({'preference': 20, 'exchange': 'smtp2.'})
self.assertEqual(a_data, a.data)
b_value = {'preference': 0, 'exchange': 'smtp3.'}
b_value = MxValue({'preference': 0, 'exchange': 'smtp3.'})
b_data = {'ttl': 30, 'value': b_value}
b = MxRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['preference'], b.values[0].preference)
@ -591,22 +676,26 @@ class TestRecord(TestCase):
def test_naptr(self):
a_values = [
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
},
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
},
NaptrValue(
{
'order': 10,
'preference': 11,
'flags': 'X',
'service': 'Y',
'regexp': 'Z',
'replacement': '.',
}
),
NaptrValue(
{
'order': 20,
'preference': 21,
'flags': 'A',
'service': 'B',
'regexp': 'C',
'replacement': 'foo.com',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = NaptrRecord(self.zone, 'a', a_data)
@ -618,14 +707,16 @@ class TestRecord(TestCase):
self.assertEqual(a_values[i][k], getattr(a.values[i], k))
self.assertEqual(a_data, a.data)
b_value = {
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
b_value = NaptrValue(
{
'order': 30,
'preference': 31,
'flags': 'M',
'service': 'N',
'regexp': 'O',
'replacement': 'x',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = NaptrRecord(self.zone, 'b', b_data)
for k in a_values[0].keys():
@ -849,6 +940,30 @@ class TestRecord(TestCase):
values.add(o)
self.assertTrue(o in values)
self.assertEqual(30, o.order)
o.order = o.order + 1
self.assertEqual(31, o.order)
self.assertEqual(32, o.preference)
o.preference = o.preference + 1
self.assertEqual(33, o.preference)
self.assertEqual('M', o.flags)
o.flags = 'P'
self.assertEqual('P', o.flags)
self.assertEqual('N', o.service)
o.service = 'Q'
self.assertEqual('Q', o.service)
self.assertEqual('O', o.regexp)
o.regexp = 'R'
self.assertEqual('R', o.regexp)
self.assertEqual('z', o.replacement)
o.replacement = '1'
self.assertEqual('1', o.replacement)
def test_ns(self):
a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
a_data = {'ttl': 30, 'values': a_values}
@ -867,8 +982,20 @@ class TestRecord(TestCase):
def test_sshfp(self):
a_values = [
{'algorithm': 10, 'fingerprint_type': 11, 'fingerprint': 'abc123'},
{'algorithm': 20, 'fingerprint_type': 21, 'fingerprint': 'def456'},
SshfpValue(
{
'algorithm': 10,
'fingerprint_type': 11,
'fingerprint': 'abc123',
}
),
SshfpValue(
{
'algorithm': 20,
'fingerprint_type': 21,
'fingerprint': 'def456',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SshfpRecord(self.zone, 'a', a_data)
@ -882,11 +1009,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['fingerprint'], a.values[0].fingerprint)
self.assertEqual(a_data, a.data)
b_value = {
'algorithm': 30,
'fingerprint_type': 31,
'fingerprint': 'ghi789',
}
b_value = SshfpValue(
{'algorithm': 30, 'fingerprint_type': 31, 'fingerprint': 'ghi789'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SshfpRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['algorithm'], b.values[0].algorithm)
@ -930,8 +1055,12 @@ class TestRecord(TestCase):
def test_srv(self):
a_values = [
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'},
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'},
SrvValue(
{'priority': 10, 'weight': 11, 'port': 12, 'target': 'server1'}
),
SrvValue(
{'priority': 20, 'weight': 21, 'port': 22, 'target': 'server2'}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = SrvRecord(self.zone, '_a._tcp', a_data)
@ -944,12 +1073,9 @@ class TestRecord(TestCase):
self.assertEqual(a_values[0]['target'], a.values[0].target)
self.assertEqual(a_data, a.data)
b_value = {
'priority': 30,
'weight': 31,
'port': 32,
'target': 'server3',
}
b_value = SrvValue(
{'priority': 30, 'weight': 31, 'port': 32, 'target': 'server3'}
)
b_data = {'ttl': 30, 'value': b_value}
b = SrvRecord(self.zone, '_b._tcp', b_data)
self.assertEqual(b_value['priority'], b.values[0].priority)
@ -993,18 +1119,22 @@ class TestRecord(TestCase):
def test_tlsa(self):
a_values = [
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
},
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
},
TlsaValue(
{
'certificate_usage': 1,
'selector': 1,
'matching_type': 1,
'certificate_association_data': 'ABABABABABABABABAB',
}
),
TlsaValue(
{
'certificate_usage': 2,
'selector': 0,
'matching_type': 2,
'certificate_association_data': 'ABABABABABABABABAC',
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = TlsaRecord(self.zone, 'a', a_data)
@ -1036,12 +1166,14 @@ class TestRecord(TestCase):
)
self.assertEqual(a_data, a.data)
b_value = {
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
b_value = TlsaValue(
{
'certificate_usage': 0,
'selector': 0,
'matching_type': 0,
'certificate_association_data': 'AAAAAAAAAAAAAAA',
}
)
b_data = {'ttl': 30, 'value': b_value}
b = TlsaRecord(self.zone, 'b', b_data)
self.assertEqual(
@ -1093,20 +1225,24 @@ class TestRecord(TestCase):
def test_urlfwd(self):
a_values = [
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
},
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
},
UrlfwdValue(
{
'path': '/',
'target': 'http://foo',
'code': 301,
'masking': 2,
'query': 0,
}
),
UrlfwdValue(
{
'path': '/target',
'target': 'http://target',
'code': 302,
'masking': 2,
'query': 0,
}
),
]
a_data = {'ttl': 30, 'values': a_values}
a = UrlfwdRecord(self.zone, 'a', a_data)
@ -1125,13 +1261,15 @@ class TestRecord(TestCase):
self.assertEqual(a_values[1]['query'], a.values[1].query)
self.assertEqual(a_data, a.data)
b_value = {
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
b_value = UrlfwdValue(
{
'path': '/',
'target': 'http://location',
'code': 301,
'masking': 2,
'query': 0,
}
)
b_data = {'ttl': 30, 'value': b_value}
b = UrlfwdRecord(self.zone, 'b', b_data)
self.assertEqual(b_value['path'], b.values[0].path)
@ -1653,6 +1791,54 @@ class TestRecord(TestCase):
self.assertTrue(c >= c)
self.assertTrue(c <= c)
self.assertEqual(31, a.lat_degrees)
a.lat_degrees = a.lat_degrees + 1
self.assertEqual(32, a.lat_degrees)
self.assertEqual(58, a.lat_minutes)
a.lat_minutes = a.lat_minutes + 1
self.assertEqual(59, a.lat_minutes)
self.assertEqual(52.1, a.lat_seconds)
a.lat_seconds = a.lat_seconds + 1
self.assertEqual(53.1, a.lat_seconds)
self.assertEqual('S', a.lat_direction)
a.lat_direction = 'N'
self.assertEqual('N', a.lat_direction)
self.assertEqual(115, a.long_degrees)
a.long_degrees = a.long_degrees + 1
self.assertEqual(116, a.long_degrees)
self.assertEqual(49, a.long_minutes)
a.long_minutes = a.long_minutes + 1
self.assertEqual(50, a.long_minutes)
self.assertEqual(11.7, a.long_seconds)
a.long_seconds = a.long_seconds + 1
self.assertEqual(12.7, a.long_seconds)
self.assertEqual('E', a.long_direction)
a.long_direction = 'W'
self.assertEqual('W', a.long_direction)
self.assertEqual(20, a.altitude)
a.altitude = a.altitude + 1
self.assertEqual(21, a.altitude)
self.assertEqual(10, a.size)
a.size = a.size + 1
self.assertEqual(11, a.size)
self.assertEqual(10, a.precision_horz)
a.precision_horz = a.precision_horz + 1
self.assertEqual(11, a.precision_horz)
self.assertEqual(2, a.precision_vert)
a.precision_vert = a.precision_vert + 1
self.assertEqual(3, a.precision_vert)
# Hash
values = set()
values.add(a)
@ -2477,7 +2663,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
def test_cname_validation(self):
# doesn't blow up
Record.new(
self.zone,
@ -3026,6 +3212,19 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# if exchange doesn't exist value can not be None/falsey
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'',
{
'type': 'MX',
'ttl': 600,
'value': {'preference': 10, 'value': ''},
},
)
self.assertEqual(['missing exchange'], ctx.exception.reasons)
# exchange can be a single `.`
record = Record.new(
self.zone,
@ -3038,7 +3237,7 @@ class TestRecordValidation(TestCase):
)
self.assertEqual('.', record.values[0].exchange)
def test_NXPTR(self):
def test_NAPTR(self):
# doesn't blow up
Record.new(
self.zone,
@ -3521,6 +3720,24 @@ class TestRecordValidation(TestCase):
ctx.exception.reasons,
)
# falsey target
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
'_srv._tcp',
{
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': '',
},
},
)
self.assertEqual(['missing target'], ctx.exception.reasons)
# target must be a valid FQDN
with self.assertRaises(ValidationError) as ctx:
Record.new(
@ -5206,7 +5423,6 @@ class TestDynamicRecords(TestCase):
'pools': {
'one': {'values': [{'value': '3.3.3.3'}]},
'two': {
# Testing out of order value sorting here
'values': [{'value': '5.5.5.5'}, {'value': '4.4.4.4'}]
},
'three': {
@ -5234,9 +5450,12 @@ class TestDynamicRecords(TestCase):
)
def test_dynamic_eqs(self):
pool_one = _DynamicPool('one', {'values': [{'value': '1.2.3.4'}]})
pool_two = _DynamicPool('two', {'values': [{'value': '1.2.3.5'}]})
pool_one = _DynamicPool(
'one', {'values': [{'value': '1.2.3.4'}]}, Ipv4Address
)
pool_two = _DynamicPool(
'two', {'values': [{'value': '1.2.3.5'}]}, Ipv4Address
)
self.assertEqual(pool_one, pool_one)
self.assertNotEqual(pool_one, pool_two)
self.assertNotEqual(pool_one, 42)
@ -5255,6 +5474,61 @@ class TestDynamicRecords(TestCase):
self.assertNotEqual(dynamic, other)
self.assertNotEqual(dynamic, 42)
def test_dynamic_cname_idna(self):
a_utf8 = 'natación.mx.'
a_encoded = idna_encode(a_utf8)
b_utf8 = 'гэрбүл.mn.'
b_encoded = idna_encode(b_utf8)
cname_data = {
'dynamic': {
'pools': {
'one': {
# Testing out of order value sorting here
'values': [
{'value': 'b.unit.tests.'},
{'value': 'a.unit.tests.'},
]
},
'two': {
'values': [
# some utf8 values we expect to be idna encoded
{'weight': 10, 'value': a_utf8},
{'weight': 12, 'value': b_utf8},
]
},
},
'rules': [
{'geos': ['NA-US-CA'], 'pool': 'two'},
{'pool': 'one'},
],
},
'type': 'CNAME',
'ttl': 60,
'value': a_utf8,
}
cname = Record.new(self.zone, 'cname', cname_data)
self.assertEqual(a_encoded, cname.value)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 1, 'value': 'a.unit.tests.', 'status': 'obey'},
{'weight': 1, 'value': 'b.unit.tests.', 'status': 'obey'},
],
},
cname.dynamic.pools['one'].data,
)
self.assertEqual(
{
'fallback': None,
'values': [
{'weight': 12, 'value': b_encoded, 'status': 'obey'},
{'weight': 10, 'value': a_encoded, 'status': 'obey'},
],
},
cname.dynamic.pools['two'].data,
)
class TestChanges(TestCase):
zone = Zone('unit.tests.', [])


Loading…
Cancel
Save