Browse Source

Merge branch 'master' into pool-value-up

pull/780/head
Ross McFarland 4 years ago
committed by GitHub
parent
commit
8c0065b7c6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 192 additions and 216 deletions
  1. +3
    -5
      octodns/cmds/report.py
  2. +2
    -3
      octodns/manager.py
  3. +1
    -3
      octodns/provider/base.py
  4. +1
    -2
      octodns/provider/constellix.py
  5. +1
    -1
      octodns/provider/edgedns.py
  6. +4
    -6
      octodns/provider/ns1.py
  7. +1
    -2
      octodns/provider/ovh.py
  8. +11
    -11
      octodns/provider/plan.py
  9. +3
    -5
      octodns/provider/route53.py
  10. +8
    -8
      octodns/record/__init__.py
  11. +1
    -2
      octodns/source/axfr.py
  12. +1
    -3
      octodns/zone.py
  13. +0
    -1
      requirements.txt
  14. +31
    -34
      tests/test_octodns_manager.py
  15. +2
    -3
      tests/test_octodns_plan.py
  16. +6
    -10
      tests/test_octodns_provider_azuredns.py
  17. +17
    -18
      tests/test_octodns_provider_base.py
  18. +6
    -7
      tests/test_octodns_provider_cloudflare.py
  19. +2
    -3
      tests/test_octodns_provider_constellix.py
  20. +1
    -2
      tests/test_octodns_provider_digitalocean.py
  21. +1
    -2
      tests/test_octodns_provider_dnsimple.py
  22. +2
    -4
      tests/test_octodns_provider_dnsmadeeasy.py
  23. +4
    -5
      tests/test_octodns_provider_easydns.py
  24. +1
    -2
      tests/test_octodns_provider_edgedns.py
  25. +5
    -6
      tests/test_octodns_provider_gandi.py
  26. +5
    -6
      tests/test_octodns_provider_gcore.py
  27. +1
    -2
      tests/test_octodns_provider_hetzner.py
  28. +9
    -16
      tests/test_octodns_provider_mythicbeasts.py
  29. +5
    -8
      tests/test_octodns_provider_ns1.py
  30. +3
    -4
      tests/test_octodns_provider_powerdns.py
  31. +2
    -3
      tests/test_octodns_provider_rackspace.py
  32. +1
    -2
      tests/test_octodns_provider_route53.py
  33. +1
    -2
      tests/test_octodns_provider_selectel.py
  34. +5
    -6
      tests/test_octodns_provider_ultra.py
  35. +2
    -3
      tests/test_octodns_provider_yaml.py
  36. +33
    -3
      tests/test_octodns_record.py
  37. +3
    -4
      tests/test_octodns_source_axfr.py
  38. +1
    -2
      tests/test_octodns_source_envvar.py
  39. +1
    -1
      tests/test_octodns_yaml.py
  40. +5
    -6
      tests/test_octodns_zone.py

+ 3
- 5
octodns/cmds/report.py View File

@ -13,8 +13,6 @@ from logging import getLogger
from sys import stdout from sys import stdout
import re import re
from six import text_type
from octodns.cmds.args import ArgumentParser from octodns.cmds.args import ArgumentParser
from octodns.manager import Manager from octodns.manager import Manager
@ -67,7 +65,7 @@ def main():
resolver = AsyncResolver(configure=False, resolver = AsyncResolver(configure=False,
num_workers=int(args.num_workers)) num_workers=int(args.num_workers))
if not ip_addr_re.match(server): if not ip_addr_re.match(server):
server = text_type(query(server, 'A')[0])
server = str(query(server, 'A')[0])
log.info('server=%s', server) log.info('server=%s', server)
resolver.nameservers = [server] resolver.nameservers = [server]
resolver.lifetime = int(args.timeout) resolver.lifetime = int(args.timeout)
@ -83,12 +81,12 @@ def main():
stdout.write(',') stdout.write(',')
stdout.write(record._type) stdout.write(record._type)
stdout.write(',') stdout.write(',')
stdout.write(text_type(record.ttl))
stdout.write(str(record.ttl))
compare = {} compare = {}
for future in futures: for future in futures:
stdout.write(',') stdout.write(',')
try: try:
answers = [text_type(r) for r in future.result()]
answers = [str(r) for r in future.result()]
except (NoAnswer, NoNameservers): except (NoAnswer, NoNameservers):
answers = ['*no answer*'] answers = ['*no answer*']
except NXDOMAIN: except NXDOMAIN:


+ 2
- 3
octodns/manager.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from importlib import import_module from importlib import import_module
from os import environ from os import environ
from six import text_type
from sys import stdout from sys import stdout
import logging import logging
@ -259,7 +258,7 @@ class Manager(object):
source.populate(zone, lenient=lenient) source.populate(zone, lenient=lenient)
except TypeError as e: except TypeError as e:
if ("unexpected keyword argument 'lenient'" if ("unexpected keyword argument 'lenient'"
not in text_type(e)):
not in str(e)):
raise raise
self.log.warn('provider %s does not accept lenient ' self.log.warn('provider %s does not accept lenient '
'param', source.__class__.__name__) 'param', source.__class__.__name__)
@ -282,7 +281,7 @@ class Manager(object):
try: try:
plan = target.plan(zone, processors=processors) plan = target.plan(zone, processors=processors)
except TypeError as e: except TypeError as e:
if "keyword argument 'processors'" not in text_type(e):
if "keyword argument 'processors'" not in str(e):
raise raise
self.log.warn('provider.plan %s does not accept processors ' self.log.warn('provider.plan %s does not accept processors '
'param', target.__class__.__name__) 'param', target.__class__.__name__)


+ 1
- 3
octodns/provider/base.py View File

@ -5,8 +5,6 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from six import text_type
from ..source.base import BaseSource from ..source.base import BaseSource
from ..zone import Zone from ..zone import Zone
from .plan import Plan from .plan import Plan
@ -155,7 +153,7 @@ class BaseProvider(BaseSource):
changes=changes) changes=changes)
if extra: if extra:
self.log.info('plan: extra changes\n %s', '\n ' self.log.info('plan: extra changes\n %s', '\n '
.join([text_type(c) for c in extra]))
.join([str(c) for c in extra]))
changes += extra changes += extra
if changes: if changes:


+ 1
- 2
octodns/provider/constellix.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from collections import defaultdict from collections import defaultdict
from requests import Session from requests import Session
from base64 import b64encode from base64 import b64encode
from six import string_types
from pycountry_convert import country_alpha2_to_continent_code from pycountry_convert import country_alpha2_to_continent_code
import hashlib import hashlib
import hmac import hmac
@ -142,7 +141,7 @@ class ConstellixClient(object):
# change relative values to absolute # change relative values to absolute
value = record['value'] value = record['value']
if record['type'] in ['ALIAS', 'CNAME', 'MX', 'NS', 'SRV']: if record['type'] in ['ALIAS', 'CNAME', 'MX', 'NS', 'SRV']:
if isinstance(value, string_types):
if isinstance(value, str):
record['value'] = self._absolutize_value(value, record['value'] = self._absolutize_value(value,
zone_name) zone_name)
if isinstance(value, list): if isinstance(value, list):


+ 1
- 1
octodns/provider/edgedns.py View File

@ -7,8 +7,8 @@ from __future__ import absolute_import, division, print_function, \
from requests import Session from requests import Session
from akamai.edgegrid import EdgeGridAuth from akamai.edgegrid import EdgeGridAuth
from six.moves.urllib.parse import urljoin
from collections import defaultdict from collections import defaultdict
from urllib.parse import urljoin
from logging import getLogger from logging import getLogger
from ..record import Record from ..record import Record


+ 4
- 6
octodns/provider/ns1.py View File

@ -14,8 +14,6 @@ from pycountry_convert import country_alpha2_to_continent_code
from time import sleep from time import sleep
from uuid import uuid4 from uuid import uuid4
from six import text_type
from ..record import Record, Update from ..record import Record, Update
from . import ProviderException from . import ProviderException
from .base import BaseProvider from .base import BaseProvider
@ -529,9 +527,9 @@ class Ns1Provider(BaseProvider):
else: else:
values.extend(answer['answer']) values.extend(answer['answer'])
codes.append([]) codes.append([])
values = [text_type(x) for x in values]
values = [str(x) for x in values]
geo = OrderedDict( geo = OrderedDict(
{text_type(k): [text_type(x) for x in v] for k, v in geo.items()}
{str(k): [str(x) for x in v] for k, v in geo.items()}
) )
data['values'] = values data['values'] = values
data['geo'] = geo data['geo'] = geo
@ -565,7 +563,7 @@ class Ns1Provider(BaseProvider):
meta = answer['meta'] meta = answer['meta']
notes = self._parse_notes(meta.get('note', '')) notes = self._parse_notes(meta.get('note', ''))
value = text_type(answer['answer'][0])
value = str(answer['answer'][0])
if notes.get('from', False) == '--default--': if notes.get('from', False) == '--default--':
# It's a final/default value, record it and move on # It's a final/default value, record it and move on
default.add(value) default.add(value)
@ -727,7 +725,7 @@ class Ns1Provider(BaseProvider):
return { return {
'ttl': record['ttl'], 'ttl': record['ttl'],
'type': _type, 'type': _type,
'values': [text_type(x) for x in record['short_answers']]
'values': [str(x) for x in record['short_answers']]
} }
_data_for_AAAA = _data_for_A _data_for_AAAA = _data_for_A


