From d6ab035838a037f535314ce417d43b3dbe953164 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 21 Dec 2021 13:28:22 -0800 Subject: [PATCH] Extract PowerDNSProvider and implement shim --- README.md | 2 +- octodns/provider/powerdns.py | 533 +----------------------- tests/fixtures/powerdns-full-data.json | 303 -------------- tests/test_octodns_provider_powerdns.py | 414 +----------------- 4 files changed, 21 insertions(+), 1231 deletions(-) delete mode 100644 tests/fixtures/powerdns-full-data.json diff --git a/README.md b/README.md index 1dc4f91..acd4395 100644 --- a/README.md +++ b/README.md @@ -208,7 +208,7 @@ The above command pulled the existing data out of Route53 and placed the results | [MythicBeastsProvider](/octodns/provider/mythicbeasts.py) | Mythic Beasts | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, CAA, TXT | No | | | [Ns1Provider](/octodns/provider/ns1.py) | ns1-python | All | Yes | | | [OVH](/octodns/provider/ovh.py) | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | | -| [PowerDnsProvider](/octodns/provider/powerdns.py) | | All | No | | +| [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) | | All | No | | | [Rackspace](/octodns/provider/rackspace.py) | | A, AAAA, ALIAS, CNAME, MX, NS, PTR, SPF, TXT | No | | | [Route53](/octodns/provider/route53.py) | boto3 | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, TXT | Both | CNAME health checks don't support a Host header | | [Selectel](/octodns/provider/selectel.py) | | A, AAAA, CNAME, MX, NS, SPF, SRV, TXT | No | | diff --git a/octodns/provider/powerdns.py b/octodns/provider/powerdns.py index 900e879..ebcd3b6 100644 --- a/octodns/provider/powerdns.py +++ b/octodns/provider/powerdns.py @@ -5,521 +5,18 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -from requests import HTTPError, Session -from operator import itemgetter -import logging - -from ..record import Create, Record -from .base import BaseProvider - - -class PowerDnsBaseProvider(BaseProvider): - SUPPORTS_GEO = False - SUPPORTS_DYNAMIC = False - SUPPORTS = set(('A', 'AAAA', 'ALIAS', 'CAA', 'CNAME', 'LOC', 'MX', 'NAPTR', - 'NS', 'PTR', 'SPF', 'SSHFP', 'SRV', 'TXT')) - TIMEOUT = 5 - - def __init__(self, id, host, api_key, port=8081, - scheme="http", timeout=TIMEOUT, *args, **kwargs): - super(PowerDnsBaseProvider, self).__init__(id, *args, **kwargs) - - self.host = host - self.port = port - self.scheme = scheme - self.timeout = timeout - - self._powerdns_version = None - - sess = Session() - sess.headers.update({'X-API-Key': api_key}) - self._sess = sess - - def _request(self, method, path, data=None): - self.log.debug('_request: method=%s, path=%s', method, path) - - url = f'{self.scheme}://{self.host}:{self.port}/api/v1/servers/' \ - f'localhost/{path}'.rstrip('/') - # Strip trailing / from url. - resp = self._sess.request(method, url, json=data, timeout=self.timeout) - self.log.debug('_request: status=%d', resp.status_code) - resp.raise_for_status() - return resp - - def _get(self, path, data=None): - return self._request('GET', path, data=data) - - def _post(self, path, data=None): - return self._request('POST', path, data=data) - - def _patch(self, path, data=None): - return self._request('PATCH', path, data=data) - - def _data_for_multiple(self, rrset): - # TODO: geo not supported - return { - 'type': rrset['type'], - 'values': [r['content'] for r in rrset['records']], - 'ttl': rrset['ttl'] - } - - _data_for_A = _data_for_multiple - _data_for_AAAA = _data_for_multiple - _data_for_NS = _data_for_multiple - - def _data_for_CAA(self, rrset): - values = [] - for record in rrset['records']: - flags, tag, value = record['content'].split(' ', 2) - values.append({ - 'flags': flags, - 'tag': tag, - 'value': value[1:-1], - }) - return { - 'type': rrset['type'], - 'values': values, - 'ttl': rrset['ttl'] - } - - def _data_for_single(self, rrset): - return { - 'type': rrset['type'], - 'value': rrset['records'][0]['content'], - 'ttl': rrset['ttl'] - } - - _data_for_ALIAS = _data_for_single - _data_for_CNAME = _data_for_single - _data_for_PTR = _data_for_single - - def _data_for_quoted(self, rrset): - return { - 'type': rrset['type'], - 'values': [r['content'][1:-1] for r in rrset['records']], - 'ttl': rrset['ttl'] - } - - _data_for_SPF = _data_for_quoted - _data_for_TXT = _data_for_quoted - - def _data_for_LOC(self, rrset): - values = [] - for record in rrset['records']: - lat_degrees, lat_minutes, lat_seconds, lat_direction, \ - long_degrees, long_minutes, long_seconds, long_direction, \ - altitude, size, precision_horz, precision_vert = \ - record['content'].replace('m', '').split(' ', 11) - values.append({ - 'lat_degrees': int(lat_degrees), - 'lat_minutes': int(lat_minutes), - 'lat_seconds': float(lat_seconds), - 'lat_direction': lat_direction, - 'long_degrees': int(long_degrees), - 'long_minutes': int(long_minutes), - 'long_seconds': float(long_seconds), - 'long_direction': long_direction, - 'altitude': float(altitude), - 'size': float(size), - 'precision_horz': float(precision_horz), - 'precision_vert': float(precision_vert), - }) - return { - 'ttl': rrset['ttl'], - 'type': rrset['type'], - 'values': values - } - - def _data_for_MX(self, rrset): - values = [] - for record in rrset['records']: - preference, exchange = record['content'].split(' ', 1) - values.append({ - 'preference': preference, - 'exchange': exchange, - }) - return { - 'type': rrset['type'], - 'values': values, - 'ttl': rrset['ttl'] - } - - def _data_for_NAPTR(self, rrset): - values = [] - for record in rrset['records']: - order, preference, flags, service, regexp, replacement = \ - record['content'].split(' ', 5) - values.append({ - 'order': order, - 'preference': preference, - 'flags': flags[1:-1], - 'service': service[1:-1], - 'regexp': regexp[1:-1], - 'replacement': replacement, - }) - return { - 'type': rrset['type'], - 'values': values, - 'ttl': rrset['ttl'] - } - - def _data_for_SSHFP(self, rrset): - values = [] - for record in rrset['records']: - algorithm, fingerprint_type, fingerprint = \ - record['content'].split(' ', 2) - values.append({ - 'algorithm': algorithm, - 'fingerprint_type': fingerprint_type, - 'fingerprint': fingerprint, - }) - return { - 'type': rrset['type'], - 'values': values, - 'ttl': rrset['ttl'] - } - - def _data_for_SRV(self, rrset): - values = [] - for record in rrset['records']: - priority, weight, port, target = \ - record['content'].split(' ', 3) - values.append({ - 'priority': priority, - 'weight': weight, - 'port': port, - 'target': target, - }) - return { - 'type': rrset['type'], - 'values': values, - 'ttl': rrset['ttl'] - } - - @property - def powerdns_version(self): - if self._powerdns_version is None: - try: - resp = self._get('') - except HTTPError as e: - if e.response.status_code == 401: - # Nicer error message for auth problems - raise Exception(f'PowerDNS unauthorized host={self.host}') - raise - - version = resp.json()['version'] - self.log.debug('powerdns_version: got version %s from server', - version) - # The extra `-` split is to handle pre-release and source built - # versions like 4.5.0-alpha0.435.master.gcb114252b - self._powerdns_version = [ - int(p.split('-')[0]) for p in version.split('.')[:3]] - - return self._powerdns_version - - @property - def soa_edit_api(self): - # >>> [4, 4, 3] >= [4, 3] - # True - # >>> [4, 3, 3] >= [4, 3] - # True - # >>> [4, 1, 3] >= [4, 3] - # False - if self.powerdns_version >= [4, 3]: - return 'DEFAULT' - return 'INCEPTION-INCREMENT' - - @property - def check_status_not_found(self): - # >=4.2.x returns 404 when not found - return self.powerdns_version >= [4, 2] - - def populate(self, zone, target=False, lenient=False): - self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name, - target, lenient) - - resp = None - try: - resp = self._get(f'zones/{zone.name}') - self.log.debug('populate: loaded') - except HTTPError as e: - error = self._get_error(e) - if e.response.status_code == 401: - # Nicer error message for auth problems - raise Exception(f'PowerDNS unauthorized host={self.host}') - elif e.response.status_code == 404 \ - and self.check_status_not_found: - # 404 means powerdns doesn't know anything about the requested - # domain. We'll just ignore it here and leave the zone - # untouched. - pass - elif e.response.status_code == 422 \ - and error.startswith('Could not find domain ') \ - and not self.check_status_not_found: - # 422 means powerdns doesn't know anything about the requested - # domain. We'll just ignore it here and leave the zone - # untouched. - pass - else: - # just re-throw - raise - - before = len(zone.records) - exists = False - - if resp: - exists = True - for rrset in resp.json()['rrsets']: - _type = rrset['type'] - if _type == 'SOA': - continue - data_for = getattr(self, f'_data_for_{_type}') - record_name = zone.hostname_from_fqdn(rrset['name']) - record = Record.new(zone, record_name, data_for(rrset), - source=self, lenient=lenient) - zone.add_record(record, lenient=lenient) - - self.log.info('populate: found %s records, exists=%s', - len(zone.records) - before, exists) - return exists - - def _records_for_multiple(self, record): - return [{'content': v, 'disabled': False} - for v in record.values] - - _records_for_A = _records_for_multiple - _records_for_AAAA = _records_for_multiple - _records_for_NS = _records_for_multiple - - def _records_for_CAA(self, record): - return [{ - 'content': f'{v.flags} {v.tag} "{v.value}"', - 'disabled': False - } for v in record.values] - - def _records_for_single(self, record): - return [{'content': record.value, 'disabled': False}] - - _records_for_ALIAS = _records_for_single - _records_for_CNAME = _records_for_single - _records_for_PTR = _records_for_single - - def _records_for_quoted(self, record): - return [{'content': f'"{v}"', 'disabled': False} - for v in record.values] - - _records_for_SPF = _records_for_quoted - _records_for_TXT = _records_for_quoted - - def _records_for_LOC(self, record): - return [{ - 'content': - '%d %d %0.3f %s %d %d %.3f %s %0.2fm %0.2fm %0.2fm %0.2fm' % - ( - int(v.lat_degrees), - int(v.lat_minutes), - float(v.lat_seconds), - v.lat_direction, - int(v.long_degrees), - int(v.long_minutes), - float(v.long_seconds), - v.long_direction, - float(v.altitude), - float(v.size), - float(v.precision_horz), - float(v.precision_vert) - ), - 'disabled': False - } for v in record.values] - - def _records_for_MX(self, record): - return [{ - 'content': f'{v.preference} {v.exchange}', - 'disabled': False - } for v in record.values] - - def _records_for_NAPTR(self, record): - return [{ - 'content': f'{v.order} {v.preference} "{v.flags}" "{v.service}" ' - f'"{v.regexp}" {v.replacement}', - 'disabled': False - } for v in record.values] - - def _records_for_SSHFP(self, record): - return [{ - 'content': f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}', - 'disabled': False - } for v in record.values] - - def _records_for_SRV(self, record): - return [{ - 'content': f'{v.priority} {v.weight} {v.port} {v.target}', - 'disabled': False - } for v in record.values] - - def _mod_Create(self, change): - new = change.new - records_for = getattr(self, f'_records_for_{new._type}') - return { - 'name': new.fqdn, - 'type': new._type, - 'ttl': new.ttl, - 'changetype': 'REPLACE', - 'records': records_for(new) - } - - _mod_Update = _mod_Create - - def _mod_Delete(self, change): - existing = change.existing - records_for = getattr(self, f'_records_for_{existing._type}') - return { - 'name': existing.fqdn, - 'type': existing._type, - 'ttl': existing.ttl, - 'changetype': 'DELETE', - 'records': records_for(existing) - } - - def _get_nameserver_record(self, existing): - return None - - def _extra_changes(self, existing, **kwargs): - self.log.debug('_extra_changes: zone=%s', existing.name) - - ns = self._get_nameserver_record(existing) - if not ns: - return [] - - # sorting mostly to make things deterministic for testing, but in - # theory it let us find what we're after quicker (though sorting would - # be more expensive.) - for record in sorted(existing.records): - if record == ns: - # We've found the top-level NS record, return any changes - change = record.changes(ns, self) - self.log.debug('_extra_changes: change=%s', change) - if change: - # We need to modify an existing record - return [change] - # No change is necessary - return [] - # No existing top-level NS - self.log.debug('_extra_changes: create') - return [Create(ns)] - - def _get_error(self, http_error): - try: - return http_error.response.json()['error'] - except Exception: - return '' - - def _apply(self, plan): - desired = plan.desired - changes = plan.changes - self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name, - len(changes)) - - mods = [] - for change in changes: - class_name = change.__class__.__name__ - mods.append(getattr(self, f'_mod_{class_name}')(change)) - - # Ensure that any DELETE modifications always occur before any REPLACE - # modifications. This ensures that an A record can be replaced by a - # CNAME record and vice-versa. - mods.sort(key=itemgetter('changetype')) - - self.log.debug('_apply: sending change request') - - try: - self._patch(f'zones/{desired.name}', data={'rrsets': mods}) - self.log.debug('_apply: patched') - except HTTPError as e: - error = self._get_error(e) - if not ( - ( - e.response.status_code == 404 and - self.check_status_not_found - ) or ( - e.response.status_code == 422 and - error.startswith('Could not find domain ') and - not self.check_status_not_found - ) - ): - self.log.error( - '_apply: status=%d, text=%s', - e.response.status_code, - e.response.text) - raise - - self.log.info('_apply: creating zone=%s', desired.name) - # 404 or 422 means powerdns doesn't know anything about the - # requested domain. We'll try to create it with the correct - # records instead of update. Hopefully all the mods are - # creates :-) - data = { - 'name': desired.name, - 'kind': 'Master', - 'masters': [], - 'nameservers': [], - 'rrsets': mods, - 'soa_edit_api': self.soa_edit_api, - 'serial': 0, - } - try: - self._post('zones', data) - except HTTPError as e: - self.log.error('_apply: status=%d, text=%s', - e.response.status_code, - e.response.text) - raise - self.log.debug('_apply: created') - - self.log.debug('_apply: complete') - - -class PowerDnsProvider(PowerDnsBaseProvider): - ''' - PowerDNS API v4 Provider - - powerdns: - class: octodns.provider.powerdns.PowerDnsProvider - # The host on which PowerDNS api is listening (required) - host: fqdn - # The api key that grans access (required) - api_key: api-key - # The port on which PowerDNS api is listening (optional, default 8081) - port: 8081 - # The nameservers to use for this provider (optional, - # default unmanaged) - nameserver_values: - - 1.2.3.4. - - 1.2.3.5. - # The nameserver record TTL when managed, (optional, default 600) - nameserver_ttl: 600 - ''' - - def __init__(self, id, host, api_key, port=8081, nameserver_values=None, - nameserver_ttl=600, - *args, **kwargs): - self.log = logging.getLogger(f'PowerDnsProvider[{id}]') - self.log.debug('__init__: id=%s, host=%s, port=%d, ' - 'nameserver_values=%s, nameserver_ttl=%d', - id, host, port, nameserver_values, nameserver_ttl) - super(PowerDnsProvider, self).__init__(id, host=host, api_key=api_key, - port=port, - *args, **kwargs) - - self.nameserver_values = nameserver_values - self.nameserver_ttl = nameserver_ttl - - def _get_nameserver_record(self, existing): - if self.nameserver_values: - return Record.new(existing, '', { - 'type': 'NS', - 'ttl': self.nameserver_ttl, - 'values': self.nameserver_values, - }, source=self) - - return super(PowerDnsProvider, self)._get_nameserver_record(existing) +from logging import getLogger + +logger = getLogger('PowerDNS') +try: + logger.warn('octodns_powerdns shimmed. Update your provider class to ' + 'octodns_powerdns.PowerDnsProvider. ' + 'Shim will be removed in 1.0') + from octodns_powerdns import PowerDnsProvider, PowerDnsBaseProvider + PowerDnsProvider # pragma: no cover + PowerDnsBaseProvider # pragma: no cover +except ModuleNotFoundError: + logger.exception('PowerDnsProvider has been moved into a seperate module, ' + 'octodns_powerdns is now required. Provider class should ' + 'be updated to octodns_powerdns.PowerDnsProvider') + raise diff --git a/tests/fixtures/powerdns-full-data.json b/tests/fixtures/powerdns-full-data.json deleted file mode 100644 index 8feda7e..0000000 --- a/tests/fixtures/powerdns-full-data.json +++ /dev/null @@ -1,303 +0,0 @@ -{ - "account": "", - "dnssec": false, - "id": "unit.tests.", - "kind": "Master", - "last_check": 0, - "masters": [], - "name": "unit.tests.", - "notified_serial": 2017012803, - "rrsets": [ - { - "comments": [], - "name": "mx.unit.tests.", - "records": [ - { - "content": "40 smtp-1.unit.tests.", - "disabled": false - }, - { - "content": "20 smtp-2.unit.tests.", - "disabled": false - }, - { - "content": "30 smtp-3.unit.tests.", - "disabled": false - }, - { - "content": "10 smtp-4.unit.tests.", - "disabled": false - } - ], - "ttl": 300, - "type": "MX" - }, - { - "comments": [], - "name": "loc.unit.tests.", - "records": [ - { - "content": "31 58 52.100 S 115 49 11.700 E 20.00m 10.00m 10.00m 2.00m", - "disabled": false - }, - { - "content": "53 13 10.000 N 2 18 26.000 W 20.00m 10.00m 1000.00m 2.00m", - "disabled": false - } - ], - "ttl": 300, - "type": "LOC" - }, - { - "comments": [], - "name": "sub.unit.tests.", - "records": [ - { - "content": "6.2.3.4.", - "disabled": false - }, { - "content": "7.2.3.4.", - "disabled": false - } - ], - "ttl": 3600, - "type": "NS" - }, - { - "comments": [], - "name": "www.unit.tests.", - "records": [ - { - "content": "2.2.3.6", - "disabled": false - } - ], - "ttl": 300, - "type": "A" - }, - { - "comments": [], - "name": "_imap._tcp.unit.tests.", - "records": [ - { - "content": "0 0 0 .", - "disabled": false - } - ], - "ttl": 600, - "type": "SRV" - }, - { - "comments": [], - "name": "_pop3._tcp.unit.tests.", - "records": [ - { - "content": "0 0 0 .", - "disabled": false - } - ], - "ttl": 600, - "type": "SRV" - }, - { - "comments": [], - "name": "_srv._tcp.unit.tests.", - "records": [ - { - "content": "10 20 30 foo-1.unit.tests.", - "disabled": false - }, - { - "content": "12 20 30 foo-2.unit.tests.", - "disabled": false - } - ], - "ttl": 600, - "type": "SRV" - }, - { - "comments": [], - "name": "txt.unit.tests.", - "records": [ - { - "content": "\"Bah bah black sheep\"", - "disabled": false - }, - { - "content": "\"have you any wool.\"", - "disabled": false - }, - { - "content": "\"v=DKIM1\\;k=rsa\\;s=email\\;h=sha256\\;p=A/kinda+of/long/string+with+numb3rs\"", - "disabled": false - } - ], - "ttl": 600, - "type": "TXT" - }, - { - "comments": [], - "name": "naptr.unit.tests.", - "records": [ - { - "content": "10 100 \"S\" \"SIP+D2U\" \"!^.*$!sip:info@bar.example.com!\" .", - "disabled": false - }, - { - "content": "100 100 \"U\" \"SIP+D2U\" \"!^.*$!sip:info@bar.example.com!\" .", - "disabled": false - } - ], - "ttl": 600, - "type": "NAPTR" - }, - { - "comments": [], - "name": "ptr.unit.tests.", - "records": [ - { - "content": "foo.bar.com.", - "disabled": false - } - ], - "ttl": 300, - "type": "PTR" - }, - { - "comments": [], - "name": "spf.unit.tests.", - "records": [ - { - "content": "\"v=spf1 ip4:192.168.0.1/16-all\"", - "disabled": false - } - ], - "ttl": 600, - "type": "SPF" - }, - { - "comments": [], - "name": "cname.unit.tests.", - "records": [ - { - "content": "unit.tests.", - "disabled": false - } - ], - "ttl": 300, - "type": "CNAME" - }, - { - "comments": [], - "name": "www.sub.unit.tests.", - "records": [ - { - "content": "2.2.3.6", - "disabled": false - } - ], - "ttl": 300, - "type": "A" - }, - { - "comments": [], - "name": "aaaa.unit.tests.", - "records": [ - { - "content": "2601:644:500:e210:62f8:1dff:feb8:947a", - "disabled": false - } - ], - "ttl": 600, - "type": "AAAA" - }, - { - "comments": [], - "name": "unit.tests.", - "records": [ - { - "content": "1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49", - "disabled": false - }, - { - "content": "1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73", - "disabled": false - } - ], - "ttl": 3600, - "type": "SSHFP" - }, - { - "comments": [], - "name": "unit.tests.", - "records": [ - { - "content": "ns1.ext.unit.tests. hostmaster.unit.tests. 2017012803 3600 600 604800 60", - "disabled": false - } - ], - "ttl": 3600, - "type": "SOA" - }, - { - "comments": [], - "name": "unit.tests.", - "records": [ - { - "content": "1.1.1.1.", - "disabled": false - }, - { - "content": "4.4.4.4.", - "disabled": false - } - ], - "ttl": 600, - "type": "NS" - }, - { - "comments": [], - "name": "unit.tests.", - "records": [ - { - "content": "1.2.3.5", - "disabled": false - }, - { - "content": "1.2.3.4", - "disabled": false - } - ], - "ttl": 300, - "type": "A" - }, - { - "comments": [], - "name": "unit.tests.", - "records": [ - { - "content": "0 issue \"ca.unit.tests\"", - "disabled": false - } - ], - "ttl": 3600, - "type": "CAA" - }, - { - "comments": [], - "name": "included.unit.tests.", - "records": [ - { - "content": "unit.tests.", - "disabled": false - } - ], - "ttl": 3600, - "type": "CNAME" - } - ], - "serial": 2017012803, - "soa_edit": "", - "soa_edit_api": "INCEPTION-INCREMENT", - "url": "api/v1/servers/localhost/zones/unit.tests." -} diff --git a/tests/test_octodns_provider_powerdns.py b/tests/test_octodns_provider_powerdns.py index c234713..464efef 100644 --- a/tests/test_octodns_provider_powerdns.py +++ b/tests/test_octodns_provider_powerdns.py @@ -5,416 +5,12 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -from json import loads, dumps -from os.path import dirname, join -from requests import HTTPError -from requests_mock import ANY, mock as requests_mock from unittest import TestCase -from octodns.record import Record -from octodns.provider.powerdns import PowerDnsProvider -from octodns.provider.yaml import YamlProvider -from octodns.zone import Zone -EMPTY_TEXT = ''' -{ - "account": "", - "dnssec": false, - "id": "xunit.tests.", - "kind": "Master", - "last_check": 0, - "masters": [], - "name": "xunit.tests.", - "notified_serial": 0, - "rrsets": [], - "serial": 2017012801, - "soa_edit": "", - "soa_edit_api": "INCEPTION-INCREMENT", - "url": "api/v1/servers/localhost/zones/xunit.tests." -} -''' +class TestPowerDnsShim(TestCase): -with open('./tests/fixtures/powerdns-full-data.json') as fh: - FULL_TEXT = fh.read() - - -class TestPowerDnsProvider(TestCase): - - def test_provider_version_detection(self): - provider = PowerDnsProvider('test', 'non.existent', 'api-key', - nameserver_values=['8.8.8.8.', - '9.9.9.9.']) - # Bad auth - with requests_mock() as mock: - mock.get(ANY, status_code=401, text='Unauthorized') - - with self.assertRaises(Exception) as ctx: - provider.powerdns_version - self.assertTrue('unauthorized' in str(ctx.exception)) - - # Api not found - with requests_mock() as mock: - mock.get(ANY, status_code=404, text='Not Found') - - with self.assertRaises(Exception) as ctx: - provider.powerdns_version - self.assertTrue('404' in str(ctx.exception)) - - # Test version detection - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.1.10"}) - self.assertEquals(provider.powerdns_version, [4, 1, 10]) - - # Test version detection for second time (should stay at 4.1.10) - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.2.0"}) - self.assertEquals(provider.powerdns_version, [4, 1, 10]) - - # Test version detection - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.2.0"}) - - # Reset version, so detection will try again - provider._powerdns_version = None - self.assertNotEquals(provider.powerdns_version, [4, 1, 10]) - - # Test version detection with pre-releases - with requests_mock() as mock: - # Reset version, so detection will try again - provider._powerdns_version = None - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.4.0-alpha1"}) - self.assertEquals(provider.powerdns_version, [4, 4, 0]) - - provider._powerdns_version = None - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, - json={'version': "4.5.0-alpha0.435.master.gcb114252b"}) - self.assertEquals(provider.powerdns_version, [4, 5, 0]) - - def test_provider_version_config(self): - provider = PowerDnsProvider('test', 'non.existent', 'api-key', - nameserver_values=['8.8.8.8.', - '9.9.9.9.']) - - # Test version 4.1.0 - provider._powerdns_version = None - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.1.10"}) - self.assertEquals(provider.soa_edit_api, 'INCEPTION-INCREMENT') - self.assertFalse( - provider.check_status_not_found, - 'check_status_not_found should be false ' - 'for version 4.1.x and below') - - # Test version 4.2.0 - provider._powerdns_version = None - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.2.0"}) - self.assertEquals(provider.soa_edit_api, 'INCEPTION-INCREMENT') - self.assertTrue( - provider.check_status_not_found, - 'check_status_not_found should be true for version 4.2.x') - - # Test version 4.3.0 - provider._powerdns_version = None - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.3.0"}) - self.assertEquals(provider.soa_edit_api, 'DEFAULT') - self.assertTrue( - provider.check_status_not_found, - 'check_status_not_found should be true for version 4.3.x') - - def test_provider(self): - provider = PowerDnsProvider('test', 'non.existent', 'api-key', - nameserver_values=['8.8.8.8.', - '9.9.9.9.']) - - # Test version detection - with requests_mock() as mock: - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': "4.1.10"}) - self.assertEquals(provider.powerdns_version, [4, 1, 10]) - - # Bad auth - with requests_mock() as mock: - mock.get(ANY, status_code=401, text='Unauthorized') - - with self.assertRaises(Exception) as ctx: - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertTrue('unauthorized' in str(ctx.exception)) - - # General error - with requests_mock() as mock: - mock.get(ANY, status_code=502, text='Things caught fire') - - with self.assertRaises(HTTPError) as ctx: - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(502, ctx.exception.response.status_code) - - # Non-existent zone in PowerDNS <4.3.0 doesn't populate anything - with requests_mock() as mock: - mock.get(ANY, status_code=422, - json={'error': "Could not find domain 'unit.tests.'"}) - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(set(), zone.records) - - # Non-existent zone in PowerDNS >=4.2.0 doesn't populate anything - - provider._powerdns_version = [4, 2, 0] - with requests_mock() as mock: - mock.get(ANY, status_code=404, text='Not Found') - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(set(), zone.records) - - provider._powerdns_version = [4, 1, 0] - - # The rest of this is messy/complicated b/c it's dealing with mocking - - expected = Zone('unit.tests.', []) - source = YamlProvider('test', join(dirname(__file__), 'config')) - source.populate(expected) - expected_n = len(expected.records) - 4 - self.assertEquals(19, expected_n) - - # No diffs == no changes - with requests_mock() as mock: - mock.get(ANY, status_code=200, text=FULL_TEXT) - - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(19, len(zone.records)) - changes = expected.changes(zone, provider) - self.assertEquals(0, len(changes)) - - # Used in a minute - def assert_rrsets_callback(request, context): - data = loads(request.body) - self.assertEquals(expected_n, len(data['rrsets'])) - return '' - - # No existing records -> creates for every record in expected - with requests_mock() as mock: - mock.get(ANY, status_code=200, text=EMPTY_TEXT) - # post 201, is response to the create with data - mock.patch(ANY, status_code=201, text=assert_rrsets_callback) - - plan = provider.plan(expected) - self.assertEquals(expected_n, len(plan.changes)) - self.assertEquals(expected_n, provider.apply(plan)) - self.assertTrue(plan.exists) - - # Non-existent zone -> creates for every record in expected - # OMG this is fucking ugly, probably better to ditch requests_mocks and - # just mock things for real as it doesn't seem to provide a way to get - # at the request params or verify that things were called from what I - # can tell - not_found = {'error': "Could not find domain 'unit.tests.'"} - with requests_mock() as mock: - # get 422's, unknown zone - mock.get(ANY, status_code=422, text=dumps(not_found)) - # patch 422's, unknown zone - mock.patch(ANY, status_code=422, text=dumps(not_found)) - # post 201, is response to the create with data - mock.post(ANY, status_code=201, text=assert_rrsets_callback) - - plan = provider.plan(expected) - self.assertEquals(expected_n, len(plan.changes)) - self.assertEquals(expected_n, provider.apply(plan)) - self.assertFalse(plan.exists) - - provider._powerdns_version = [4, 2, 0] - with requests_mock() as mock: - # get 404's, unknown zone - mock.get(ANY, status_code=404, text='') - # patch 404's, unknown zone - mock.patch(ANY, status_code=404, text=dumps(not_found)) - # post 201, is response to the create with data - mock.post(ANY, status_code=201, text=assert_rrsets_callback) - - plan = provider.plan(expected) - self.assertEquals(expected_n, len(plan.changes)) - self.assertEquals(expected_n, provider.apply(plan)) - self.assertFalse(plan.exists) - - provider._powerdns_version = [4, 1, 0] - with requests_mock() as mock: - # get 422's, unknown zone - mock.get(ANY, status_code=422, text=dumps(not_found)) - # patch 422's, - data = {'error': "Key 'name' not present or not a String"} - mock.patch(ANY, status_code=422, text=dumps(data)) - - with self.assertRaises(HTTPError) as ctx: - plan = provider.plan(expected) - provider.apply(plan) - response = ctx.exception.response - self.assertEquals(422, response.status_code) - self.assertTrue('error' in response.json()) - - with requests_mock() as mock: - # get 422's, unknown zone - mock.get(ANY, status_code=422, text=dumps(not_found)) - # patch 500's, things just blew up - mock.patch(ANY, status_code=500, text='') - - with self.assertRaises(HTTPError): - plan = provider.plan(expected) - provider.apply(plan) - - with requests_mock() as mock: - # get 422's, unknown zone - mock.get(ANY, status_code=422, text=dumps(not_found)) - # patch 500's, things just blew up - mock.patch(ANY, status_code=422, text=dumps(not_found)) - # post 422's, something wrong with create - mock.post(ANY, status_code=422, text='Hello Word!') - - with self.assertRaises(HTTPError): - plan = provider.plan(expected) - provider.apply(plan) - - def test_small_change(self): - provider = PowerDnsProvider('test', 'non.existent', 'api-key') - - expected = Zone('unit.tests.', []) - source = YamlProvider('test', join(dirname(__file__), 'config')) - source.populate(expected) - self.assertEquals(23, len(expected.records)) - - # A small change to a single record - with requests_mock() as mock: - mock.get(ANY, status_code=200, text=FULL_TEXT) - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': '4.1.0'}) - - missing = Zone(expected.name, []) - # Find and delete the SPF record - for record in expected.records: - if record._type != 'SPF': - missing.add_record(record) - - def assert_delete_callback(request, context): - self.assertEquals({ - 'rrsets': [{ - 'records': [ - {'content': '"v=spf1 ip4:192.168.0.1/16-all"', - 'disabled': False} - ], - 'changetype': 'DELETE', - 'type': 'SPF', - 'name': 'spf.unit.tests.', - 'ttl': 600 - }] - }, loads(request.body)) - return '' - - mock.patch(ANY, status_code=201, text=assert_delete_callback) - - plan = provider.plan(missing) - self.assertEquals(1, len(plan.changes)) - self.assertEquals(1, provider.apply(plan)) - - def test_existing_nameservers(self): - ns_values = ['8.8.8.8.', '9.9.9.9.'] - provider = PowerDnsProvider('test', 'non.existent', 'api-key', - nameserver_values=ns_values) - - expected = Zone('unit.tests.', []) - ns_record = Record.new(expected, '', { - 'type': 'NS', - 'ttl': 600, - 'values': ns_values - }) - expected.add_record(ns_record) - - # no changes - with requests_mock() as mock: - data = { - 'rrsets': [{ - 'comments': [], - 'name': 'unit.tests.', - 'records': [ - { - 'content': '8.8.8.8.', - 'disabled': False - }, - { - 'content': '9.9.9.9.', - 'disabled': False - } - ], - 'ttl': 600, - 'type': 'NS' - }, { - 'comments': [], - 'name': 'unit.tests.', - 'records': [{ - 'content': '1.2.3.4', - 'disabled': False, - }], - 'ttl': 60, - 'type': 'A' - }] - } - mock.get(ANY, status_code=200, json=data) - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': '4.1.0'}) - - unrelated_record = Record.new(expected, '', { - 'type': 'A', - 'ttl': 60, - 'value': '1.2.3.4' - }) - expected.add_record(unrelated_record) - plan = provider.plan(expected) - self.assertFalse(plan) - # remove it now that we don't need the unrelated change any longer - expected._remove_record(unrelated_record) - - # ttl diff - with requests_mock() as mock: - data = { - 'rrsets': [{ - 'comments': [], - 'name': 'unit.tests.', - 'records': [ - { - 'content': '8.8.8.8.', - 'disabled': False - }, - { - 'content': '9.9.9.9.', - 'disabled': False - }, - ], - 'ttl': 3600, - 'type': 'NS' - }] - } - mock.get(ANY, status_code=200, json=data) - mock.get('http://non.existent:8081/api/v1/servers/localhost', - status_code=200, json={'version': '4.1.0'}) - - plan = provider.plan(expected) - self.assertEquals(1, len(plan.changes)) - - # create - with requests_mock() as mock: - data = { - 'rrsets': [] - } - mock.get(ANY, status_code=200, json=data) - - plan = provider.plan(expected) - self.assertEquals(1, len(plan.changes)) + def test_missing(self): + with self.assertRaises(ModuleNotFoundError): + from octodns.provider.powerdns import PowerDnsProvider + PowerDnsProvider