Browse Source

Extract PowerDNSProvider and implement shim

pull/822/head
Ross McFarland 4 years ago
parent
commit
d6ab035838
No known key found for this signature in database GPG Key ID: 943B179E15D3B22A
4 changed files with 21 additions and 1231 deletions
  1. +1
    -1
      README.md
  2. +15
    -518
      octodns/provider/powerdns.py
  3. +0
    -303
      tests/fixtures/powerdns-full-data.json
  4. +5
    -409
      tests/test_octodns_provider_powerdns.py

+ 1
- 1
README.md View File

@ -208,7 +208,7 @@ The above command pulled the existing data out of Route53 and placed the results
| [MythicBeastsProvider](/octodns/provider/mythicbeasts.py) | Mythic Beasts | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, CAA, TXT | No | | | [MythicBeastsProvider](/octodns/provider/mythicbeasts.py) | Mythic Beasts | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, CAA, TXT | No | |
| [Ns1Provider](/octodns/provider/ns1.py) | ns1-python | All | Yes | | | [Ns1Provider](/octodns/provider/ns1.py) | ns1-python | All | Yes | |
| [OVH](/octodns/provider/ovh.py) | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | | | [OVH](/octodns/provider/ovh.py) | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | |
| [PowerDnsProvider](/octodns/provider/powerdns.py) | | All | No | |
| [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) | | All | No | |
| [Rackspace](/octodns/provider/rackspace.py) | | A, AAAA, ALIAS, CNAME, MX, NS, PTR, SPF, TXT | No | | | [Rackspace](/octodns/provider/rackspace.py) | | A, AAAA, ALIAS, CNAME, MX, NS, PTR, SPF, TXT | No | |
| [Route53](/octodns/provider/route53.py) | boto3 | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, TXT | Both | CNAME health checks don't support a Host header | | [Route53](/octodns/provider/route53.py) | boto3 | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, TXT | Both | CNAME health checks don't support a Host header |
| [Selectel](/octodns/provider/selectel.py) | | A, AAAA, CNAME, MX, NS, SPF, SRV, TXT | No | | | [Selectel](/octodns/provider/selectel.py) | | A, AAAA, CNAME, MX, NS, SPF, SRV, TXT | No | |


+ 15
- 518
octodns/provider/powerdns.py View File