+ 1
- 2
octodns/provider/ovh.py View File

@ -9,7 +9,6 @@ import base64
import binascii import binascii
import logging import logging
from collections import defaultdict from collections import defaultdict
from six import text_type
import ovh import ovh
from ovh import ResourceNotFoundError from ovh import ResourceNotFoundError
@ -65,7 +64,7 @@ class OvhProvider(BaseProvider):
records = self.get_records(zone_name=zone_name) records = self.get_records(zone_name=zone_name)
exists = True exists = True
except ResourceNotFoundError as e: except ResourceNotFoundError as e:
if text_type(e) != self.ZONE_NOT_FOUND_MESSAGE:
if str(e) != self.ZONE_NOT_FOUND_MESSAGE:
raise raise
exists = False exists = False
records = [] records = []


+ 11
- 11
octodns/provider/plan.py View File

@ -8,7 +8,7 @@ from __future__ import absolute_import, division, print_function, \
from logging import DEBUG, ERROR, INFO, WARN, getLogger from logging import DEBUG, ERROR, INFO, WARN, getLogger
from sys import stdout from sys import stdout
from six import StringIO, text_type
from io import StringIO
class UnsafePlan(Exception): class UnsafePlan(Exception):
@ -130,7 +130,7 @@ class PlanLogger(_PlanOutput):
buf.write('* ') buf.write('* ')
buf.write(target.id) buf.write(target.id)
buf.write(' (') buf.write(' (')
buf.write(text_type(target))
buf.write(str(target))
buf.write(')\n* ') buf.write(')\n* ')
if plan.exists is False: if plan.exists is False:
@ -143,7 +143,7 @@ class PlanLogger(_PlanOutput):
buf.write('\n* ') buf.write('\n* ')
buf.write('Summary: ') buf.write('Summary: ')
buf.write(text_type(plan))
buf.write(str(plan))
buf.write('\n') buf.write('\n')
else: else:
buf.write(hr) buf.write(hr)
@ -155,11 +155,11 @@ class PlanLogger(_PlanOutput):
def _value_stringifier(record, sep): def _value_stringifier(record, sep):
try: try:
values = [text_type(v) for v in record.values]
values = [str(v) for v in record.values]
except AttributeError: except AttributeError:
values = [record.value] values = [record.value]
for code, gv in sorted(getattr(record, 'geo', {}).items()): for code, gv in sorted(getattr(record, 'geo', {}).items()):
vs = ', '.join([text_type(v) for v in gv.values])
vs = ', '.join([str(v) for v in gv.values])
values.append(f'{code}: {vs}') values.append(f'{code}: {vs}')
return sep.join(values) return sep.join(values)
@ -201,7 +201,7 @@ class PlanMarkdown(_PlanOutput):
fh.write(' | ') fh.write(' | ')
# TTL # TTL
if existing: if existing:
fh.write(text_type(existing.ttl))
fh.write(str(existing.ttl))
fh.write(' | ') fh.write(' | ')
fh.write(_value_stringifier(existing, '; ')) fh.write(_value_stringifier(existing, '; '))
fh.write(' | |\n') fh.write(' | |\n')
@ -209,7 +209,7 @@ class PlanMarkdown(_PlanOutput):
fh.write('| | | | ') fh.write('| | | | ')
if new: if new:
fh.write(text_type(new.ttl))
fh.write(str(new.ttl))
fh.write(' | ') fh.write(' | ')
fh.write(_value_stringifier(new, '; ')) fh.write(_value_stringifier(new, '; '))
fh.write(' | ') fh.write(' | ')
@ -218,7 +218,7 @@ class PlanMarkdown(_PlanOutput):
fh.write(' |\n') fh.write(' |\n')
fh.write('\nSummary: ') fh.write('\nSummary: ')
fh.write(text_type(plan))
fh.write(str(plan))
fh.write('\n\n') fh.write('\n\n')
else: else:
fh.write('## No changes were planned\n') fh.write('## No changes were planned\n')
@ -269,7 +269,7 @@ class PlanHtml(_PlanOutput):
# TTL # TTL
if existing: if existing:
fh.write(' <td>') fh.write(' <td>')
fh.write(text_type(existing.ttl))
fh.write(str(existing.ttl))
fh.write('</td>\n <td>') fh.write('</td>\n <td>')
fh.write(_value_stringifier(existing, '<br/>')) fh.write(_value_stringifier(existing, '<br/>'))
fh.write('</td>\n <td></td>\n </tr>\n') fh.write('</td>\n <td></td>\n </tr>\n')
@ -278,7 +278,7 @@ class PlanHtml(_PlanOutput):
if new: if new:
fh.write(' <td>') fh.write(' <td>')
fh.write(text_type(new.ttl))
fh.write(str(new.ttl))
fh.write('</td>\n <td>') fh.write('</td>\n <td>')
fh.write(_value_stringifier(new, '<br/>')) fh.write(_value_stringifier(new, '<br/>'))
fh.write('</td>\n <td>') fh.write('</td>\n <td>')
@ -287,7 +287,7 @@ class PlanHtml(_PlanOutput):
fh.write('</td>\n </tr>\n') fh.write('</td>\n </tr>\n')
fh.write(' <tr>\n <td colspan=6>Summary: ') fh.write(' <tr>\n <td colspan=6>Summary: ')
fh.write(text_type(plan))
fh.write(str(plan))
fh.write('</td>\n </tr>\n</table>\n') fh.write('</td>\n </tr>\n</table>\n')
else: else:
fh.write('<b>No changes were planned</b>') fh.write('<b>No changes were planned</b>')

+ 3
- 5
octodns/provider/route53.py View File

@ -14,8 +14,6 @@ from uuid import uuid4
import logging import logging
import re import re
from six import text_type
from ..equality import EqualityTupleMixin from ..equality import EqualityTupleMixin
from ..record import Record, Update from ..record import Record, Update
from ..record.geo import GeoCodes from ..record.geo import GeoCodes
@ -1081,8 +1079,8 @@ class Route53Provider(BaseProvider):
# for equivalence. # for equivalence.
# E.g 2001:4860:4860:0:0:0:0:8842 -> 2001:4860:4860::8842 # E.g 2001:4860:4860:0:0:0:0:8842 -> 2001:4860:4860::8842
if value: if value:
value = ip_address(text_type(value))
config_ip_address = ip_address(text_type(config['IPAddress']))
value = ip_address(str(value))
config_ip_address = ip_address(str(config['IPAddress']))
else: else:
# No value so give this a None to match value's # No value so give this a None to match value's
config_ip_address = None config_ip_address = None
@ -1107,7 +1105,7 @@ class Route53Provider(BaseProvider):
fqdn, record._type, value) fqdn, record._type, value)
try: try:
ip_address(text_type(value))
ip_address(str(value))
# We're working with an IP, host is the Host header # We're working with an IP, host is the Host header
healthcheck_host = record.healthcheck_host(value=value) healthcheck_host = record.healthcheck_host(value=value)
except (AddressValueError, ValueError): except (AddressValueError, ValueError):


+ 8
- 8
octodns/record/__init__.py View File

