Browse Source

Implement cleaned code from `https://github.com/solarmonkey`

pull/815/head
Maikel Poot 4 years ago
parent
commit
16f9acd870
3 changed files with 515 additions and 483 deletions
  1. +148
    -244
      octodns/provider/transip.py
  2. +2
    -2
      requirements.txt
  3. +365
    -237
      tests/test_octodns_provider_transip.py

+ 148
- 244
octodns/provider/transip.py View File

@ -1,19 +1,18 @@
#
#
#
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from __future__ import absolute_import, division, print_function, \
unicode_literals
from collections import defaultdict, namedtuple
from logging import getLogger
from suds import WebFault
from transip import TransIP
from transip.exceptions import TransIPHTTPError
from transip.v6.objects import DnsEntry
from collections import defaultdict
from . import ProviderException
from .base import BaseProvider
from logging import getLogger
from ..record import Record
from transip.service.domain import DomainService
from transip.service.objects import DnsEntry
from .base import BaseProvider
DNSEntry = namedtuple('DNSEntry', ('name', 'expire', 'type', 'content'))
class TransipException(ProviderException):
@ -48,6 +47,7 @@ class TransipProvider(BaseProvider):
# if both `key_file` and `key` are presented `key_file` is used
'''
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(('A', 'AAAA', 'CNAME', 'MX', 'NS', 'SRV', 'SPF', 'TXT',
@ -64,71 +64,74 @@ class TransipProvider(BaseProvider):
super(TransipProvider, self).__init__(id, *args, **kwargs)
if key_file is not None:
self._client = self._domain_service(account,
private_key_file=key_file)
self._client = TransIP(login=account, private_key_file=key_file)
elif key is not None:
self._client = self._domain_service(account, private_key=key)
self._client = TransIP(login=account, private_key=key)
else:
raise TransipConfigException(
'Missing `key` or `key_file` parameter in config'
)
self._currentZone = {}
def _domain_service(self, *args, **kwargs):
'This exists only for mocking purposes'
return DomainService(*args, **kwargs)
def populate(self, zone, target=False, lenient=False):
exists = False
self._currentZone = zone
'''
Populate the zone with records in-place.
'''
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
try:
zoneInfo = self._client.get_info(zone.name[:-1])
except WebFault as e:
if e.fault.faultcode == '102' and target is False:
domain = self._client.domains.get(zone.name.strip('.'))
records = domain.dns.list()
except TransIPHTTPError as e:
if e.response_code == 404 and target is False:
# Zone not found in account, and not a target so just
# leave an empty zone.
return exists
elif e.fault.faultcode == '102' and target is True:
return False
elif e.response_code == 404 and target is True:
self.log.warning('populate: Transip can\'t create new zones')
raise TransipNewZoneException(
('populate: ({}) Transip used ' +
'as target for non-existing zone: {}').format(
e.fault.faultcode, zone.name))
e.response_code, zone.name))
else:
self.log.error('populate: (%s) %s ', e.fault.faultcode,
e.fault.faultstring)
raise e
self.log.debug('populate: found %s records for zone %s',
len(zoneInfo.dnsEntries), zone.name)
exists = True
if zoneInfo.dnsEntries:
self.log.error(
'populate: (%s) %s ', e.response_code, e.message
)
raise TransipException(
'Unhandled error: ({}) {}'.format(
e.response_code, e.message
)
)
self.log.debug(
'populate: found %s records for zone %s', len(records), zone.name
)
if records:
values = defaultdict(lambda: defaultdict(list))
for record in zoneInfo.dnsEntries:
name = zone.hostname_from_fqdn(record['name'])
for record in records:
name = zone.hostname_from_fqdn(record.name)
if name == self.ROOT_RECORD:
name = ''
if record['type'] in self.SUPPORTS:
values[name][record['type']].append(record)
if record.type in self.SUPPORTS:
values[name][record.type].append(record)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
record = Record.new(
zone,
name,
_data_for(_type, records, zone),
source=self,
lenient=lenient,
)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists = %s',
len(zone.records) - before, exists)
self.log.info('populate: found %s records',
len(zone.records) - before)
self._currentZone = {}
return exists
return True
def _apply(self, plan):
desired = plan.desired
@ -136,219 +139,120 @@ class TransipProvider(BaseProvider):
self.log.debug('apply: zone=%s, changes=%d', desired.name,
len(changes))
self._currentZone = plan.desired
try:
self._client.get_info(plan.desired.name[:-1])
except WebFault as e:
self.log.exception('_apply: get_info failed')
raise e
domain = self._client.domains.get(plan.desired.name[:-1])
except TransIPHTTPError as e:
self.log.exception('_apply: getting the domain failed')
raise TransipException(
'Unhandled error: ({}) {}'.format(e.response_code, e.message)
)
_dns_entries = []
records = []
for record in plan.desired.records:
entries_for = getattr(self, '_entries_for_{}'.format(record._type))
if record._type in self.SUPPORTS:
# Root records have '@' as name
name = record.name
if name == '':
name = self.ROOT_RECORD
# Root records have '@' as name
name = record.name
if name == '':
name = self.ROOT_RECORD
_dns_entries.extend(entries_for(name, record))
records.extend(_entries_for(name, record))
# Transform DNSEntry namedtuples into transip.v6.objects.DnsEntry
# objects, which is a bit ugly because it's quite a magical object.
api_records = [DnsEntry(domain.dns, r._asdict()) for r in records]
try:
self._client.set_dns_entries(plan.desired.name[:-1], _dns_entries)
except WebFault as e:
self.log.warning(('_apply: Set DNS returned ' +
'one or more errors: {}').format(
e.fault.faultstring))
raise TransipException(200, e.fault.faultstring)
self._currentZone = {}
def _entries_for_multiple(self, name, record):
_entries = []
for value in record.values:
_entries.append(DnsEntry(name, record.ttl, record._type, value))
return _entries
def _entries_for_single(self, name, record):
return [DnsEntry(name, record.ttl, record._type, record.value)]
_entries_for_A = _entries_for_multiple
_entries_for_AAAA = _entries_for_multiple
_entries_for_NS = _entries_for_multiple
_entries_for_SPF = _entries_for_multiple
_entries_for_CNAME = _entries_for_single
def _entries_for_MX(self, name, record):
_entries = []
for value in record.values:
content = "{} {}".format(value.preference, value.exchange)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_SRV(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {} {}".format(value.priority, value.weight,
value.port, value.target)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_SSHFP(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {}".format(value.algorithm,
value.fingerprint_type,
value.fingerprint)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_CAA(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {}".format(value.flags, value.tag,
value.value)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_TXT(self, name, record):
_entries = []
for value in record.values:
value = value.replace('\\;', ';')
_entries.append(DnsEntry(name, record.ttl, record._type, value))
return _entries
def _parse_to_fqdn(self, value):
# Enforce switch from suds.sax.text.Text to string
value = str(value)
# TransIP allows '@' as value to alias the root record.
# this provider won't set an '@' value, but can be an existing record
if value == self.ROOT_RECORD:
value = self._currentZone.name
if value[-1] != '.':
self.log.debug('parseToFQDN: changed %s to %s', value,
'{}.{}'.format(value, self._currentZone.name))
value = '{}.{}'.format(value, self._currentZone.name)
return value
def _get_lowest_ttl(self, records):
_ttl = 100000
for record in records:
_ttl = min(_ttl, record['expire'])
return _ttl
def _data_for_multiple(self, _type, records):
_values = []
for record in records:
# Enforce switch from suds.sax.text.Text to string
_values.append(str(record['content']))
return {
'ttl': self._get_lowest_ttl(records),
'type': _type,
'values': _values
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
_data_for_SPF = _data_for_multiple
domain.dns.replace(api_records)
except TransIPHTTPError as e:
self.log.warning(
'_apply: Set DNS returned one or more errors: {}'.format(e)
)
raise TransipException(
'Unhandled error: ({}) {}'.format(e.response_code, e.message)
)
def _data_for_CNAME(self, _type, records):
return {
'ttl': records[0]['expire'],
'type': _type,
'value': self._parse_to_fqdn(records[0]['content'])
}
def _data_for_MX(self, _type, records):
_values = []
for record in records:
preference, exchange = record['content'].split(" ", 1)
_values.append({
'preference': preference,
'exchange': self._parse_to_fqdn(exchange)
})
def _data_for(type_, records, current_zone):
if type_ == 'CNAME':
return {
'ttl': self._get_lowest_ttl(records),
'type': _type,
'values': _values
'type': type_,
'ttl': records[0].expire,
'value': _parse_to_fqdn(records[0].content, current_zone),
}
def _data_for_SRV(self, _type, records):
_values = []
for record in records:
priority, weight, port, target = record['content'].split(' ', 3)
_values.append({
'port': port,
'priority': priority,
'target': self._parse_to_fqdn(target),
'weight': weight
})
def format_mx(record):
preference, exchange = record.content.split(' ', 1)
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': _values
'preference': preference,
'exchange': _parse_to_fqdn(exchange, current_zone),
}
def _data_for_SSHFP(self, _type, records):
_values = []
for record in records:
algorithm, fp_type, fingerprint = record['content'].split(' ', 2)
_values.append({
'algorithm': algorithm,
'fingerprint': fingerprint.lower(),
'fingerprint_type': fp_type
})
def format_srv(record):
priority, weight, port, target = record.content.split(' ', 3)
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': _values
'port': port,
'priority': priority,
'target': _parse_to_fqdn(target, current_zone),
'weight': weight,
}
def _data_for_CAA(self, _type, records):
_values = []
for record in records:
flags, tag, value = record['content'].split(' ', 2)
_values.append({
'flags': flags,
'tag': tag,
'value': value
})
def format_sshfp(record):
algorithm, fp_type, fingerprint = record.content.split(' ', 2)
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': _values
'algorithm': algorithm,
'fingerprint': fingerprint.lower(),
'fingerprint_type': fp_type,
}
def _data_for_TXT(self, _type, records):
_values = []
for record in records:
_values.append(record['content'].replace(';', '\\;'))
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': _values
}
def format_caa(record):
flags, tag, value = record.content.split(' ', 2)
return {'flags': flags, 'tag': tag, 'value': value}
def format_txt(record):
return record.content.replace(';', '\\;')
value_formatter = {
'MX': format_mx,
'SRV': format_srv,
'SSHFP': format_sshfp,
'CAA': format_caa,
'TXT': format_txt,
}.get(type_, lambda r: r.content)
return {
'type': type_,
'ttl': _get_lowest_ttl(records),
'values': [value_formatter(r) for r in records],
}
def _parse_to_fqdn(value, current_zone):
# TransIP allows '@' as value to alias the root record.
# this provider won't set an '@' value, but can be an existing record
if value == TransipProvider.ROOT_RECORD:
value = current_zone.name
if value[-1] != '.':
value = '{}.{}'.format(value, current_zone.name)
return value
def _get_lowest_ttl(records):
return min([r.expire for r in records] + [100000])
def _entries_for(name, record):
values = record.values if hasattr(record, 'values') else [record.value]
formatter = {
'MX': lambda v: f'{v.preference} {v.exchange}',
'SRV': lambda v: f'{v.priority} {v.weight} {v.port} {v.target}',
'SSHFP': lambda v: (
f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}'
),
'CAA': lambda v: f'{v.flags} {v.tag} {v.value}',
'TXT': lambda v: v.replace('\\;', ';'),
}.get(record._type, lambda r: r)
return [
DNSEntry(name, record.ttl, record._type, formatter(value))
for value in values
]

+ 2
- 2
requirements.txt View File

@ -20,8 +20,8 @@ ovh==0.5.0
pycountry-convert==0.7.2
pycountry==20.7.3
python-dateutil==2.8.1
requests==2.24.0
requests==2.25.1
s3transfer==0.3.3
setuptools==44.1.1
six==1.15.0
transip==2.1.2
python-transip==0.5.0

+ 365
- 237
tests/test_octodns_provider_transip.py View File

@ -1,291 +1,419 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from operator import itemgetter
from os.path import dirname, join
from six import text_type
from suds import WebFault
from mock import patch
from unittest import TestCase
from unittest.mock import Mock, patch
from octodns.provider.transip import TransipProvider
from octodns.provider.transip import (DNSEntry, TransipConfigException,
TransipException,
TransipNewZoneException, TransipProvider,
_entries_for, _parse_to_fqdn)
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
from transip.service.objects import DnsEntry
class MockFault(object):
faultstring = ""
faultcode = ""
def __init__(self, code, string, *args, **kwargs):
self.faultstring = string
self.faultcode = code
class MockResponse(object):
dnsEntries = []
class MockDomainService(object):
def __init__(self, *args, **kwargs):
self.mockupEntries = []
self.throw_auth_fault = False
from transip.exceptions import TransIPHTTPError
def mockup(self, records):
provider = TransipProvider('', '', '')
def make_expected():
expected = Zone("unit.tests.", [])
source = YamlProvider("test", join(dirname(__file__), "config"))
source.populate(expected)
return expected
_dns_entries = []
for record in records:
if record._type in provider.SUPPORTS:
entries_for = getattr(provider,
'_entries_for_{}'.format(record._type))
# Root records have '@' as name
name = record.name
if name == '':
name = provider.ROOT_RECORD
def make_mock():
zone = make_expected()
_dns_entries.extend(entries_for(name, record))
# Turn Zone.records into TransIP DNSEntries
api_entries = []
for record in zone.records:
if record._type in TransipProvider.SUPPORTS:
# Root records have '@' as name
name = record.name
if name == "":
name = TransipProvider.ROOT_RECORD
# Add a non-supported type
# so it triggers the "is supported" (transip.py:115) check and
# give 100% code coverage
_dns_entries.append(
DnsEntry('@', '3600', 'BOGUS', 'ns01.transip.nl.'))
api_entries.extend(_entries_for(name, record))
self.mockupEntries = _dns_entries
# Append bogus entry so test for record type not being in SUPPORTS is
# executed. For 100% test coverage.
api_entries.append(DNSEntry("@", "3600", "BOGUS", "ns.transip.nl"))
# Skips authentication layer and returns the entries loaded by "Mockup"
def get_info(self, domain_name):
return zone, api_entries
if self.throw_auth_fault:
self.raiseInvalidAuth()
# Special 'domain' to trigger error
if str(domain_name) == str('notfound.unit.tests'):
self.raiseZoneNotFound()
def make_mock_empty():
mock = Mock()
mock.return_value.domains.get.return_value.dns.list.return_value = []
return mock
result = MockResponse()
result.dnsEntries = self.mockupEntries
return result
def set_dns_entries(self, domain_name, dns_entries):
# Special 'domain' to trigger error
if str(domain_name) == str('failsetdns.unit.tests'):
self.raiseSaveError()
return True
def raiseZoneNotFound(self):
fault = MockFault(str('102'), '102 is zone not found')
document = {}
raise WebFault(fault, document)
def raiseInvalidAuth(self):
fault = MockFault(str('200'), '200 is invalid auth')
document = {}
raise WebFault(fault, document)
def raiseSaveError(self):
fault = MockFault(str('200'), '202 random error')
document = {}
raise WebFault(fault, document)
def make_failing_mock(response_code):
mock = Mock()
mock.return_value.domains.get.side_effect = [
TransIPHTTPError(str(response_code), response_code)
]
return mock
class TestTransipProvider(TestCase):
bogus_key = str("""-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA0U5HGCkLrz423IyUf3u4cKN2WrNz1x5KNr6PvH2M/zxas+zB
elbxkdT3AQ+wmfcIvOuTmFRTHv35q2um1aBrPxVw+2s+lWo28VwIRttwIB1vIeWu
lSBnkEZQRLyPI2tH0i5QoMX4CVPf9rvij3Uslimi84jdzDfPFIh6jZ6C8nLipOTG
0IMhge1ofVfB0oSy5H+7PYS2858QLAf5ruYbzbAxZRivS402wGmQ0d0Lc1KxraAj
kiMM5yj/CkH/Vm2w9I6+tLFeASE4ub5HCP5G/ig4dbYtqZMQMpqyAbGxd5SOVtyn
UHagAJUxf8DT3I8PyjEHjxdOPUsxNyRtepO/7QIDAQABAoIBAQC7fiZ7gxE/ezjD
2n6PsHFpHVTBLS2gzzZl0dCKZeFvJk6ODJDImaeuHhrh7X8ifMNsEI9XjnojMhl8
MGPzy88mZHugDNK0H8B19x5G8v1/Fz7dG5WHas660/HFkS+b59cfdXOugYiOOn9O
08HBBpLZNRUOmVUuQfQTjapSwGLG8PocgpyRD4zx0LnldnJcqYCxwCdev+AAsPnq
ibNtOd/MYD37w9MEGcaxLE8wGgkv8yd97aTjkgE+tp4zsM4QE4Rag133tsLLNznT
4Qr/of15M3NW/DXq/fgctyRcJjZpU66eCXLCz2iRTnLyyxxDC2nwlxKbubV+lcS0
S4hbfd/BAoGBAO8jXxEaiybR0aIhhSR5esEc3ymo8R8vBN3ZMJ+vr5jEPXr/ZuFj
/R4cZ2XV3VoQJG0pvIOYVPZ5DpJM7W+zSXtJ/7bLXy4Bnmh/rc+YYgC+AXQoLSil
iD2OuB2xAzRAK71DVSO0kv8gEEXCersPT2i6+vC2GIlJvLcYbOdRKWGxAoGBAOAQ
aJbRLtKujH+kMdoMI7tRlL8XwI+SZf0FcieEu//nFyerTePUhVgEtcE+7eQ7hyhG
fIXUFx/wALySoqFzdJDLc8U8pTLhbUaoLOTjkwnCTKQVprhnISqQqqh/0U5u47IE
RWzWKN6OHb0CezNTq80Dr6HoxmPCnJHBHn5LinT9AoGAQSpvZpbIIqz8pmTiBl2A
QQ2gFpcuFeRXPClKYcmbXVLkuhbNL1BzEniFCLAt4LQTaRf9ghLJ3FyCxwVlkpHV
zV4N6/8hkcTpKOraL38D/dXJSaEFJVVuee/hZl3tVJjEEpA9rDwx7ooLRSdJEJ6M
ciq55UyKBSdt4KssSiDI2RECgYBL3mJ7xuLy5bWfNsrGiVvD/rC+L928/5ZXIXPw
26oI0Yfun7ulDH4GOroMcDF/GYT/Zzac3h7iapLlR0WYI47xxGI0A//wBZLJ3QIu
krxkDo2C9e3Y/NqnHgsbOQR3aWbiDT4wxydZjIeXS3LKA2fl6Hyc90PN3cTEOb8I
hq2gRQKBgEt0SxhhtyB93SjgTzmUZZ7PiEf0YJatfM6cevmjWHexrZH+x31PB72s
fH2BQyTKKzoCLB1k/6HRaMnZdrWyWSZ7JKz3AHJ8+58d0Hr8LTrzDM1L6BbjeDct
N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
-----END RSA PRIVATE KEY-----""")
def make_expected(self):
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
return expected
@patch('octodns.provider.transip.TransipProvider._domain_service',
return_value=MockDomainService())
def test_init(self, _):
# No key nor key_file
with self.assertRaises(Exception) as ctx:
TransipProvider('test', 'unittest')
bogus_key = "-----BEGIN RSA PRIVATE KEY-----Z-----END RSA PRIVATE KEY-----"
self.assertEquals(
str('Missing `key` or `key_file` parameter in config'),
str(ctx.exception))
# With key
TransipProvider('test', 'unittest', key=self.bogus_key)
# With key_file
TransipProvider('test', 'unittest', key_file='/fake/path')
@patch("octodns.provider.transip.TransIP", make_mock_empty())
def test_init(self):
with self.assertRaises(TransipConfigException) as ctx:
TransipProvider("test", "unittest")
@patch('suds.client.Client.__init__', new=lambda *args, **kwargs: None)
def test_domain_service(self):
# Special case smoke test for DomainService to get coverage
TransipProvider('test', 'unittest', key=self.bogus_key)
self.assertEquals(
"Missing `key` or `key_file` parameter in config",
str(ctx.exception),
)
@patch('octodns.provider.transip.TransipProvider._domain_service',
return_value=MockDomainService())
def test_populate(self, _):
_expected = self.make_expected()
# Those should work
TransipProvider("test", "unittest", key=self.bogus_key)
TransipProvider("test", "unittest", key_file="/fake/path")
@patch("octodns.provider.transip.TransIP", make_failing_mock(401))
def test_populate_unauthenticated(self):
# Unhappy Plan - Not authenticated
# Live test against API, will fail in an unauthorized error
with self.assertRaises(WebFault) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client.throw_auth_fault = True
zone = Zone('unit.tests.', [])
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("unit.tests.", [])
with self.assertRaises(TransipException):
provider.populate(zone, True)
self.assertEquals(str('WebFault'),
str(ctx.exception.__class__.__name__))
self.assertEquals(str('200'), ctx.exception.fault.faultcode)
# No more auth problems
provider._client.throw_auth_fault = False
@patch("octodns.provider.transip.TransIP", make_failing_mock(404))
def test_populate_new_zone_as_target(self):
# Unhappy Plan - Zone does not exists
# Will trigger an exception if provider is used as a target for a
# non-existing zone
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
zone = Zone('notfound.unit.tests.', [])
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("notfound.unit.tests.", [])
with self.assertRaises(TransipNewZoneException):
provider.populate(zone, True)
self.assertEquals(str('TransipNewZoneException'),
str(ctx.exception.__class__.__name__))
self.assertEquals(
'populate: (102) Transip used as target' +
' for non-existing zone: notfound.unit.tests.',
text_type(ctx.exception))
@patch("octodns.provider.transip.TransIP", make_mock_empty())
def test_populate_new_zone_not_target(self):
# Happy Plan - Zone does not exists
# Won't trigger an exception if provider is NOT used as a target for a
# non-existing zone.
provider = TransipProvider('test', 'unittest', self.bogus_key)
zone = Zone('notfound.unit.tests.', [])
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("notfound.unit.tests.", [])
provider.populate(zone, False)
# Happy Plan - Populate with mockup records
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client.mockup(_expected.records)
zone = Zone('unit.tests.', [])
@patch("octodns.provider.transip.TransIP", make_failing_mock(404))
def test_populate_zone_does_not_exist(self):
# Happy Plan - Zone does not exists
# Won't trigger an exception if provider is NOT used as a target for a
# non-existing zone.
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("notfound.unit.tests.", [])
provider.populate(zone, False)
# Transip allows relative values for types like cname, mx.
# Test is these are correctly appended with the domain
provider._currentZone = zone
self.assertEquals("www.unit.tests.", provider._parse_to_fqdn("www"))
self.assertEquals("www.unit.tests.",
provider._parse_to_fqdn("www.unit.tests."))
self.assertEquals("www.sub.sub.sub.unit.tests.",
provider._parse_to_fqdn("www.sub.sub.sub"))
self.assertEquals("unit.tests.",
provider._parse_to_fqdn("@"))
@patch("octodns.provider.transip.TransIP")
def test_populate_zone_exists_not_target(self, mock_client):
# Happy Plan - Populate
source_zone, api_records = make_mock()
mock_client.return_value.domains.get.return_value.dns.list. \
return_value = api_records
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("unit.tests.", [])
exists = provider.populate(zone, False)
self.assertTrue(exists, "populate should return True")
# Due to the implementation of Record._equality_tuple() we can't do a
# normal compare, as that ingores ttl's for example. We therefor use
# the __repr__ to compare. We do need to filter out `.geo` attributes
# that Transip doesn't support.
expected = set()
for r in source_zone.records:
if r._type in TransipProvider.SUPPORTS:
if hasattr(r, "geo"):
r.geo = None
expected.add(r.__repr__())
self.assertEqual({r.__repr__() for r in zone.records}, expected)
@patch("octodns.provider.transip.TransIP", make_mock_empty())
def test_populate_zone_exists_as_target(self):
# Happy Plan - Even if the zone has no records the zone should exist
provider = TransipProvider('test', 'unittest', self.bogus_key)
zone = Zone('unit.tests.', [])
provider = TransipProvider("test", "unittest", self.bogus_key)
zone = Zone("unit.tests.", [])
exists = provider.populate(zone, True)
self.assertTrue(exists, 'populate should return true')
return
@patch('octodns.provider.transip.TransipProvider._domain_service',
return_value=MockDomainService())
def test_plan(self, _):
_expected = self.make_expected()
# Test Happy plan, only create
provider = TransipProvider('test', 'unittest', self.bogus_key)
plan = provider.plan(_expected)
self.assertEqual(15, plan.change_counts['Create'])
self.assertEqual(0, plan.change_counts['Update'])
self.assertEqual(0, plan.change_counts['Delete'])
return
@patch('octodns.provider.transip.TransipProvider._domain_service',
return_value=MockDomainService())
def test_apply(self, _):
_expected = self.make_expected()
# Test happy flow. Create all supoorted records
provider = TransipProvider('test', 'unittest', self.bogus_key)
plan = provider.plan(_expected)
self.assertEqual(15, len(plan.changes))
changes = provider.apply(plan)
self.assertEqual(changes, len(plan.changes))
self.assertTrue(exists, "populate should return True")
@patch("octodns.provider.transip.TransIP", make_mock_empty())
def test_plan(self):
# Test happy plan, only create
provider = TransipProvider("test", "unittest", self.bogus_key)
plan = provider.plan(make_expected())
self.assertIsNotNone(plan)
self.assertEqual(15, plan.change_counts["Create"])
self.assertEqual(0, plan.change_counts["Update"])
self.assertEqual(0, plan.change_counts["Delete"])
@patch("octodns.provider.transip.TransIP")
def test_apply(self, client_mock):
# Test happy flow. Create all supported records
domain_mock = Mock()
client_mock.return_value.domains.get.return_value = domain_mock
domain_mock.dns.list.return_value = []
provider = TransipProvider("test", "unittest", self.bogus_key)
plan = provider.plan(make_expected())
self.assertIsNotNone(plan)
provider.apply(plan)
domain_mock.dns.replace.assert_called_once()
# These are the supported ones from tests/config/unit.test.yaml
expected_entries = [
{
"name": "ignored",
"expire": 3600,
"type": "A",
"content": "9.9.9.9",
},
{
"name": "@",
"expire": 3600,
"type": "CAA",
"content": "0 issue ca.unit.tests",
},
{
"name": "sub",
"expire": 3600,
"type": "NS",
"content": "6.2.3.4.",
},
{
"name": "sub",
"expire": 3600,
"type": "NS",
"content": "7.2.3.4.",
},
{
"name": "spf",
"expire": 600,
"type": "SPF",
"content": "v=spf1 ip4:192.168.0.1/16-all",
},
{
"name": "_srv._tcp",
"expire": 600,
"type": "SRV",
"content": "10 20 30 foo-1.unit.tests.",
},
{
"name": "_srv._tcp",
"expire": 600,
"type": "SRV",
"content": "12 20 30 foo-2.unit.tests.",
},
{
"name": "_pop3._tcp",
"expire": 600,
"type": "SRV",
"content": "0 0 0 .",
},
{
"name": "_imap._tcp",
"expire": 600,
"type": "SRV",
"content": "0 0 0 .",
},
{
"name": "txt",
"expire": 600,
"type": "TXT",
"content": "Bah bah black sheep",
},
{
"name": "txt",
"expire": 600,
"type": "TXT",
"content": "have you any wool.",
},
{
"name": "txt",
"expire": 600,
"type": "TXT",
"content": (
"v=DKIM1;k=rsa;s=email;h=sha256;"
"p=A/kinda+of/long/string+with+numb3rs"
),
},
{"name": "@", "expire": 3600, "type": "NS", "content": "6.2.3.4."},
{"name": "@", "expire": 3600, "type": "NS", "content": "7.2.3.4."},
{
"name": "cname",
"expire": 300,
"type": "CNAME",
"content": "unit.tests.",
},
{
"name": "excluded",
"expire": 3600,
"type": "CNAME",
"content": "unit.tests.",
},
{
"name": "www.sub",
"expire": 300,
"type": "A",
"content": "2.2.3.6",
},
{
"name": "included",
"expire": 3600,
"type": "CNAME",
"content": "unit.tests.",
},
{
"name": "mx",
"expire": 300,
"type": "MX",
"content": "10 smtp-4.unit.tests.",
},
{
"name": "mx",
"expire": 300,
"type": "MX",
"content": "20 smtp-2.unit.tests.",
},
{
"name": "mx",
"expire": 300,
"type": "MX",
"content": "30 smtp-3.unit.tests.",
},
{
"name": "mx",
"expire": 300,
"type": "MX",
"content": "40 smtp-1.unit.tests.",
},
{
"name": "aaaa",
"expire": 600,
"type": "AAAA",
"content": "2601:644:500:e210:62f8:1dff:feb8:947a",
},
{"name": "@", "expire": 300, "type": "A", "content": "1.2.3.4"},
{"name": "@", "expire": 300, "type": "A", "content": "1.2.3.5"},
{"name": "www", "expire": 300, "type": "A", "content": "2.2.3.6"},
{
"name": "@",
"expire": 3600,
"type": "SSHFP",
"content": "1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49",
},
{
"name": "@",
"expire": 3600,
"type": "SSHFP",
"content": "1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73",
},
]
# Unpack from the transip library magic structure...
seen_entries = [
e.__dict__["_attrs"]
for e in domain_mock.dns.replace.mock_calls[0][1][0]
]
self.assertEqual(
sorted(seen_entries, key=itemgetter("name", "type", "expire")),
sorted(expected_entries, key=itemgetter("name", "type", "expire")),
)
@patch("octodns.provider.transip.TransIP")
def test_apply_unsupported(self, client_mock):
# This triggers the if supported statement to give 100% code coverage
domain_mock = Mock()
client_mock.return_value.domains.get.return_value = domain_mock
domain_mock.dns.list.return_value = []
provider = TransipProvider("test", "unittest", self.bogus_key)
plan = provider.plan(make_expected())
self.assertIsNotNone(plan)
# Test apply with only support for A records
provider.SUPPORTS = set(("A"))
provider.apply(plan)
seen_entries = [
e.__dict__["_attrs"]
for e in domain_mock.dns.replace.mock_calls[0][1][0]
]
expected_entries = [
{
"name": "ignored",
"expire": 3600,
"type": "A",
"content": "9.9.9.9",
},
{
"name": "www.sub",
"expire": 300,
"type": "A",
"content": "2.2.3.6",
},
{"name": "@", "expire": 300, "type": "A", "content": "1.2.3.4"},
{"name": "@", "expire": 300, "type": "A", "content": "1.2.3.5"},
{"name": "www", "expire": 300, "type": "A", "content": "2.2.3.6"},
]
self.assertEquals(
sorted(seen_entries, key=itemgetter("name", "type", "expire")),
sorted(expected_entries, key=itemgetter("name", "type", "expire")),
)
@patch("octodns.provider.transip.TransIP")
def test_apply_failure_on_not_found(self, client_mock):
# Test unhappy flow. Trigger 'not found error' in apply stage
# This should normally not happen as populate will capture it first
# but just in case.
changes = [] # reset changes
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
plan = provider.plan(_expected)
plan.desired.name = 'notfound.unit.tests.'
changes = provider.apply(plan)
# Changes should not be set due to an Exception
self.assertEqual([], changes)
domain_mock = Mock()
domain_mock.dns.list.return_value = []
client_mock.return_value.domains.get.side_effect = [
domain_mock,
TransIPHTTPError("Not Found", 404),
]
provider = TransipProvider("test", "unittest", self.bogus_key)
self.assertEquals(str('WebFault'),
str(ctx.exception.__class__.__name__))
plan = provider.plan(make_expected())
self.assertEquals(str('102'), ctx.exception.fault.faultcode)
with self.assertRaises(TransipException):
provider.apply(plan)
@patch("octodns.provider.transip.TransIP")
def test_apply_failure_on_error(self, client_mock):
# Test unhappy flow. Trigger a unrecoverable error while saving
_expected = self.make_expected() # reset expected
changes = [] # reset changes
domain_mock = Mock()
domain_mock.dns.list.return_value = []
domain_mock.dns.replace.side_effect = [
TransIPHTTPError("Not Found", 500)
]
client_mock.return_value.domains.get.return_value = domain_mock
provider = TransipProvider("test", "unittest", self.bogus_key)
plan = provider.plan(make_expected())
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
plan = provider.plan(_expected)
plan.desired.name = 'failsetdns.unit.tests.'
changes = provider.apply(plan)
with self.assertRaises(TransipException):
provider.apply(plan)
# Changes should not be set due to an Exception
self.assertEqual([], changes)
self.assertEquals(str('TransipException'),
str(ctx.exception.__class__.__name__))
class TestParseFQDN(TestCase):
def test_parse_fqdn(self):
zone = Zone("unit.tests.", [])
self.assertEquals("www.unit.tests.", _parse_to_fqdn("www", zone))
self.assertEquals(
"www.unit.tests.", _parse_to_fqdn("www.unit.tests.", zone)
)
self.assertEquals(
"www.sub.sub.sub.unit.tests.",
_parse_to_fqdn("www.sub.sub.sub", zone),
)
self.assertEquals("unit.tests.", _parse_to_fqdn("@", zone))

Loading…
Cancel
Save