@ -5,521 +5,18 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from requests import HTTPError, Session
from operator import itemgetter
import logging
from ..record import Create, Record
from .base import BaseProvider
class PowerDnsBaseProvider(BaseProvider):
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(('A', 'AAAA', 'ALIAS', 'CAA', 'CNAME', 'LOC', 'MX', 'NAPTR',
'NS', 'PTR', 'SPF', 'SSHFP', 'SRV', 'TXT'))
TIMEOUT = 5
def __init__(self, id, host, api_key, port=8081,
scheme="http", timeout=TIMEOUT, *args, **kwargs):
super(PowerDnsBaseProvider, self).__init__(id, *args, **kwargs)
self.host = host
self.port = port
self.scheme = scheme
self.timeout = timeout
self._powerdns_version = None
sess = Session()
sess.headers.update({'X-API-Key': api_key})
self._sess = sess
def _request(self, method, path, data=None):
self.log.debug('_request: method=%s, path=%s', method, path)
url = f'{self.scheme}://{self.host}:{self.port}/api/v1/servers/' \
f'localhost/{path}'.rstrip('/')
# Strip trailing / from url.
resp = self._sess.request(method, url, json=data, timeout=self.timeout)
self.log.debug('_request: status=%d', resp.status_code)
resp.raise_for_status()
return resp
def _get(self, path, data=None):
return self._request('GET', path, data=data)
def _post(self, path, data=None):
return self._request('POST', path, data=data)
def _patch(self, path, data=None):
return self._request('PATCH', path, data=data)
def _data_for_multiple(self, rrset):
# TODO: geo not supported
return {
'type': rrset['type'],
'values': [r['content'] for r in rrset['records']],
'ttl': rrset['ttl']
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
def _data_for_CAA(self, rrset):
values = []
for record in rrset['records']:
flags, tag, value = record['content'].split(' ', 2)
values.append({
'flags': flags,
'tag': tag,
'value': value[1:-1],
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_single(self, rrset):
return {
'type': rrset['type'],
'value': rrset['records'][0]['content'],
'ttl': rrset['ttl']
}
_data_for_ALIAS = _data_for_single
_data_for_CNAME = _data_for_single
_data_for_PTR = _data_for_single
def _data_for_quoted(self, rrset):
return {
'type': rrset['type'],
'values': [r['content'][1:-1] for r in rrset['records']],
'ttl': rrset['ttl']
}
_data_for_SPF = _data_for_quoted
_data_for_TXT = _data_for_quoted
def _data_for_LOC(self, rrset):
values = []
for record in rrset['records']:
lat_degrees, lat_minutes, lat_seconds, lat_direction, \
long_degrees, long_minutes, long_seconds, long_direction, \
altitude, size, precision_horz, precision_vert = \
record['content'].replace('m', '').split(' ', 11)
values.append({
'lat_degrees': int(lat_degrees),
'lat_minutes': int(lat_minutes),
'lat_seconds': float(lat_seconds),
'lat_direction': lat_direction,
'long_degrees': int(long_degrees),
'long_minutes': int(long_minutes),
'long_seconds': float(long_seconds),
'long_direction': long_direction,
'altitude': float(altitude),
'size': float(size),
'precision_horz': float(precision_horz),
'precision_vert': float(precision_vert),
})
return {
'ttl': rrset['ttl'],
'type': rrset['type'],
'values': values
}
def _data_for_MX(self, rrset):
values = []
for record in rrset['records']:
preference, exchange = record['content'].split(' ', 1)
values.append({
'preference': preference,
'exchange': exchange,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_NAPTR(self, rrset):
values = []
for record in rrset['records']:
order, preference, flags, service, regexp, replacement = \
record['content'].split(' ', 5)
values.append({
'order': order,
'preference': preference,
'flags': flags[1:-1],
'service': service[1:-1],
'regexp': regexp[1:-1],
'replacement': replacement,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_SSHFP(self, rrset):
values = []
for record in rrset['records']:
algorithm, fingerprint_type, fingerprint = \
record['content'].split(' ', 2)
values.append({
'algorithm': algorithm,
'fingerprint_type': fingerprint_type,
'fingerprint': fingerprint,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_SRV(self, rrset):
values = []
for record in rrset['records']:
priority, weight, port, target = \
record['content'].split(' ', 3)
values.append({
'priority': priority,
'weight': weight,
'port': port,
'target': target,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
@property
def powerdns_version(self):
if self._powerdns_version is None:
try:
resp = self._get('')
except HTTPError as e:
if e.response.status_code == 401:
# Nicer error message for auth problems
raise Exception(f'PowerDNS unauthorized host={self.host}')
raise
version = resp.json()['version']
self.log.debug('powerdns_version: got version %s from server',
version)
# The extra `-` split is to handle pre-release and source built
# versions like 4.5.0-alpha0.435.master.gcb114252b
self._powerdns_version = [
int(p.split('-')[0]) for p in version.split('.')[:3]]
return self._powerdns_version
@property
def soa_edit_api(self):
# >>> [4, 4, 3] >= [4, 3]
# True
# >>> [4, 3, 3] >= [4, 3]
# True
# >>> [4, 1, 3] >= [4, 3]
# False
if self.powerdns_version >= [4, 3]:
return 'DEFAULT'
return 'INCEPTION-INCREMENT'
@property
def check_status_not_found(self):
# >=4.2.x returns 404 when not found
return self.powerdns_version >= [4, 2]
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
resp = None
try:
resp = self._get(f'zones/{zone.name}')
self.log.debug('populate: loaded')
except HTTPError as e:
error = self._get_error(e)
if e.response.status_code == 401:
# Nicer error message for auth problems
raise Exception(f'PowerDNS unauthorized host={self.host}')
elif e.response.status_code == 404 \
and self.check_status_not_found:
# 404 means powerdns doesn't know anything about the requested
# domain. We'll just ignore it here and leave the zone
# untouched.
pass
elif e.response.status_code == 422 \
and error.startswith('Could not find domain ') \
and not self.check_status_not_found:
# 422 means powerdns doesn't know anything about the requested
# domain. We'll just ignore it here and leave the zone
# untouched.
pass
else:
# just re-throw
raise
before = len(zone.records)
exists = False
if resp:
exists = True
for rrset in resp.json()['rrsets']:
_type = rrset['type']
if _type == 'SOA':
continue
data_for = getattr(self, f'_data_for_{_type}')
record_name = zone.hostname_from_fqdn(rrset['name'])
record = Record.new(zone, record_name, data_for(rrset),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
def _records_for_multiple(self, record):
return [{'content': v, 'disabled': False}
for v in record.values]
_records_for_A = _records_for_multiple
_records_for_AAAA = _records_for_multiple
_records_for_NS = _records_for_multiple
def _records_for_CAA(self, record):
return [{
'content': f'{v.flags} {v.tag} "{v.value}"',
'disabled': False
} for v in record.values]
def _records_for_single(self, record):
return [{'content': record.value, 'disabled': False}]
_records_for_ALIAS = _records_for_single
_records_for_CNAME = _records_for_single
_records_for_PTR = _records_for_single
def _records_for_quoted(self, record):
return [{'content': f'"{v}"', 'disabled': False}
for v in record.values]
_records_for_SPF = _records_for_quoted
_records_for_TXT = _records_for_quoted
def _records_for_LOC(self, record):
return [{
'content':
'%d %d %0.3f %s %d %d %.3f %s %0.2fm %0.2fm %0.2fm %0.2fm' %
(
int(v.lat_degrees),
int(v.lat_minutes),
float(v.lat_seconds),
v.lat_direction,
int(v.long_degrees),
int(v.long_minutes),
float(v.long_seconds),
v.long_direction,
float(v.altitude),
float(v.size),
float(v.precision_horz),
float(v.precision_vert)
),
'disabled': False
} for v in record.values]
def _records_for_MX(self, record):
return [{
'content': f'{v.preference} {v.exchange}',
'disabled': False
} for v in record.values]
def _records_for_NAPTR(self, record):
return [{
'content': f'{v.order} {v.preference} "{v.flags}" "{v.service}" '
f'"{v.regexp}" {v.replacement}',
'disabled': False
} for v in record.values]
def _records_for_SSHFP(self, record):
return [{
'content': f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}',
'disabled': False
} for v in record.values]
def _records_for_SRV(self, record):
return [{
'content': f'{v.priority} {v.weight} {v.port} {v.target}',
'disabled': False
} for v in record.values]
def _mod_Create(self, change):
new = change.new
records_for = getattr(self, f'_records_for_{new._type}')
return {
'name': new.fqdn,
'type': new._type,
'ttl': new.ttl,
'changetype': 'REPLACE',
'records': records_for(new)
}
_mod_Update = _mod_Create
def _mod_Delete(self, change):
existing = change.existing
records_for = getattr(self, f'_records_for_{existing._type}')
return {
'name': existing.fqdn,
'type': existing._type,
'ttl': existing.ttl,
'changetype': 'DELETE',
'records': records_for(existing)
}
def _get_nameserver_record(self, existing):
return None
def _extra_changes(self, existing, **kwargs):
self.log.debug('_extra_changes: zone=%s', existing.name)
ns = self._get_nameserver_record(existing)
if not ns:
return []
# sorting mostly to make things deterministic for testing, but in
# theory it let us find what we're after quicker (though sorting would
# be more expensive.)
for record in sorted(existing.records):
if record == ns:
# We've found the top-level NS record, return any changes
change = record.changes(ns, self)
self.log.debug('_extra_changes: change=%s', change)
if change:
# We need to modify an existing record
return [change]
# No change is necessary
return []
# No existing top-level NS
self.log.debug('_extra_changes: create')
return [Create(ns)]
def _get_error(self, http_error):
try:
return http_error.response.json()['error']
except Exception:
return ''
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
mods = []
for change in changes:
class_name = change.__class__.__name__
mods.append(getattr(self, f'_mod_{class_name}')(change))
# Ensure that any DELETE modifications always occur before any REPLACE
# modifications. This ensures that an A record can be replaced by a
# CNAME record and vice-versa.
mods.sort(key=itemgetter('changetype'))
self.log.debug('_apply: sending change request')
try:
self._patch(f'zones/{desired.name}', data={'rrsets': mods})
self.log.debug('_apply: patched')
except HTTPError as e:
error = self._get_error(e)
if not (
(
e.response.status_code == 404 and
self.check_status_not_found
) or (
e.response.status_code == 422 and
error.startswith('Could not find domain ') and
not self.check_status_not_found
)
):
self.log.error(
'_apply: status=%d, text=%s',
e.response.status_code,
e.response.text)
raise
self.log.info('_apply: creating zone=%s', desired.name)
# 404 or 422 means powerdns doesn't know anything about the
# requested domain. We'll try to create it with the correct
# records instead of update. Hopefully all the mods are
# creates :-)
data = {
'name': desired.name,
'kind': 'Master',
'masters': [],
'nameservers': [],
'rrsets': mods,
'soa_edit_api': self.soa_edit_api,
'serial': 0,
}
try:
self._post('zones', data)
except HTTPError as e:
self.log.error('_apply: status=%d, text=%s',
e.response.status_code,
e.response.text)
raise
self.log.debug('_apply: created')
self.log.debug('_apply: complete')
class PowerDnsProvider(PowerDnsBaseProvider):
'''
PowerDNS API v4 Provider
powerdns:
class: octodns.provider.powerdns.PowerDnsProvider
# The host on which PowerDNS api is listening (required)
host: fqdn
# The api key that grans access (required)
api_key: api-key
# The port on which PowerDNS api is listening (optional, default 8081)
port: 8081
# The nameservers to use for this provider (optional,
# default unmanaged)
nameserver_values:
- 1.2.3.4.
- 1.2.3.5.
# The nameserver record TTL when managed, (optional, default 600)
nameserver_ttl: 600
'''
def __init__(self, id, host, api_key, port=8081, nameserver_values=None,
nameserver_ttl=600,
*args, **kwargs):
self.log = logging.getLogger(f'PowerDnsProvider[{id}]')
self.log.debug('__init__: id=%s, host=%s, port=%d, '
'nameserver_values=%s, nameserver_ttl=%d',
id, host, port, nameserver_values, nameserver_ttl)
super(PowerDnsProvider, self).__init__(id, host=host, api_key=api_key,
port=port,
*args, **kwargs)
self.nameserver_values = nameserver_values
self.nameserver_ttl = nameserver_ttl
def _get_nameserver_record(self, existing):
if self.nameserver_values:
return Record.new(existing, '', {
'type': 'NS',
'ttl': self.nameserver_ttl,
'values': self.nameserver_values,
}, source=self)
return super(PowerDnsProvider, self)._get_nameserver_record(existing)
from logging import getLogger
logger = getLogger('PowerDNS')
try:
logger.warn('octodns_powerdns shimmed. Update your provider class to '
'octodns_powerdns.PowerDnsProvider. '
'Shim will be removed in 1.0')
from octodns_powerdns import PowerDnsProvider, PowerDnsBaseProvider
PowerDnsProvider # pragma: no cover
PowerDnsBaseProvider # pragma: no cover
except ModuleNotFoundError:
logger.exception('PowerDnsProvider has been moved into a seperate module, '
'octodns_powerdns is now required. Provider class should '
'be updated to octodns_powerdns.PowerDnsProvider')
raise

+ 0
- 303
tests/fixtures/powerdns-full-data.json View File

@ -1,303 +0,0 @@
{
"account": "",
"dnssec": false,
"id": "unit.tests.",
"kind": "Master",
"last_check": 0,
"masters": [],
"name": "unit.tests.",
"notified_serial": 2017012803,
"rrsets": [
{
"comments": [],
"name": "mx.unit.tests.",
"records": [
{
"content": "40 smtp-1.unit.tests.",
"disabled": false
},
{
"content": "20 smtp-2.unit.tests.",
"disabled": false
},
{
"content": "30 smtp-3.unit.tests.",
"disabled": false
},
{
"content": "10 smtp-4.unit.tests.",
"disabled": false
}
],
"ttl": 300,
"type": "MX"
},
{
"comments": [],
"name": "loc.unit.tests.",
"records": [
{
"content": "31 58 52.100 S 115 49 11.700 E 20.00m 10.00m 10.00m 2.00m",
"disabled": false
},
{
"content": "53 13 10.000 N 2 18 26.000 W 20.00m 10.00m 1000.00m 2.00m",
"disabled": false
}
],
"ttl": 300,
"type": "LOC"
},
{
"comments": [],
"name": "sub.unit.tests.",
"records": [
{
"content": "6.2.3.4.",
"disabled": false
}, {
"content": "7.2.3.4.",
"disabled": false
}
],
"ttl": 3600,
"type": "NS"
},
{
"comments": [],
"name": "www.unit.tests.",
"records": [
{
"content": "2.2.3.6",
"disabled": false
}
],
"ttl": 300,
"type": "A"
},
{
"comments": [],
"name": "_imap._tcp.unit.tests.",
"records": [
{
"content": "0 0 0 .",
"disabled": false
}
],
"ttl": 600,
"type": "SRV"
},
{
"comments": [],
"name": "_pop3._tcp.unit.tests.",
"records": [
{
"content": "0 0 0 .",
"disabled": false
}
],
"ttl": 600,
"type": "SRV"
},
{
"comments": [],
"name": "_srv._tcp.unit.tests.",
"records": [
{
"content": "10 20 30 foo-1.unit.tests.",
"disabled": false
},
{
"content": "12 20 30 foo-2.unit.tests.",
"disabled": false
}
],
"ttl": 600,
"type": "SRV"
},
{
"comments": [],
"name": "txt.unit.tests.",
"records": [
{
"content": "\"Bah bah black sheep\"",
"disabled": false
},
{
"content": "\"have you any wool.\"",
"disabled": false
},
{
"content": "\"v=DKIM1\\;k=rsa\\;s=email\\;h=sha256\\;p=A/kinda+of/long/string+with+numb3rs\"",
"disabled": false
}
],
"ttl": 600,
"type": "TXT"
},
{
"comments": [],
"name": "naptr.unit.tests.",
"records": [
{
"content": "10 100 \"S\" \"SIP+D2U\" \"!^.*$!sip:info@bar.example.com!\" .",
"disabled": false
},
{
"content": "100 100 \"U\" \"SIP+D2U\" \"!^.*$!sip:info@bar.example.com!\" .",
"disabled": false
}
],
"ttl": 600,
"type": "NAPTR"
},
{
"comments": [],
"name": "ptr.unit.tests.",
"records": [
{
"content": "foo.bar.com.",
"disabled": false
}
],
"ttl": 300,
"type": "PTR"
},
{
"comments": [],
"name": "spf.unit.tests.",
"records": [
{
"content": "\"v=spf1 ip4:192.168.0.1/16-all\"",
"disabled": false
}
],
"ttl": 600,
"type": "SPF"
},
{
"comments": [],
"name": "cname.unit.tests.",
"records": [
{
"content": "unit.tests.",
"disabled": false
}
],
"ttl": 300,
"type": "CNAME"
},
{
"comments": [],
"name": "www.sub.unit.tests.",
"records": [
{
"content": "2.2.3.6",
"disabled": false
}
],
"ttl": 300,
"type": "A"
},
{
"comments": [],
"name": "aaaa.unit.tests.",
"records": [
{
"content": "2601:644:500:e210:62f8:1dff:feb8:947a",
"disabled": false
}
],
"ttl": 600,
"type": "AAAA"
},
{
"comments": [],
"name": "unit.tests.",
"records": [
{
"content": "1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49",
"disabled": false
},
{
"content": "1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73",
"disabled": false
}
],
"ttl": 3600,
"type": "SSHFP"
},
{
"comments": [],
"name": "unit.tests.",
"records": [
{
"content": "ns1.ext.unit.tests. hostmaster.unit.tests. 2017012803 3600 600 604800 60",
"disabled": false
}
],
"ttl": 3600,
"type": "SOA"
},
{
"comments": [],
"name": "unit.tests.",
"records": [
{
"content": "1.1.1.1.",
"disabled": false
},
{
"content": "4.4.4.4.",
"disabled": false
}
],
"ttl": 600,
"type": "NS"
},
{
"comments": [],
"name": "unit.tests.",
"records": [
{
"content": "1.2.3.5",
"disabled": false
},
{
"content": "1.2.3.4",
"disabled": false
}
],
"ttl": 300,
"type": "A"
},
{
"comments": [],
"name": "unit.tests.",
"records": [
{
"content": "0 issue \"ca.unit.tests\"",
"disabled": false
}
],
"ttl": 3600,
"type": "CAA"
},
{
"comments": [],
"name": "included.unit.tests.",
"records": [
{
"content": "unit.tests.",
"disabled": false
}
],
"ttl": 3600,
"type": "CNAME"
}
],
"serial": 2017012803,
"soa_edit": "",
"soa_edit_api": "INCEPTION-INCREMENT",
"url": "api/v1/servers/localhost/zones/unit.tests."
}

+ 5
- 409
tests/test_octodns_provider_powerdns.py View File

@ -5,416 +5,12 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from json import loads, dumps
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from unittest import TestCase from unittest import TestCase
from octodns.record import Record
from octodns.provider.powerdns import PowerDnsProvider
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
EMPTY_TEXT = '''
{
"account": "",
"dnssec": false,
"id": "xunit.tests.",
"kind": "Master",
"last_check": 0,
"masters": [],
"name": "xunit.tests.",
"notified_serial": 0,
"rrsets": [],
"serial": 2017012801,
"soa_edit": "",
"soa_edit_api": "INCEPTION-INCREMENT",
"url": "api/v1/servers/localhost/zones/xunit.tests."
}
'''
class TestPowerDnsShim(TestCase):
with open('./tests/fixtures/powerdns-full-data.json') as fh:
FULL_TEXT = fh.read()
class TestPowerDnsProvider(TestCase):
def test_provider_version_detection(self):
provider = PowerDnsProvider('test', 'non.existent', 'api-key',
nameserver_values=['8.8.8.8.',
'9.9.9.9.'])
# Bad auth
with requests_mock() as mock:
mock.get(ANY, status_code=401, text='Unauthorized')
with self.assertRaises(Exception) as ctx:
provider.powerdns_version
self.assertTrue('unauthorized' in str(ctx.exception))
# Api not found
with requests_mock() as mock:
mock.get(ANY, status_code=404, text='Not Found')
with self.assertRaises(Exception) as ctx:
provider.powerdns_version
self.assertTrue('404' in str(ctx.exception))
# Test version detection
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.1.10"})
self.assertEquals(provider.powerdns_version, [4, 1, 10])
# Test version detection for second time (should stay at 4.1.10)
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.2.0"})
self.assertEquals(provider.powerdns_version, [4, 1, 10])
# Test version detection
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.2.0"})
# Reset version, so detection will try again
provider._powerdns_version = None
self.assertNotEquals(provider.powerdns_version, [4, 1, 10])
# Test version detection with pre-releases
with requests_mock() as mock:
# Reset version, so detection will try again
provider._powerdns_version = None
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.4.0-alpha1"})
self.assertEquals(provider.powerdns_version, [4, 4, 0])
provider._powerdns_version = None
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200,
json={'version': "4.5.0-alpha0.435.master.gcb114252b"})
self.assertEquals(provider.powerdns_version, [4, 5, 0])
def test_provider_version_config(self):
provider = PowerDnsProvider('test', 'non.existent', 'api-key',
nameserver_values=['8.8.8.8.',
'9.9.9.9.'])
# Test version 4.1.0
provider._powerdns_version = None
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.1.10"})
self.assertEquals(provider.soa_edit_api, 'INCEPTION-INCREMENT')
self.assertFalse(
provider.check_status_not_found,
'check_status_not_found should be false '
'for version 4.1.x and below')
# Test version 4.2.0
provider._powerdns_version = None
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.2.0"})
self.assertEquals(provider.soa_edit_api, 'INCEPTION-INCREMENT')
self.assertTrue(
provider.check_status_not_found,
'check_status_not_found should be true for version 4.2.x')
# Test version 4.3.0
provider._powerdns_version = None
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.3.0"})
self.assertEquals(provider.soa_edit_api, 'DEFAULT')
self.assertTrue(
provider.check_status_not_found,
'check_status_not_found should be true for version 4.3.x')
def test_provider(self):
provider = PowerDnsProvider('test', 'non.existent', 'api-key',
nameserver_values=['8.8.8.8.',
'9.9.9.9.'])
# Test version detection
with requests_mock() as mock:
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': "4.1.10"})
self.assertEquals(provider.powerdns_version, [4, 1, 10])
# Bad auth
with requests_mock() as mock:
mock.get(ANY, status_code=401, text='Unauthorized')
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertTrue('unauthorized' in str(ctx.exception))
# General error
with requests_mock() as mock:
mock.get(ANY, status_code=502, text='Things caught fire')
with self.assertRaises(HTTPError) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(502, ctx.exception.response.status_code)
# Non-existent zone in PowerDNS <4.3.0 doesn't populate anything
with requests_mock() as mock:
mock.get(ANY, status_code=422,
json={'error': "Could not find domain 'unit.tests.'"})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(set(), zone.records)
# Non-existent zone in PowerDNS >=4.2.0 doesn't populate anything
provider._powerdns_version = [4, 2, 0]
with requests_mock() as mock:
mock.get(ANY, status_code=404, text='Not Found')
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(set(), zone.records)
provider._powerdns_version = [4, 1, 0]
# The rest of this is messy/complicated b/c it's dealing with mocking
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
expected_n = len(expected.records) - 4
self.assertEquals(19, expected_n)
# No diffs == no changes
with requests_mock() as mock:
mock.get(ANY, status_code=200, text=FULL_TEXT)
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(19, len(zone.records))
changes = expected.changes(zone, provider)
self.assertEquals(0, len(changes))
# Used in a minute
def assert_rrsets_callback(request, context):
data = loads(request.body)
self.assertEquals(expected_n, len(data['rrsets']))
return ''
# No existing records -> creates for every record in expected
with requests_mock() as mock:
mock.get(ANY, status_code=200, text=EMPTY_TEXT)
# post 201, is response to the create with data
mock.patch(ANY, status_code=201, text=assert_rrsets_callback)
plan = provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, provider.apply(plan))
self.assertTrue(plan.exists)
# Non-existent zone -> creates for every record in expected
# OMG this is fucking ugly, probably better to ditch requests_mocks and
# just mock things for real as it doesn't seem to provide a way to get
# at the request params or verify that things were called from what I
# can tell
not_found = {'error': "Could not find domain 'unit.tests.'"}
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text=dumps(not_found))
# patch 422's, unknown zone
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 201, is response to the create with data
mock.post(ANY, status_code=201, text=assert_rrsets_callback)
plan = provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, provider.apply(plan))
self.assertFalse(plan.exists)
provider._powerdns_version = [4, 2, 0]
with requests_mock() as mock:
# get 404's, unknown zone
mock.get(ANY, status_code=404, text='')
# patch 404's, unknown zone
mock.patch(ANY, status_code=404, text=dumps(not_found))
# post 201, is response to the create with data
mock.post(ANY, status_code=201, text=assert_rrsets_callback)
plan = provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, provider.apply(plan))
self.assertFalse(plan.exists)
provider._powerdns_version = [4, 1, 0]
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text=dumps(not_found))
# patch 422's,
data = {'error': "Key 'name' not present or not a String"}
mock.patch(ANY, status_code=422, text=dumps(data))
with self.assertRaises(HTTPError) as ctx:
plan = provider.plan(expected)
provider.apply(plan)
response = ctx.exception.response
self.assertEquals(422, response.status_code)
self.assertTrue('error' in response.json())
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text=dumps(not_found))
# patch 500's, things just blew up
mock.patch(ANY, status_code=500, text='')
with self.assertRaises(HTTPError):
plan = provider.plan(expected)
provider.apply(plan)
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text=dumps(not_found))
# patch 500's, things just blew up
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 422's, something wrong with create
mock.post(ANY, status_code=422, text='Hello Word!')
with self.assertRaises(HTTPError):
plan = provider.plan(expected)
provider.apply(plan)
def test_small_change(self):
provider = PowerDnsProvider('test', 'non.existent', 'api-key')
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
self.assertEquals(23, len(expected.records))
# A small change to a single record
with requests_mock() as mock:
mock.get(ANY, status_code=200, text=FULL_TEXT)
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': '4.1.0'})
missing = Zone(expected.name, [])
# Find and delete the SPF record
for record in expected.records:
if record._type != 'SPF':
missing.add_record(record)
def assert_delete_callback(request, context):
self.assertEquals({
'rrsets': [{
'records': [
{'content': '"v=spf1 ip4:192.168.0.1/16-all"',
'disabled': False}
],
'changetype': 'DELETE',
'type': 'SPF',
'name': 'spf.unit.tests.',
'ttl': 600
}]
}, loads(request.body))
return ''
mock.patch(ANY, status_code=201, text=assert_delete_callback)
plan = provider.plan(missing)
self.assertEquals(1, len(plan.changes))
self.assertEquals(1, provider.apply(plan))
def test_existing_nameservers(self):
ns_values = ['8.8.8.8.', '9.9.9.9.']
provider = PowerDnsProvider('test', 'non.existent', 'api-key',
nameserver_values=ns_values)
expected = Zone('unit.tests.', [])
ns_record = Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ns_values
})
expected.add_record(ns_record)
# no changes
with requests_mock() as mock:
data = {
'rrsets': [{
'comments': [],
'name': 'unit.tests.',
'records': [
{
'content': '8.8.8.8.',
'disabled': False
},
{
'content': '9.9.9.9.',
'disabled': False
}
],
'ttl': 600,
'type': 'NS'
}, {
'comments': [],
'name': 'unit.tests.',
'records': [{
'content': '1.2.3.4',
'disabled': False,
}],
'ttl': 60,
'type': 'A'
}]
}
mock.get(ANY, status_code=200, json=data)
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': '4.1.0'})
unrelated_record = Record.new(expected, '', {
'type': 'A',
'ttl': 60,
'value': '1.2.3.4'
})
expected.add_record(unrelated_record)
plan = provider.plan(expected)
self.assertFalse(plan)
# remove it now that we don't need the unrelated change any longer
expected._remove_record(unrelated_record)
# ttl diff
with requests_mock() as mock:
data = {
'rrsets': [{
'comments': [],
'name': 'unit.tests.',
'records': [
{
'content': '8.8.8.8.',
'disabled': False
},
{
'content': '9.9.9.9.',
'disabled': False
},
],
'ttl': 3600,
'type': 'NS'
}]
}
mock.get(ANY, status_code=200, json=data)
mock.get('http://non.existent:8081/api/v1/servers/localhost',
status_code=200, json={'version': '4.1.0'})
plan = provider.plan(expected)
self.assertEquals(1, len(plan.changes))
# create
with requests_mock() as mock:
data = {
'rrsets': []
}
mock.get(ANY, status_code=200, json=data)
plan = provider.plan(expected)
self.assertEquals(1, len(plan.changes))
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.provider.powerdns import PowerDnsProvider
PowerDnsProvider

Loading…
Cancel
Save