@ -9,7 +9,6 @@ from ipaddress import IPv4Address, IPv6Address
from logging import getLogger from logging import getLogger
import re import re
from six import string_types, text_type
from fqdn import FQDN from fqdn import FQDN
from ..equality import EqualityTupleMixin from ..equality import EqualityTupleMixin
@ -83,7 +82,7 @@ class Record(EqualityTupleMixin):
@classmethod @classmethod
def new(cls, zone, name, data, source=None, lenient=False): def new(cls, zone, name, data, source=None, lenient=False):
name = text_type(name)
name = str(name)
fqdn = f'{name}.{zone.name}' if name else zone.name fqdn = f'{name}.{zone.name}' if name else zone.name
try: try:
_type = data['type'] _type = data['type']
@ -153,7 +152,7 @@ class Record(EqualityTupleMixin):
self.__class__.__name__, name) self.__class__.__name__, name)
self.zone = zone self.zone = zone
# force everything lower-case just to be safe # force everything lower-case just to be safe
self.name = text_type(name).lower() if name else name
self.name = str(name).lower() if name else name
self.source = source self.source = source
self.ttl = int(data['ttl']) self.ttl = int(data['ttl'])
@ -222,6 +221,7 @@ class Record(EqualityTupleMixin):
def copy(self, zone=None): def copy(self, zone=None):
data = self.data data = self.data
data['type'] = self._type data['type'] = self._type
data['octodns'] = self._octodns
return Record.new( return Record.new(
zone if zone else self.zone, zone if zone else self.zone,
@ -323,7 +323,7 @@ class _ValuesMixin(object):
return ret return ret
def __repr__(self): def __repr__(self):
values = "', '".join([text_type(v) for v in self.values])
values = "', '".join([str(v) for v in self.values])
klass = self.__class__.__name__ klass = self.__class__.__name__
return f"<{klass} {self._type} {self.ttl}, {self.fqdn}, ['{values}']>" return f"<{klass} {self._type} {self.ttl}, {self.fqdn}, ['{values}']>"
@ -632,7 +632,7 @@ class _DynamicMixin(object):
except KeyError: except KeyError:
geos = [] geos = []
if not isinstance(pool, string_types):
if not isinstance(pool, str):
reasons.append(f'rule {rule_num} invalid pool "{pool}"') reasons.append(f'rule {rule_num} invalid pool "{pool}"')
else: else:
if pool not in pools: if pool not in pools:
@ -738,7 +738,7 @@ class _IpList(object):
reasons.append('missing value(s)') reasons.append('missing value(s)')
else: else:
try: try:
cls._address_type(text_type(value))
cls._address_type(str(value))
except Exception: except Exception:
addr_name = cls._address_name addr_name = cls._address_name
reasons.append(f'invalid {addr_name} address "{value}"') reasons.append(f'invalid {addr_name} address "{value}"')
@ -748,10 +748,10 @@ class _IpList(object):
def process(cls, values): def process(cls, values):
# Translating None into '' so that the list will be sortable in # Translating None into '' so that the list will be sortable in
# python3, get everything to str first # python3, get everything to str first
values = [text_type(v) if v is not None else '' for v in values]
values = [str(v) if v is not None else '' for v in values]
# Now round trip all non-'' through the address type and back to a str # Now round trip all non-'' through the address type and back to a str
# to normalize the address representation. # to normalize the address representation.
return [text_type(cls._address_type(v)) if v != '' else ''
return [str(cls._address_type(v)) if v != '' else ''
for v in values] for v in values]


+ 1
- 2
octodns/source/axfr.py View File

@ -15,7 +15,6 @@ from dns.exception import DNSException
from collections import defaultdict from collections import defaultdict
from os import listdir from os import listdir
from os.path import join from os.path import join
from six import text_type
import logging import logging
from ..record import Record from ..record import Record
@ -222,7 +221,7 @@ class ZoneFileSourceNotFound(ZoneFileSourceException):
class ZoneFileSourceLoadFailure(ZoneFileSourceException): class ZoneFileSourceLoadFailure(ZoneFileSourceException):
def __init__(self, error): def __init__(self, error):
super(ZoneFileSourceLoadFailure, self).__init__(text_type(error))
super(ZoneFileSourceLoadFailure, self).__init__(str(error))
class ZoneFileSource(AxfrBaseSource): class ZoneFileSource(AxfrBaseSource):


+ 1
- 3
octodns/zone.py View File

@ -9,8 +9,6 @@ from collections import defaultdict
from logging import getLogger from logging import getLogger
import re import re
from six import text_type
from .record import Create, Delete from .record import Create, Delete
@ -39,7 +37,7 @@ class Zone(object):
if not name[-1] == '.': if not name[-1] == '.':
raise Exception(f'Invalid zone name {name}, missing ending dot') raise Exception(f'Invalid zone name {name}, missing ending dot')
# Force everything to lowercase just to be safe # Force everything to lowercase just to be safe
self.name = text_type(name).lower() if name else name
self.name = str(name).lower() if name else name
self.sub_zones = sub_zones self.sub_zones = sub_zones
# We're grouping by node, it allows us to efficiently search for # We're grouping by node, it allows us to efficiently search for
# duplicates and detect when CNAMEs co-exist with other records # duplicates and detect when CNAMEs co-exist with other records


+ 0
- 1
requirements.txt View File

@ -25,4 +25,3 @@ python-dateutil==2.8.1
requests==2.24.0 requests==2.24.0
s3transfer==0.3.3 s3transfer==0.3.3
setuptools==44.1.1 setuptools==44.1.1
six==1.15.0

+ 31
- 34
tests/test_octodns_manager.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from os import environ from os import environ
from os.path import dirname, join from os.path import dirname, join
from six import text_type
from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager, \ from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager, \
ManagerException ManagerException
@ -34,79 +33,79 @@ class TestManager(TestCase):
def test_missing_provider_class(self): def test_missing_provider_class(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-class.yaml')).sync() Manager(get_config_filename('missing-provider-class.yaml')).sync()
self.assertTrue('missing class' in text_type(ctx.exception))
self.assertTrue('missing class' in str(ctx.exception))
def test_bad_provider_class(self): def test_bad_provider_class(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class.yaml')).sync() Manager(get_config_filename('bad-provider-class.yaml')).sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_bad_provider_class_module(self): def test_bad_provider_class_module(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class-module.yaml')) \ Manager(get_config_filename('bad-provider-class-module.yaml')) \
.sync() .sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_bad_provider_class_no_module(self): def test_bad_provider_class_no_module(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class-no-module.yaml')) \ Manager(get_config_filename('bad-provider-class-no-module.yaml')) \
.sync() .sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_missing_provider_config(self): def test_missing_provider_config(self):
# Missing provider config # Missing provider config
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-config.yaml')).sync() Manager(get_config_filename('missing-provider-config.yaml')).sync()
self.assertTrue('provider config' in text_type(ctx.exception))
self.assertTrue('provider config' in str(ctx.exception))
def test_missing_env_config(self): def test_missing_env_config(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-env.yaml')).sync() Manager(get_config_filename('missing-provider-env.yaml')).sync()
self.assertTrue('missing env var' in text_type(ctx.exception))
self.assertTrue('missing env var' in str(ctx.exception))
def test_missing_source(self): def test_missing_source(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \ Manager(get_config_filename('provider-problems.yaml')) \
.sync(['missing.sources.']) .sync(['missing.sources.'])
self.assertTrue('missing sources' in text_type(ctx.exception))
self.assertTrue('missing sources' in str(ctx.exception))
def test_missing_targets(self): def test_missing_targets(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \ Manager(get_config_filename('provider-problems.yaml')) \
.sync(['missing.targets.']) .sync(['missing.targets.'])
self.assertTrue('missing targets' in text_type(ctx.exception))
self.assertTrue('missing targets' in str(ctx.exception))
def test_unknown_source(self): def test_unknown_source(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \ Manager(get_config_filename('provider-problems.yaml')) \
.sync(['unknown.source.']) .sync(['unknown.source.'])
self.assertTrue('unknown source' in text_type(ctx.exception))
self.assertTrue('unknown source' in str(ctx.exception))
def test_unknown_target(self): def test_unknown_target(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \ Manager(get_config_filename('provider-problems.yaml')) \
.sync(['unknown.target.']) .sync(['unknown.target.'])
self.assertTrue('unknown target' in text_type(ctx.exception))
self.assertTrue('unknown target' in str(ctx.exception))
def test_bad_plan_output_class(self): def test_bad_plan_output_class(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
name = 'bad-plan-output-missing-class.yaml' name = 'bad-plan-output-missing-class.yaml'
Manager(get_config_filename(name)).sync() Manager(get_config_filename(name)).sync()
self.assertEquals('plan_output bad is missing class', self.assertEquals('plan_output bad is missing class',
text_type(ctx.exception))
str(ctx.exception))
def test_bad_plan_output_config(self): def test_bad_plan_output_config(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-plan-output-config.yaml')).sync() Manager(get_config_filename('bad-plan-output-config.yaml')).sync()
self.assertEqual('Incorrect plan_output config for bad', self.assertEqual('Incorrect plan_output config for bad',
text_type(ctx.exception))
str(ctx.exception))
def test_source_only_as_a_target(self): def test_source_only_as_a_target(self):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \ Manager(get_config_filename('provider-problems.yaml')) \
.sync(['not.targetable.']) .sync(['not.targetable.'])
self.assertTrue('does not support targeting' in self.assertTrue('does not support targeting' in
text_type(ctx.exception))
str(ctx.exception))
def test_always_dry_run(self): def test_always_dry_run(self):
with TemporaryDirectory() as tmpdir: with TemporaryDirectory() as tmpdir:
@ -184,7 +183,7 @@ class TestManager(TestCase):
.sync() .sync()
self.assertEquals('Invalid alias zone alias.tests.: source zone ' self.assertEquals('Invalid alias zone alias.tests.: source zone '
'does-not-exists.tests. does not exist', 'does-not-exists.tests. does not exist',
text_type(ctx.exception))
str(ctx.exception))
# Alias zone that points to another alias zone. # Alias zone that points to another alias zone.
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
@ -192,7 +191,7 @@ class TestManager(TestCase):
.sync() .sync()
self.assertEquals('Invalid alias zone alias-loop.tests.: source ' self.assertEquals('Invalid alias zone alias-loop.tests.: source '
'zone alias.tests. is an alias zone', 'zone alias.tests. is an alias zone',
text_type(ctx.exception))
str(ctx.exception))
# Sync an alias without the zone it refers to # Sync an alias without the zone it refers to
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
@ -200,7 +199,7 @@ class TestManager(TestCase):
.sync(eligible_zones=["alias.tests."]) .sync(eligible_zones=["alias.tests."])
self.assertEquals('Zone alias.tests. cannot be sync without zone ' self.assertEquals('Zone alias.tests. cannot be sync without zone '
'unit.tests. sinced it is aliased', 'unit.tests. sinced it is aliased',
text_type(ctx.exception))
str(ctx.exception))
def test_compare(self): def test_compare(self):
with TemporaryDirectory() as tmpdir: with TemporaryDirectory() as tmpdir:
@ -228,7 +227,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
manager.compare(['nope'], ['dump'], 'unit.tests.') manager.compare(['nope'], ['dump'], 'unit.tests.')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
def test_aggregate_target(self): def test_aggregate_target(self):
simple = SimpleProvider() simple = SimpleProvider()
@ -269,7 +268,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
manager.dump('unit.tests.', tmpdir.dirname, False, False, manager.dump('unit.tests.', tmpdir.dirname, False, False,
'nope') 'nope')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
manager.dump('unit.tests.', tmpdir.dirname, False, False, 'in') manager.dump('unit.tests.', tmpdir.dirname, False, False, 'in')
@ -298,7 +297,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
manager.dump('unit.tests.', tmpdir.dirname, False, True, manager.dump('unit.tests.', tmpdir.dirname, False, True,
'nope') 'nope')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
manager.dump('unit.tests.', tmpdir.dirname, False, True, 'in') manager.dump('unit.tests.', tmpdir.dirname, False, True, 'in')
@ -314,26 +313,24 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-sources.yaml')) \ Manager(get_config_filename('missing-sources.yaml')) \
.validate_configs() .validate_configs()
self.assertTrue('missing sources' in text_type(ctx.exception))
self.assertTrue('missing sources' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-provider.yaml')) \ Manager(get_config_filename('unknown-provider.yaml')) \
.validate_configs() .validate_configs()
self.assertTrue('unknown source' in text_type(ctx.exception))
self.assertTrue('unknown source' in str(ctx.exception))
# Alias zone using an invalid source zone. # Alias zone using an invalid source zone.
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-source-zone.yaml')) \ Manager(get_config_filename('unknown-source-zone.yaml')) \
.validate_configs() .validate_configs()
self.assertTrue('does not exist' in
text_type(ctx.exception))
self.assertTrue('does not exist' in str(ctx.exception))
# Alias zone that points to another alias zone. # Alias zone that points to another alias zone.
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('alias-zone-loop.yaml')) \ Manager(get_config_filename('alias-zone-loop.yaml')) \
.validate_configs() .validate_configs()
self.assertTrue('is an alias zone' in
text_type(ctx.exception))
self.assertTrue('is an alias zone' in str(ctx.exception))
# Valid config file using an alias zone. # Valid config file using an alias zone.
Manager(get_config_filename('simple-alias-zone.yaml')) \ Manager(get_config_filename('simple-alias-zone.yaml')) \
@ -342,19 +339,19 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-processor.yaml')) \ Manager(get_config_filename('unknown-processor.yaml')) \
.validate_configs() .validate_configs()
self.assertTrue('unknown processor' in text_type(ctx.exception))
self.assertTrue('unknown processor' in str(ctx.exception))
def test_get_zone(self): def test_get_zone(self):
Manager(get_config_filename('simple.yaml')).get_zone('unit.tests.') Manager(get_config_filename('simple.yaml')).get_zone('unit.tests.')
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('simple.yaml')).get_zone('unit.tests') Manager(get_config_filename('simple.yaml')).get_zone('unit.tests')
self.assertTrue('missing ending dot' in text_type(ctx.exception))
self.assertTrue('missing ending dot' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('simple.yaml')) \ Manager(get_config_filename('simple.yaml')) \
.get_zone('unknown-zone.tests.') .get_zone('unknown-zone.tests.')
self.assertTrue('Unknown zone name' in text_type(ctx.exception))
self.assertTrue('Unknown zone name' in str(ctx.exception))
def test_populate_lenient_fallback(self): def test_populate_lenient_fallback(self):
with TemporaryDirectory() as tmpdir: with TemporaryDirectory() as tmpdir:
@ -379,7 +376,7 @@ class TestManager(TestCase):
with self.assertRaises(TypeError) as ctx: with self.assertRaises(TypeError) as ctx:
manager._populate_and_plan('unit.tests.', [], [OtherType()], manager._populate_and_plan('unit.tests.', [], [OtherType()],
[]) [])
self.assertEquals('something else', text_type(ctx.exception))
self.assertEquals('something else', str(ctx.exception))
def test_plan_processors_fallback(self): def test_plan_processors_fallback(self):
with TemporaryDirectory() as tmpdir: with TemporaryDirectory() as tmpdir:
@ -405,7 +402,7 @@ class TestManager(TestCase):
with self.assertRaises(TypeError) as ctx: with self.assertRaises(TypeError) as ctx:
manager._populate_and_plan('unit.tests.', [], [], manager._populate_and_plan('unit.tests.', [], [],
[OtherType()]) [OtherType()])
self.assertEquals('something else', text_type(ctx.exception))
self.assertEquals('something else', str(ctx.exception))
@patch('octodns.manager.Manager._get_named_class') @patch('octodns.manager.Manager._get_named_class')
def test_sync_passes_file_handle(self, mock): def test_sync_passes_file_handle(self, mock):
@ -436,17 +433,17 @@ class TestManager(TestCase):
# This zone specifies a non-existant processor # This zone specifies a non-existant processor
manager.sync(['bad.unit.tests.']) manager.sync(['bad.unit.tests.'])
self.assertTrue('Zone bad.unit.tests., unknown processor: ' self.assertTrue('Zone bad.unit.tests., unknown processor: '
'doesnt-exist' in text_type(ctx.exception))
'doesnt-exist' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('processors-missing-class.yaml')) Manager(get_config_filename('processors-missing-class.yaml'))
self.assertTrue('Processor no-class is missing class' in self.assertTrue('Processor no-class is missing class' in
text_type(ctx.exception))
str(ctx.exception))
with self.assertRaises(ManagerException) as ctx: with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('processors-wants-config.yaml')) Manager(get_config_filename('processors-wants-config.yaml'))
self.assertTrue('Incorrect processor config for wants-config' in self.assertTrue('Incorrect processor config for wants-config' in
text_type(ctx.exception))
str(ctx.exception))
def test_processors(self): def test_processors(self):
manager = Manager(get_config_filename('simple.yaml')) manager = Manager(get_config_filename('simple.yaml'))


+ 2
- 3
tests/test_octodns_plan.py View File

@ -5,8 +5,8 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from io import StringIO
from logging import getLogger from logging import getLogger
from six import StringIO, text_type
from unittest import TestCase from unittest import TestCase
from octodns.provider.plan import Plan, PlanHtml, PlanLogger, PlanMarkdown from octodns.provider.plan import Plan, PlanHtml, PlanLogger, PlanMarkdown
@ -58,8 +58,7 @@ class TestPlanLogger(TestCase):
def test_invalid_level(self): def test_invalid_level(self):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
PlanLogger('invalid', 'not-a-level') PlanLogger('invalid', 'not-a-level')
self.assertEquals('Unsupported level: not-a-level',
text_type(ctx.exception))
self.assertEquals('Unsupported level: not-a-level', str(ctx.exception))
def test_create(self): def test_create(self):


+ 6
- 10
tests/test_octodns_provider_azuredns.py View File

@ -19,7 +19,6 @@ from azure.mgmt.trafficmanager.models import Profile, DnsConfig, \
MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem
from msrestazure.azure_exceptions import CloudError from msrestazure.azure_exceptions import CloudError
from six import text_type
from unittest import TestCase from unittest import TestCase
from mock import Mock, patch, call from mock import Mock, patch, call
@ -992,9 +991,7 @@ class TestAzureDnsProvider(TestCase):
changes = [Create(unsupported_dynamic)] changes = [Create(unsupported_dynamic)]
with self.assertRaises(AzureException) as ctx: with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes) provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).endswith(
'must be of type CNAME'
))
self.assertTrue(str(ctx).endswith('must be of type CNAME'))
desired._remove_record(unsupported_dynamic) desired._remove_record(unsupported_dynamic)
# test colliding ATM names throws exception # test colliding ATM names throws exception
@ -1015,9 +1012,8 @@ class TestAzureDnsProvider(TestCase):
changes = [Create(record1), Create(record2)] changes = [Create(record1), Create(record2)]
with self.assertRaises(AzureException) as ctx: with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes) provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).startswith(
'Collision in Traffic Manager'
))
self.assertTrue(str(ctx)
.startswith('Collision in Traffic Manager'))
@patch( @patch(
'octodns.provider.azuredns.AzureProvider._generate_traffic_managers') 'octodns.provider.azuredns.AzureProvider._generate_traffic_managers')
@ -1062,7 +1058,7 @@ class TestAzureDnsProvider(TestCase):
)] )]
with self.assertRaises(AzureException) as ctx: with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, changes) provider._extra_changes(zone, desired, changes)
self.assertTrue('duplicate endpoint' in text_type(ctx))
self.assertTrue('duplicate endpoint' in str(ctx))
def test_extra_changes_A_multi_defaults(self): def test_extra_changes_A_multi_defaults(self):
provider = self._get_provider() provider = self._get_provider()
@ -1088,7 +1084,7 @@ class TestAzureDnsProvider(TestCase):
desired.add_record(record) desired.add_record(record)
with self.assertRaises(AzureException) as ctx: with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, []) provider._extra_changes(zone, desired, [])
self.assertEqual('single value' in text_type(ctx))
self.assertEqual('single value' in str(ctx))
def test_generate_tm_profile(self): def test_generate_tm_profile(self):
provider, zone, record = self._get_dynamic_package() provider, zone, record = self._get_dynamic_package()
@ -1194,7 +1190,7 @@ class TestAzureDnsProvider(TestCase):
azrecord.type = f'Microsoft.Network/dnszones/{record._type}' azrecord.type = f'Microsoft.Network/dnszones/{record._type}'
with self.assertRaises(AzureException) as ctx: with self.assertRaises(AzureException) as ctx:
provider._populate_record(zone, azrecord) provider._populate_record(zone, azrecord)
self.assertTrue(text_type(ctx).startswith(
self.assertTrue(str(ctx).startswith(
'Middle East (GEO-ME) is not supported' 'Middle East (GEO-ME) is not supported'
)) ))


+ 17
- 18
tests/test_octodns_provider_base.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from logging import getLogger from logging import getLogger
from mock import MagicMock, call from mock import MagicMock, call
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.processor.base import BaseProcessor from octodns.processor.base import BaseProcessor
@ -79,7 +78,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx: with self.assertRaises(NotImplementedError) as ctx:
BaseProvider('base') BaseProvider('base')
self.assertEquals('Abstract base class, log property missing', self.assertEquals('Abstract base class, log property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasLog(BaseProvider): class HasLog(BaseProvider):
log = getLogger('HasLog') log = getLogger('HasLog')
@ -87,7 +86,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx: with self.assertRaises(NotImplementedError) as ctx:
HasLog('haslog') HasLog('haslog')
self.assertEquals('Abstract base class, SUPPORTS_GEO property missing', self.assertEquals('Abstract base class, SUPPORTS_GEO property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasSupportsGeo(HasLog): class HasSupportsGeo(HasLog):
SUPPORTS_GEO = False SUPPORTS_GEO = False
@ -96,14 +95,14 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx: with self.assertRaises(NotImplementedError) as ctx:
HasSupportsGeo('hassupportsgeo').populate(zone) HasSupportsGeo('hassupportsgeo').populate(zone)
self.assertEquals('Abstract base class, SUPPORTS property missing', self.assertEquals('Abstract base class, SUPPORTS property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasSupports(HasSupportsGeo): class HasSupports(HasSupportsGeo):
SUPPORTS = set(('A',)) SUPPORTS = set(('A',))
with self.assertRaises(NotImplementedError) as ctx: with self.assertRaises(NotImplementedError) as ctx:
HasSupports('hassupports').populate(zone) HasSupports('hassupports').populate(zone)
self.assertEquals('Abstract base class, populate method missing', self.assertEquals('Abstract base class, populate method missing',
text_type(ctx.exception))
str(ctx.exception))
# SUPPORTS_DYNAMIC has a default/fallback # SUPPORTS_DYNAMIC has a default/fallback
self.assertFalse(HasSupports('hassupports').SUPPORTS_DYNAMIC) self.assertFalse(HasSupports('hassupports').SUPPORTS_DYNAMIC)
@ -149,7 +148,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx: with self.assertRaises(NotImplementedError) as ctx:
HasPopulate('haspopulate').apply(plan) HasPopulate('haspopulate').apply(plan)
self.assertEquals('Abstract base class, _apply method missing', self.assertEquals('Abstract base class, _apply method missing',
text_type(ctx.exception))
str(ctx.exception))
def test_plan(self): def test_plan(self):
ignored = Zone('unit.tests.', []) ignored = Zone('unit.tests.', [])
@ -321,7 +320,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -353,7 +352,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -366,7 +365,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(UnsafePlan) as ctx: with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes, True).raise_if_unsafe() Plan(zone, zone, changes, True).raise_if_unsafe()
self.assertTrue('Too many updates' in text_type(ctx.exception))
self.assertTrue('Too many updates' in str(ctx.exception))
def test_safe_updates_min_existing_pcent(self): def test_safe_updates_min_existing_pcent(self):
# MAX_SAFE_UPDATE_PCENT is safe when more # MAX_SAFE_UPDATE_PCENT is safe when more
@ -379,7 +378,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -401,7 +400,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -414,7 +413,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(UnsafePlan) as ctx: with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes, True).raise_if_unsafe() Plan(zone, zone, changes, True).raise_if_unsafe()
self.assertTrue('Too many deletes' in text_type(ctx.exception))
self.assertTrue('Too many deletes' in str(ctx.exception))
def test_safe_deletes_min_existing_pcent(self): def test_safe_deletes_min_existing_pcent(self):
# MAX_SAFE_DELETE_PCENT is safe when more # MAX_SAFE_DELETE_PCENT is safe when more
@ -427,7 +426,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -450,7 +449,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -464,7 +463,7 @@ class TestBaseProvider(TestCase):
Plan(zone, zone, changes, True, Plan(zone, zone, changes, True,
update_pcent_threshold=safe_pcent).raise_if_unsafe() update_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many updates' in text_type(ctx.exception))
self.assertTrue('Too many updates' in str(ctx.exception))
def test_safe_deletes_min_existing_override(self): def test_safe_deletes_min_existing_override(self):
safe_pcent = .4 safe_pcent = .4
@ -478,7 +477,7 @@ class TestBaseProvider(TestCase):
}) })
for i in range(int(Plan.MIN_EXISTING_RECORDS)): for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60, 'ttl': 60,
'type': 'A', 'type': 'A',
'value': '2.3.4.5' 'value': '2.3.4.5'
@ -492,7 +491,7 @@ class TestBaseProvider(TestCase):
Plan(zone, zone, changes, True, Plan(zone, zone, changes, True,
delete_pcent_threshold=safe_pcent).raise_if_unsafe() delete_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many deletes' in text_type(ctx.exception))
self.assertTrue('Too many deletes' in str(ctx.exception))
def test_supports_warn_or_except(self): def test_supports_warn_or_except(self):
class MinimalProvider(BaseProvider): class MinimalProvider(BaseProvider):
@ -515,5 +514,5 @@ class TestBaseProvider(TestCase):
# Should log and not expect # Should log and not expect
with self.assertRaises(SupportsException) as ctx: with self.assertRaises(SupportsException) as ctx:
strict.supports_warn_or_except('Hello World!', 'Will not see') strict.supports_warn_or_except('Hello World!', 'Will not see')
self.assertEquals('minimal: Hello World!', text_type(ctx.exception))
self.assertEquals('minimal: Hello World!', str(ctx.exception))
strict.log.warning.assert_not_called() strict.log.warning.assert_not_called()

+ 6
- 7
tests/test_octodns_provider_cloudflare.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record, Update from octodns.record import Record, Update
@ -67,7 +66,7 @@ class TestCloudflareProvider(TestCase):
provider.populate(zone) provider.populate(zone)
self.assertEquals('CloudflareError', type(ctx.exception).__name__) self.assertEquals('CloudflareError', type(ctx.exception).__name__)
self.assertEquals('request was invalid', text_type(ctx.exception))
self.assertEquals('request was invalid', str(ctx.exception))
# Bad auth # Bad auth
with requests_mock() as mock: with requests_mock() as mock:
@ -82,7 +81,7 @@ class TestCloudflareProvider(TestCase):
self.assertEquals('CloudflareAuthenticationError', self.assertEquals('CloudflareAuthenticationError',
type(ctx.exception).__name__) type(ctx.exception).__name__)
self.assertEquals('Unknown X-Auth-Key or X-Auth-Email', self.assertEquals('Unknown X-Auth-Key or X-Auth-Email',
text_type(ctx.exception))
str(ctx.exception))
# Bad auth, unknown resp # Bad auth, unknown resp
with requests_mock() as mock: with requests_mock() as mock:
@ -93,7 +92,7 @@ class TestCloudflareProvider(TestCase):
provider.populate(zone) provider.populate(zone)
self.assertEquals('CloudflareAuthenticationError', self.assertEquals('CloudflareAuthenticationError',
type(ctx.exception).__name__) type(ctx.exception).__name__)
self.assertEquals('Cloudflare error', text_type(ctx.exception))
self.assertEquals('Cloudflare error', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:
@ -120,7 +119,7 @@ class TestCloudflareProvider(TestCase):
type(ctx.exception).__name__) type(ctx.exception).__name__)
self.assertEquals('More than 1200 requests per 300 seconds ' self.assertEquals('More than 1200 requests per 300 seconds '
'reached. Please wait and consider throttling ' 'reached. Please wait and consider throttling '
'your request speed', text_type(ctx.exception))
'your request speed', str(ctx.exception))
# Rate Limit error, unknown resp # Rate Limit error, unknown resp
with requests_mock() as mock: with requests_mock() as mock:
@ -132,7 +131,7 @@ class TestCloudflareProvider(TestCase):
self.assertEquals('CloudflareRateLimitError', self.assertEquals('CloudflareRateLimitError',
type(ctx.exception).__name__) type(ctx.exception).__name__)
self.assertEquals('Cloudflare error', text_type(ctx.exception))
self.assertEquals('Cloudflare error', str(ctx.exception))
# Non-existent zone doesn't populate anything # Non-existent zone doesn't populate anything
with requests_mock() as mock: with requests_mock() as mock:
@ -1625,7 +1624,7 @@ class TestCloudflareProvider(TestCase):
] ]
with self.assertRaises(CloudflareRateLimitError) as ctx: with self.assertRaises(CloudflareRateLimitError) as ctx:
provider.zone_records(zone) provider.zone_records(zone)
self.assertEquals('last', text_type(ctx.exception))
self.assertEquals('last', str(ctx.exception))
def test_ttl_mapping(self): def test_ttl_mapping(self):
provider = CloudflareProvider('test', 'email', 'token') provider = CloudflareProvider('test', 'email', 'token')


+ 2
- 3
tests/test_octodns_provider_constellix.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -156,7 +155,7 @@ class TestConstellixProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request # Bad request
with requests_mock() as mock: with requests_mock() as mock:
@ -168,7 +167,7 @@ class TestConstellixProvider(TestCase):
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('\n - "unittests" is not a valid domain name', self.assertEquals('\n - "unittests" is not a valid domain name',
text_type(ctx.exception))
str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 1
- 2
tests/test_octodns_provider_digitalocean.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -51,7 +50,7 @@ class TestDigitalOceanProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 1
- 2
tests/test_octodns_provider_dnsimple.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -54,7 +53,7 @@ class TestDnsimpleProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 2
- 4
tests/test_octodns_provider_dnsmadeeasy.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -60,7 +59,7 @@ class TestDnsMadeEasyProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request # Bad request
with requests_mock() as mock: with requests_mock() as mock:
@ -70,8 +69,7 @@ class TestDnsMadeEasyProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('\n - Rate limit exceeded',
text_type(ctx.exception))
self.assertEquals('\n - Rate limit exceeded', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 4
- 5
tests/test_octodns_provider_easydns.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -37,7 +36,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request # Bad request
with requests_mock() as mock: with requests_mock() as mock:
@ -48,7 +47,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Bad request', text_type(ctx.exception))
self.assertEquals('Bad request', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:
@ -102,7 +101,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider._client.domain('unit.tests') provider._client.domain('unit.tests')
self.assertEquals('Not Found', text_type(ctx.exception))
self.assertEquals('Not Found', str(ctx.exception))
def test_apply_not_found(self): def test_apply_not_found(self):
provider = EasyDNSProvider('test', 'token', 'apikey', provider = EasyDNSProvider('test', 'token', 'apikey',
@ -137,7 +136,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider.apply(plan) provider.apply(plan)
self.assertEquals('Not Found', text_type(ctx.exception))
self.assertEquals('Not Found', str(ctx.exception))
def test_domain_create(self): def test_domain_create(self):
provider = EasyDNSProvider('test', 'token', 'apikey', provider = EasyDNSProvider('test', 'token', 'apikey',


+ 1
- 2
tests/test_octodns_provider_edgedns.py View File

@ -9,7 +9,6 @@ from __future__ import absolute_import, division, print_function, \
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -149,7 +148,7 @@ class TestEdgeDnsProvider(TestCase):
changes = provider.apply(plan) changes = provider.apply(plan)
except NameError as e: except NameError as e:
expected = "contractId not specified to create zone" expected = "contractId not specified to create zone"
self.assertEquals(text_type(e), expected)
self.assertEquals(str(e), expected)
class TestDeprecatedAkamaiProvider(TestCase): class TestDeprecatedAkamaiProvider(TestCase):


+ 5
- 6
tests/test_octodns_provider_gandi.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -58,7 +57,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientBadRequest) as ctx: with self.assertRaises(GandiClientBadRequest) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertIn('"status": "error"', text_type(ctx.exception))
self.assertIn('"status": "error"', str(ctx.exception))
# 401 - Unauthorized. # 401 - Unauthorized.
with requests_mock() as mock: with requests_mock() as mock:
@ -73,7 +72,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientUnauthorized) as ctx: with self.assertRaises(GandiClientUnauthorized) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertIn('"cause":"Unauthorized"', text_type(ctx.exception))
self.assertIn('"cause":"Unauthorized"', str(ctx.exception))
# 403 - Forbidden. # 403 - Forbidden.
with requests_mock() as mock: with requests_mock() as mock:
@ -85,7 +84,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientForbidden) as ctx: with self.assertRaises(GandiClientForbidden) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertIn('"cause":"Forbidden"', text_type(ctx.exception))
self.assertIn('"cause":"Forbidden"', str(ctx.exception))
# 404 - Not Found. # 404 - Not Found.
with requests_mock() as mock: with requests_mock() as mock:
@ -97,7 +96,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientNotFound) as ctx: with self.assertRaises(GandiClientNotFound) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider._client.zone(zone) provider._client.zone(zone)
self.assertIn('"cause": "Not Found"', text_type(ctx.exception))
self.assertIn('"cause": "Not Found"', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:
@ -175,7 +174,7 @@ class TestGandiProvider(TestCase):
plan = provider.plan(self.expected) plan = provider.plan(self.expected)
provider.apply(plan) provider.apply(plan)
self.assertIn('This domain is not registered at Gandi.', self.assertIn('This domain is not registered at Gandi.',
text_type(ctx.exception))
str(ctx.exception))
resp = Mock() resp = Mock()
resp.json = Mock() resp.json = Mock()


+ 5
- 6
tests/test_octodns_provider_gcore.py View File

@ -12,7 +12,6 @@ from __future__ import (
from mock import Mock, call from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record, Update, Delete, Create from octodns.record import Record, Update, Delete, Create
@ -52,7 +51,7 @@ class TestGCoreProvider(TestCase):
with self.assertRaises(GCoreClientBadRequest) as ctx: with self.assertRaises(GCoreClientBadRequest) as ctx:
zone = Zone("unit.tests.", []) zone = Zone("unit.tests.", [])
provider.populate(zone) provider.populate(zone)
self.assertIn('"error":"bad body"', text_type(ctx.exception))
self.assertIn('"error":"bad body"', str(ctx.exception))
# TC: 404 - Not Found. # TC: 404 - Not Found.
with requests_mock() as mock: with requests_mock() as mock:
@ -64,7 +63,7 @@ class TestGCoreProvider(TestCase):
zone = Zone("unit.tests.", []) zone = Zone("unit.tests.", [])
provider._client.zone(zone.name) provider._client.zone(zone.name)
self.assertIn( self.assertIn(
'"error":"zone is not found"', text_type(ctx.exception)
'"error":"zone is not found"', str(ctx.exception)
) )
# TC: General error # TC: General error
@ -74,7 +73,7 @@ class TestGCoreProvider(TestCase):
with self.assertRaises(GCoreClientException) as ctx: with self.assertRaises(GCoreClientException) as ctx:
zone = Zone("unit.tests.", []) zone = Zone("unit.tests.", [])
provider.populate(zone) provider.populate(zone)
self.assertEqual("Things caught fire", text_type(ctx.exception))
self.assertEqual("Things caught fire", str(ctx.exception))
# TC: No credentials or token error # TC: No credentials or token error
with requests_mock() as mock: with requests_mock() as mock:
@ -82,7 +81,7 @@ class TestGCoreProvider(TestCase):
GCoreProvider("test_id") GCoreProvider("test_id")
self.assertEqual( self.assertEqual(
"either token or login & password must be set", "either token or login & password must be set",
text_type(ctx.exception),
str(ctx.exception),
) )
# TC: Auth with login password # TC: Auth with login password
@ -230,7 +229,7 @@ class TestGCoreProvider(TestCase):
provider.apply(plan) provider.apply(plan)
self.assertIn( self.assertIn(
"parent zone is already occupied by another client", "parent zone is already occupied by another client",
text_type(ctx.exception),
str(ctx.exception),
) )
resp = Mock() resp = Mock()


+ 1
- 2
tests/test_octodns_provider_hetzner.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -36,7 +35,7 @@ class TestHetznerProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 9
- 16
tests/test_octodns_provider_mythicbeasts.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from os.path import dirname, join from os.path import dirname, join
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.provider.mythicbeasts import MythicBeastsProvider, \ from octodns.provider.mythicbeasts import MythicBeastsProvider, \
@ -32,12 +31,12 @@ class TestMythicBeastsProvider(TestCase):
with self.assertRaises(AssertionError) as err: with self.assertRaises(AssertionError) as err:
add_trailing_dot('unit.tests.') add_trailing_dot('unit.tests.')
self.assertEquals('Value already has trailing dot', self.assertEquals('Value already has trailing dot',
text_type(err.exception))
str(err.exception))
with self.assertRaises(AssertionError) as err: with self.assertRaises(AssertionError) as err:
remove_trailing_dot('unit.tests') remove_trailing_dot('unit.tests')
self.assertEquals('Value already missing trailing dot', self.assertEquals('Value already missing trailing dot',
text_type(err.exception))
str(err.exception))
self.assertEquals(add_trailing_dot('unit.tests'), 'unit.tests.') self.assertEquals(add_trailing_dot('unit.tests'), 'unit.tests.')
self.assertEquals(remove_trailing_dot('unit.tests.'), 'unit.tests') self.assertEquals(remove_trailing_dot('unit.tests.'), 'unit.tests')
@ -91,8 +90,7 @@ class TestMythicBeastsProvider(TestCase):
'', '',
{'raw_values': [{'value': '', 'ttl': 0}]} {'raw_values': [{'value': '', 'ttl': 0}]}
) )
self.assertEquals('Unable to parse MX data',
text_type(err.exception))
self.assertEquals('Unable to parse MX data', str(err.exception))
def test_data_for_CNAME(self): def test_data_for_CNAME(self):
test_data = { test_data = {
@ -129,8 +127,7 @@ class TestMythicBeastsProvider(TestCase):
'', '',
{'raw_values': [{'value': '', 'ttl': 0}]} {'raw_values': [{'value': '', 'ttl': 0}]}
) )
self.assertEquals('Unable to parse SRV data',
text_type(err.exception))
self.assertEquals('Unable to parse SRV data', str(err.exception))
def test_data_for_SSHFP(self): def test_data_for_SSHFP(self):
test_data = { test_data = {
@ -149,8 +146,7 @@ class TestMythicBeastsProvider(TestCase):
'', '',
{'raw_values': [{'value': '', 'ttl': 0}]} {'raw_values': [{'value': '', 'ttl': 0}]}
) )
self.assertEquals('Unable to parse SSHFP data',
text_type(err.exception))
self.assertEquals('Unable to parse SSHFP data', str(err.exception))
def test_data_for_CAA(self): def test_data_for_CAA(self):
test_data = { test_data = {
@ -166,8 +162,7 @@ class TestMythicBeastsProvider(TestCase):
'', '',
{'raw_values': [{'value': '', 'ttl': 0}]} {'raw_values': [{'value': '', 'ttl': 0}]}
) )
self.assertEquals('Unable to parse CAA data',
text_type(err.exception))
self.assertEquals('Unable to parse CAA data', str(err.exception))
def test_command_generation(self): def test_command_generation(self):
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
@ -312,8 +307,7 @@ class TestMythicBeastsProvider(TestCase):
# Null passwords dict # Null passwords dict
with self.assertRaises(AssertionError) as err: with self.assertRaises(AssertionError) as err:
provider = MythicBeastsProvider('test', None) provider = MythicBeastsProvider('test', None)
self.assertEquals('Passwords must be a dictionary',
text_type(err.exception))
self.assertEquals('Passwords must be a dictionary', str(err.exception))
# Missing password # Missing password
with requests_mock() as mock: with requests_mock() as mock:
@ -323,9 +317,8 @@ class TestMythicBeastsProvider(TestCase):
provider = MythicBeastsProvider('test', dict()) provider = MythicBeastsProvider('test', dict())
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertEquals(
'Missing password for domain: unit.tests',
text_type(err.exception))
self.assertEquals('Missing password for domain: unit.tests',
str(err.exception))
# Failed authentication # Failed authentication
with requests_mock() as mock: with requests_mock() as mock:


+ 5
- 8
tests/test_octodns_provider_ns1.py View File

@ -9,7 +9,6 @@ from collections import defaultdict
from mock import call, patch from mock import call, patch
from ns1.rest.errors import AuthException, RateLimitException, \ from ns1.rest.errors import AuthException, RateLimitException, \
ResourceException ResourceException
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Delete, Record, Update from octodns.record import Delete, Record, Update
@ -1584,8 +1583,7 @@ class TestNs1ProviderDynamic(TestCase):
} }
with self.assertRaises(Ns1Exception) as ctx: with self.assertRaises(Ns1Exception) as ctx:
provider._data_for_dynamic('A', ns1_record) provider._data_for_dynamic('A', ns1_record)
self.assertEquals('Unrecognized advanced record',
text_type(ctx.exception))
self.assertEquals('Unrecognized advanced record', str(ctx.exception))
# empty record turns into empty data # empty record turns into empty data
ns1_record = { ns1_record = {
@ -2150,8 +2148,7 @@ class TestNs1ProviderDynamic(TestCase):
records_retrieve_mock.side_effect = ns1_zone['records'] records_retrieve_mock.side_effect = ns1_zone['records']
with self.assertRaises(Ns1Exception) as ctx: with self.assertRaises(Ns1Exception) as ctx:
extra = provider._extra_changes(desired, []) extra = provider._extra_changes(desired, [])
self.assertTrue('Mixed disabled flag in filters' in
text_type(ctx.exception))
self.assertTrue('Mixed disabled flag in filters' in str(ctx.exception))
DESIRED = Zone('unit.tests.', []) DESIRED = Zone('unit.tests.', [])
@ -2231,14 +2228,14 @@ class TestNs1ProviderDynamic(TestCase):
apply_update_mock.reset_mock() apply_update_mock.reset_mock()
with self.assertRaises(Ns1Exception) as ctx: with self.assertRaises(Ns1Exception) as ctx:
provider._apply(dynamic_plan) provider._apply(dynamic_plan)
self.assertTrue('monitor_regions not set' in text_type(ctx.exception))
self.assertTrue('monitor_regions not set' in str(ctx.exception))
apply_update_mock.assert_not_called() apply_update_mock.assert_not_called()
# Blows up and apply not called even though there's a simple # Blows up and apply not called even though there's a simple
apply_update_mock.reset_mock() apply_update_mock.reset_mock()
with self.assertRaises(Ns1Exception) as ctx: with self.assertRaises(Ns1Exception) as ctx:
provider._apply(both_plan) provider._apply(both_plan)
self.assertTrue('monitor_regions not set' in text_type(ctx.exception))
self.assertTrue('monitor_regions not set' in str(ctx.exception))
apply_update_mock.assert_not_called() apply_update_mock.assert_not_called()
# with monitor_regions set # with monitor_regions set
@ -2296,7 +2293,7 @@ class TestNs1Client(TestCase):
] ]
with self.assertRaises(RateLimitException) as ctx: with self.assertRaises(RateLimitException) as ctx:
client.zones_retrieve('unit.tests') client.zones_retrieve('unit.tests')
self.assertEquals('last', text_type(ctx.exception))
self.assertEquals('last', str(ctx.exception))
def test_client_config(self): def test_client_config(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):


+ 3
- 4
tests/test_octodns_provider_powerdns.py View File

@ -9,7 +9,6 @@ from json import loads, dumps
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import Record from octodns.record import Record
@ -51,7 +50,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider.powerdns_version provider.powerdns_version
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
# Api not found # Api not found
with requests_mock() as mock: with requests_mock() as mock:
@ -59,7 +58,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider.powerdns_version provider.powerdns_version
self.assertTrue('404' in text_type(ctx.exception))
self.assertTrue('404' in str(ctx.exception))
# Test version detection # Test version detection
with requests_mock() as mock: with requests_mock() as mock:
@ -150,7 +149,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
provider.populate(zone) provider.populate(zone)
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
# General error # General error
with requests_mock() as mock: with requests_mock() as mock:


+ 2
- 3
tests/test_octodns_provider_rackspace.py View File

@ -7,8 +7,7 @@ from __future__ import absolute_import, division, print_function, \
import json import json
import re import re
from six import text_type
from six.moves.urllib.parse import urlparse
from urllib.parse import urlparse
from unittest import TestCase from unittest import TestCase
from requests import HTTPError from requests import HTTPError
@ -53,7 +52,7 @@ class TestRackspaceProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
self.provider.populate(zone) self.provider.populate(zone)
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
self.assertTrue(mock.called_once) self.assertTrue(mock.called_once)
def test_server_error(self): def test_server_error(self):


+ 1
- 2
tests/test_octodns_provider_route53.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from botocore.stub import ANY, Stubber from botocore.stub import ANY, Stubber
from six import text_type
from unittest import TestCase from unittest import TestCase
from mock import patch from mock import patch
@ -2400,7 +2399,7 @@ class TestRoute53Provider(TestCase):
provider, plan = self._get_test_plan(1) provider, plan = self._get_test_plan(1)
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider.apply(plan) provider.apply(plan)
self.assertTrue('modifications' in text_type(ctx.exception))
self.assertTrue('modifications' in str(ctx.exception))
def test_semicolon_fixup(self): def test_semicolon_fixup(self):
provider = Route53Provider('test', 'abc', '123') provider = Route53Provider('test', 'abc', '123')


+ 1
- 2
tests/test_octodns_provider_selectel.py View File

@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from unittest import TestCase from unittest import TestCase
from six import text_type
import requests_mock import requests_mock
@ -288,7 +287,7 @@ class TestSelectelProvider(TestCase):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
SelectelProvider(123, 'fail_token') SelectelProvider(123, 'fail_token')
self.assertEquals(text_type(ctx.exception),
self.assertEquals(str(ctx.exception),
'Authorization failed. Invalid or empty token.') 'Authorization failed. Invalid or empty token.')
@requests_mock.Mocker() @requests_mock.Mocker()


+ 5
- 6
tests/test_octodns_provider_ultra.py View File

@ -4,8 +4,7 @@ from mock import Mock, call
from os.path import dirname, join from os.path import dirname, join
from requests import HTTPError from requests import HTTPError
from requests_mock import ANY, mock as requests_mock from requests_mock import ANY, mock as requests_mock
from six import text_type
from six.moves.urllib import parse
from urllib.parse import parse_qs
from unittest import TestCase from unittest import TestCase
from json import load as json_load from json import load as json_load
@ -45,7 +44,7 @@ class TestUltraProvider(TestCase):
text='{"errorCode": 60001}') text='{"errorCode": 60001}')
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
UltraProvider('test', 'account', 'user', 'wrongpass') UltraProvider('test', 'account', 'user', 'wrongpass')
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Good Auth # Good Auth
with requests_mock() as mock: with requests_mock() as mock:
@ -58,8 +57,8 @@ class TestUltraProvider(TestCase):
self.assertEquals(1, mock.call_count) self.assertEquals(1, mock.call_count)
expected_payload = "grant_type=password&username=user&"\ expected_payload = "grant_type=password&username=user&"\
"password=rightpass" "password=rightpass"
self.assertEquals(parse.parse_qs(mock.last_request.text),
parse.parse_qs(expected_payload))
self.assertEquals(parse_qs(mock.last_request.text),
parse_qs(expected_payload))
def test_get_zones(self): def test_get_zones(self):
provider = _get_provider() provider = _get_provider()
@ -145,7 +144,7 @@ class TestUltraProvider(TestCase):
headers={'Authorization': 'Bearer 123'}, json={}) headers={'Authorization': 'Bearer 123'}, json={})
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
provider._get(path) provider._get(path)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Test all GET patterns # Test all GET patterns
with requests_mock() as mock: with requests_mock() as mock:


+ 2
- 3
tests/test_octodns_provider_yaml.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from os import makedirs from os import makedirs
from os.path import basename, dirname, isdir, isfile, join from os.path import basename, dirname, isdir, isfile, join
from unittest import TestCase from unittest import TestCase
from six import text_type
from yaml import safe_load from yaml import safe_load
from yaml.constructor import ConstructorError from yaml.constructor import ConstructorError
@ -187,7 +186,7 @@ class TestYamlProvider(TestCase):
with self.assertRaises(SubzoneRecordException) as ctx: with self.assertRaises(SubzoneRecordException) as ctx:
source.populate(zone) source.populate(zone)
self.assertEquals('Record www.sub.unit.tests. is under a managed ' self.assertEquals('Record www.sub.unit.tests. is under a managed '
'subzone', text_type(ctx.exception))
'subzone', str(ctx.exception))
class TestSplitYamlProvider(TestCase): class TestSplitYamlProvider(TestCase):
@ -385,7 +384,7 @@ class TestSplitYamlProvider(TestCase):
with self.assertRaises(SubzoneRecordException) as ctx: with self.assertRaises(SubzoneRecordException) as ctx:
source.populate(zone) source.populate(zone)
self.assertEquals('Record www.sub.unit.tests. is under a managed ' self.assertEquals('Record www.sub.unit.tests. is under a managed '
'subzone', text_type(ctx.exception))
'subzone', str(ctx.exception))
class TestOverridingYamlProvider(TestCase): class TestOverridingYamlProvider(TestCase):


+ 33
- 3
tests/test_octodns_record.py View File

@ -5,7 +5,6 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \ from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \
@ -1013,14 +1012,14 @@ class TestRecord(TestCase):
# Missing type # Missing type
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'unknown', {}) Record.new(self.zone, 'unknown', {})
self.assertTrue('missing type' in text_type(ctx.exception))
self.assertTrue('missing type' in str(ctx.exception))
# Unknown type # Unknown type
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'unknown', { Record.new(self.zone, 'unknown', {
'type': 'XXX', 'type': 'XXX',
}) })
self.assertTrue('Unknown record type' in text_type(ctx.exception))
self.assertTrue('Unknown record type' in str(ctx.exception))
def test_record_copy(self): def test_record_copy(self):
a = Record.new(self.zone, 'a', { a = Record.new(self.zone, 'a', {
@ -1055,6 +1054,37 @@ class TestRecord(TestCase):
d.copy() d.copy()
self.assertEquals('TXT', d._type) self.assertEquals('TXT', d._type)
def test_dynamic_record_copy(self):
a_data = {
'dynamic': {
'pools': {
'one': {
'values': [{
'value': '3.3.3.3',
}],
},
},
'rules': [{
'pool': 'one',
}],
},
'octodns': {
'healthcheck': {
'protocol': 'TCP',
'port': 80,
},
},
'ttl': 60,
'type': 'A',
'values': [
'1.1.1.1',
'2.2.2.2',
],
}
record1 = Record.new(self.zone, 'a', a_data)
record2 = record1.copy()
self.assertEqual(record1._octodns, record2._octodns)
def test_change(self): def test_change(self):
existing = Record.new(self.zone, 'txt', { existing = Record.new(self.zone, 'txt', {
'ttl': 44, 'ttl': 44,


+ 3
- 4
tests/test_octodns_source_axfr.py View File

@ -11,7 +11,6 @@ from dns.exception import DNSException
from mock import patch from mock import patch
from os.path import exists from os.path import exists
from shutil import copyfile from shutil import copyfile
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.source.axfr import AxfrSource, AxfrSourceZoneTransferFailed, \ from octodns.source.axfr import AxfrSource, AxfrSourceZoneTransferFailed, \
@ -42,7 +41,7 @@ class TestAxfrSource(TestCase):
zone = Zone('unit.tests.', []) zone = Zone('unit.tests.', [])
self.source.populate(zone) self.source.populate(zone)
self.assertEquals('Unable to Perform Zone Transfer', self.assertEquals('Unable to Perform Zone Transfer',
text_type(ctx.exception))
str(ctx.exception))
class TestZoneFileSource(TestCase): class TestZoneFileSource(TestCase):
@ -99,7 +98,7 @@ class TestZoneFileSource(TestCase):
zone = Zone('invalid.zone.', []) zone = Zone('invalid.zone.', [])
self.source.populate(zone) self.source.populate(zone)
self.assertEquals('The DNS zone has no NS RRset at its origin.', self.assertEquals('The DNS zone has no NS RRset at its origin.',
text_type(ctx.exception))
str(ctx.exception))
# Records are not to RFC (lenient=False) # Records are not to RFC (lenient=False)
with self.assertRaises(ValidationError) as ctx: with self.assertRaises(ValidationError) as ctx:
@ -107,7 +106,7 @@ class TestZoneFileSource(TestCase):
self.source.populate(zone) self.source.populate(zone)
self.assertEquals('Invalid record _invalid.invalid.records.\n' self.assertEquals('Invalid record _invalid.invalid.records.\n'
' - invalid name for SRV record', ' - invalid name for SRV record',
text_type(ctx.exception))
str(ctx.exception))
# Records are not to RFC, but load anyhow (lenient=True) # Records are not to RFC, but load anyhow (lenient=True)
invalid = Zone('invalid.records.', []) invalid = Zone('invalid.records.', [])


+ 1
- 2
tests/test_octodns_source_envvar.py View File

@ -1,5 +1,4 @@
from mock import patch from mock import patch
from six import text_type
from unittest import TestCase from unittest import TestCase
from octodns.source.envvar import EnvVarSource from octodns.source.envvar import EnvVarSource
@ -15,7 +14,7 @@ class TestEnvVarSource(TestCase):
with self.assertRaises(EnvironmentVariableNotFoundException) as ctx: with self.assertRaises(EnvironmentVariableNotFoundException) as ctx:
source._read_variable() source._read_variable()
msg = f'Unknown environment variable {envvar}' msg = f'Unknown environment variable {envvar}'
self.assertEquals(msg, text_type(ctx.exception))
self.assertEquals(msg, str(ctx.exception))
with patch.dict('os.environ', {envvar: 'testvalue'}): with patch.dict('os.environ', {envvar: 'testvalue'}):
value = source._read_variable() value = source._read_variable()


+ 1
- 1
tests/test_octodns_yaml.py View File

@ -5,7 +5,7 @@
from __future__ import absolute_import, division, print_function, \ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from six import StringIO
from io import StringIO
from unittest import TestCase from unittest import TestCase
from yaml.constructor import ConstructorError from yaml.constructor import ConstructorError


+ 5
- 6
tests/test_octodns_zone.py View File

@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function, \
unicode_literals unicode_literals
from unittest import TestCase from unittest import TestCase
from six import text_type
from octodns.record import ARecord, AaaaRecord, Create, Delete, Record, Update from octodns.record import ARecord, AaaaRecord, Create, Delete, Record, Update
from octodns.zone import DuplicateRecordException, InvalidNodeException, \ from octodns.zone import DuplicateRecordException, InvalidNodeException, \
@ -48,7 +47,7 @@ class TestZone(TestCase):
with self.assertRaises(DuplicateRecordException) as ctx: with self.assertRaises(DuplicateRecordException) as ctx:
zone.add_record(a) zone.add_record(a)
self.assertEquals('Duplicate record a.unit.tests., type A', self.assertEquals('Duplicate record a.unit.tests., type A',
text_type(ctx.exception))
str(ctx.exception))
self.assertEquals(zone.records, set([a])) self.assertEquals(zone.records, set([a]))
# can add duplicate with replace=True # can add duplicate with replace=True
@ -138,7 +137,7 @@ class TestZone(TestCase):
def test_missing_dot(self): def test_missing_dot(self):
with self.assertRaises(Exception) as ctx: with self.assertRaises(Exception) as ctx:
Zone('not.allowed', []) Zone('not.allowed', [])
self.assertTrue('missing ending dot' in text_type(ctx.exception))
self.assertTrue('missing ending dot' in str(ctx.exception))
def test_sub_zones(self): def test_sub_zones(self):
@ -161,7 +160,7 @@ class TestZone(TestCase):
}) })
with self.assertRaises(SubzoneRecordException) as ctx: with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record) zone.add_record(record)
self.assertTrue('not of type NS', text_type(ctx.exception))
self.assertTrue('not of type NS', str(ctx.exception))
# Can add it w/lenient # Can add it w/lenient
zone.add_record(record, lenient=True) zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records) self.assertEquals(set([record]), zone.records)
@ -175,7 +174,7 @@ class TestZone(TestCase):
}) })
with self.assertRaises(SubzoneRecordException) as ctx: with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record) zone.add_record(record)
self.assertTrue('under a managed sub-zone', text_type(ctx.exception))
self.assertTrue('under a managed sub-zone', str(ctx.exception))
# Can add it w/lenient # Can add it w/lenient
zone.add_record(record, lenient=True) zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records) self.assertEquals(set([record]), zone.records)
@ -189,7 +188,7 @@ class TestZone(TestCase):
}) })
with self.assertRaises(SubzoneRecordException) as ctx: with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record) zone.add_record(record)
self.assertTrue('under a managed sub-zone', text_type(ctx.exception))
self.assertTrue('under a managed sub-zone', str(ctx.exception))
# Can add it w/lenient # Can add it w/lenient
zone.add_record(record, lenient=True) zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records) self.assertEquals(set([record]), zone.records)


Loading…
Cancel
Save