Browse Source

Merge branch 'route53-hosted-zone-id' of https://github.com/slandry90/octodns into route53-hosted-zone-id

pull/783/head
slandry 4 years ago
parent
commit
2e9b116f8a
43 changed files with 201 additions and 225 deletions
  1. +3
    -5
      octodns/cmds/report.py
  2. +2
    -3
      octodns/manager.py
  3. +1
    -1
      octodns/provider/azuredns.py
  4. +2
    -4
      octodns/provider/base.py
  5. +1
    -2
      octodns/provider/constellix.py
  6. +1
    -1
      octodns/provider/edgedns.py
  7. +5
    -7
      octodns/provider/ns1.py
  8. +1
    -2
      octodns/provider/ovh.py
  9. +11
    -11
      octodns/provider/plan.py
  10. +3
    -5
      octodns/provider/route53.py
  11. +1
    -1
      octodns/provider/yaml.py
  12. +8
    -8
      octodns/record/__init__.py
  13. +1
    -2
      octodns/source/axfr.py
  14. +1
    -1
      octodns/source/base.py
  15. +1
    -3
      octodns/zone.py
  16. +0
    -1
      requirements.txt
  17. +31
    -34
      tests/test_octodns_manager.py
  18. +2
    -3
      tests/test_octodns_plan.py
  19. +6
    -10
      tests/test_octodns_provider_azuredns.py
  20. +21
    -22
      tests/test_octodns_provider_base.py
  21. +6
    -7
      tests/test_octodns_provider_cloudflare.py
  22. +2
    -3
      tests/test_octodns_provider_constellix.py
  23. +1
    -2
      tests/test_octodns_provider_digitalocean.py
  24. +1
    -2
      tests/test_octodns_provider_dnsimple.py
  25. +2
    -4
      tests/test_octodns_provider_dnsmadeeasy.py
  26. +4
    -5
      tests/test_octodns_provider_easydns.py
  27. +1
    -2
      tests/test_octodns_provider_edgedns.py
  28. +5
    -6
      tests/test_octodns_provider_gandi.py
  29. +5
    -6
      tests/test_octodns_provider_gcore.py
  30. +1
    -2
      tests/test_octodns_provider_hetzner.py
  31. +9
    -16
      tests/test_octodns_provider_mythicbeasts.py
  32. +5
    -8
      tests/test_octodns_provider_ns1.py
  33. +3
    -4
      tests/test_octodns_provider_powerdns.py
  34. +2
    -3
      tests/test_octodns_provider_rackspace.py
  35. +1
    -2
      tests/test_octodns_provider_route53.py
  36. +1
    -2
      tests/test_octodns_provider_selectel.py
  37. +5
    -6
      tests/test_octodns_provider_ultra.py
  38. +2
    -3
      tests/test_octodns_provider_yaml.py
  39. +33
    -3
      tests/test_octodns_record.py
  40. +3
    -4
      tests/test_octodns_source_axfr.py
  41. +1
    -2
      tests/test_octodns_source_envvar.py
  42. +1
    -1
      tests/test_octodns_yaml.py
  43. +5
    -6
      tests/test_octodns_zone.py

+ 3
- 5
octodns/cmds/report.py View File

@ -13,8 +13,6 @@ from logging import getLogger
from sys import stdout
import re
from six import text_type
from octodns.cmds.args import ArgumentParser
from octodns.manager import Manager
@ -67,7 +65,7 @@ def main():
resolver = AsyncResolver(configure=False,
num_workers=int(args.num_workers))
if not ip_addr_re.match(server):
server = text_type(query(server, 'A')[0])
server = str(query(server, 'A')[0])
log.info('server=%s', server)
resolver.nameservers = [server]
resolver.lifetime = int(args.timeout)
@ -83,12 +81,12 @@ def main():
stdout.write(',')
stdout.write(record._type)
stdout.write(',')
stdout.write(text_type(record.ttl))
stdout.write(str(record.ttl))
compare = {}
for future in futures:
stdout.write(',')
try:
answers = [text_type(r) for r in future.result()]
answers = [str(r) for r in future.result()]
except (NoAnswer, NoNameservers):
answers = ['*no answer*']
except NXDOMAIN:


+ 2
- 3
octodns/manager.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from concurrent.futures import ThreadPoolExecutor
from importlib import import_module
from os import environ
from six import text_type
from sys import stdout
import logging
@ -259,7 +258,7 @@ class Manager(object):
source.populate(zone, lenient=lenient)
except TypeError as e:
if ("unexpected keyword argument 'lenient'"
not in text_type(e)):
not in str(e)):
raise
self.log.warn('provider %s does not accept lenient '
'param', source.__class__.__name__)
@ -282,7 +281,7 @@ class Manager(object):
try:
plan = target.plan(zone, processors=processors)
except TypeError as e:
if "keyword argument 'processors'" not in text_type(e):
if "keyword argument 'processors'" not in str(e):
raise
self.log.warn('provider.plan %s does not accept processors '
'param', target.__class__.__name__)


+ 1
- 1
octodns/provider/azuredns.py View File

@ -457,7 +457,7 @@ class AzureProvider(BaseProvider):
'''
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = True
SUPPORTS_MUTLIVALUE_PTR = True
SUPPORTS_MULTIVALUE_PTR = True
SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NS', 'PTR', 'SRV',
'TXT'))


+ 2
- 4
octodns/provider/base.py View File

@ -5,8 +5,6 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from six import text_type
from ..source.base import BaseSource
from ..zone import Zone
from .plan import Plan
@ -66,7 +64,7 @@ class BaseProvider(BaseSource):
record.dynamic = None
desired.add_record(record, replace=True)
elif record._type == 'PTR' and len(record.values) > 1 and \
not self.SUPPORTS_MUTLIVALUE_PTR:
not self.SUPPORTS_MULTIVALUE_PTR:
# replace with a single-value copy
msg = \
f'multi-value PTR records not supported for {record.fqdn}'
@ -134,7 +132,7 @@ class BaseProvider(BaseSource):
changes=changes)
if extra:
self.log.info('plan: extra changes\n %s', '\n '
.join([text_type(c) for c in extra]))
.join([str(c) for c in extra]))
changes += extra
if changes:


+ 1
- 2
octodns/provider/constellix.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from collections import defaultdict
from requests import Session
from base64 import b64encode
from six import string_types
from pycountry_convert import country_alpha2_to_continent_code
import hashlib
import hmac
@ -142,7 +141,7 @@ class ConstellixClient(object):
# change relative values to absolute
value = record['value']
if record['type'] in ['ALIAS', 'CNAME', 'MX', 'NS', 'SRV']:
if isinstance(value, string_types):
if isinstance(value, str):
record['value'] = self._absolutize_value(value,
zone_name)
if isinstance(value, list):


+ 1
- 1
octodns/provider/edgedns.py View File

@ -7,8 +7,8 @@ from __future__ import absolute_import, division, print_function, \
from requests import Session
from akamai.edgegrid import EdgeGridAuth
from six.moves.urllib.parse import urljoin
from collections import defaultdict
from urllib.parse import urljoin
from logging import getLogger
from ..record import Record


+ 5
- 7
octodns/provider/ns1.py View File

@ -14,8 +14,6 @@ from pycountry_convert import country_alpha2_to_continent_code
from time import sleep
from uuid import uuid4
from six import text_type
from ..record import Record, Update
from . import ProviderException
from .base import BaseProvider
@ -306,7 +304,7 @@ class Ns1Provider(BaseProvider):
'''
SUPPORTS_GEO = True
SUPPORTS_DYNAMIC = True
SUPPORTS_MUTLIVALUE_PTR = True
SUPPORTS_MULTIVALUE_PTR = True
SUPPORTS = set(('A', 'AAAA', 'ALIAS', 'CAA', 'CNAME', 'MX', 'NAPTR',
'NS', 'PTR', 'SPF', 'SRV', 'TXT', 'URLFWD'))
@ -528,9 +526,9 @@ class Ns1Provider(BaseProvider):
else:
values.extend(answer['answer'])
codes.append([])
values = [text_type(x) for x in values]
values = [str(x) for x in values]
geo = OrderedDict(
{text_type(k): [text_type(x) for x in v] for k, v in geo.items()}
{str(k): [str(x) for x in v] for k, v in geo.items()}
)
data['values'] = values
data['geo'] = geo
@ -564,7 +562,7 @@ class Ns1Provider(BaseProvider):
meta = answer['meta']
notes = self._parse_notes(meta.get('note', ''))
value = text_type(answer['answer'][0])
value = str(answer['answer'][0])
if notes.get('from', False) == '--default--':
# It's a final/default value, record it and move on
default.add(value)
@ -723,7 +721,7 @@ class Ns1Provider(BaseProvider):
return {
'ttl': record['ttl'],
'type': _type,
'values': [text_type(x) for x in record['short_answers']]
'values': [str(x) for x in record['short_answers']]
}
_data_for_AAAA = _data_for_A


+ 1
- 2
octodns/provider/ovh.py View File

@ -9,7 +9,6 @@ import base64
import binascii
import logging
from collections import defaultdict
from six import text_type
import ovh
from ovh import ResourceNotFoundError
@ -65,7 +64,7 @@ class OvhProvider(BaseProvider):
records = self.get_records(zone_name=zone_name)
exists = True
except ResourceNotFoundError as e:
if text_type(e) != self.ZONE_NOT_FOUND_MESSAGE:
if str(e) != self.ZONE_NOT_FOUND_MESSAGE:
raise
exists = False
records = []


+ 11
- 11
octodns/provider/plan.py View File

@ -8,7 +8,7 @@ from __future__ import absolute_import, division, print_function, \
from logging import DEBUG, ERROR, INFO, WARN, getLogger
from sys import stdout
from six import StringIO, text_type
from io import StringIO
class UnsafePlan(Exception):
@ -130,7 +130,7 @@ class PlanLogger(_PlanOutput):
buf.write('* ')
buf.write(target.id)
buf.write(' (')
buf.write(text_type(target))
buf.write(str(target))
buf.write(')\n* ')
if plan.exists is False:
@ -143,7 +143,7 @@ class PlanLogger(_PlanOutput):
buf.write('\n* ')
buf.write('Summary: ')
buf.write(text_type(plan))
buf.write(str(plan))
buf.write('\n')
else:
buf.write(hr)
@ -155,11 +155,11 @@ class PlanLogger(_PlanOutput):
def _value_stringifier(record, sep):
try:
values = [text_type(v) for v in record.values]
values = [str(v) for v in record.values]
except AttributeError:
values = [record.value]
for code, gv in sorted(getattr(record, 'geo', {}).items()):
vs = ', '.join([text_type(v) for v in gv.values])
vs = ', '.join([str(v) for v in gv.values])
values.append(f'{code}: {vs}')
return sep.join(values)
@ -201,7 +201,7 @@ class PlanMarkdown(_PlanOutput):
fh.write(' | ')
# TTL
if existing:
fh.write(text_type(existing.ttl))
fh.write(str(existing.ttl))
fh.write(' | ')
fh.write(_value_stringifier(existing, '; '))
fh.write(' | |\n')
@ -209,7 +209,7 @@ class PlanMarkdown(_PlanOutput):
fh.write('| | | | ')
if new:
fh.write(text_type(new.ttl))
fh.write(str(new.ttl))
fh.write(' | ')
fh.write(_value_stringifier(new, '; '))
fh.write(' | ')
@ -218,7 +218,7 @@ class PlanMarkdown(_PlanOutput):
fh.write(' |\n')
fh.write('\nSummary: ')
fh.write(text_type(plan))
fh.write(str(plan))
fh.write('\n\n')
else:
fh.write('## No changes were planned\n')
@ -269,7 +269,7 @@ class PlanHtml(_PlanOutput):
# TTL
if existing:
fh.write(' <td>')
fh.write(text_type(existing.ttl))
fh.write(str(existing.ttl))
fh.write('</td>\n <td>')
fh.write(_value_stringifier(existing, '<br/>'))
fh.write('</td>\n <td></td>\n </tr>\n')
@ -278,7 +278,7 @@ class PlanHtml(_PlanOutput):
if new:
fh.write(' <td>')
fh.write(text_type(new.ttl))
fh.write(str(new.ttl))
fh.write('</td>\n <td>')
fh.write(_value_stringifier(new, '<br/>'))
fh.write('</td>\n <td>')
@ -287,7 +287,7 @@ class PlanHtml(_PlanOutput):
fh.write('</td>\n </tr>\n')
fh.write(' <tr>\n <td colspan=6>Summary: ')
fh.write(text_type(plan))
fh.write(str(plan))
fh.write('</td>\n </tr>\n</table>\n')
else:
fh.write('<b>No changes were planned</b>')

+ 3
- 5
octodns/provider/route53.py View File

@ -14,8 +14,6 @@ from uuid import uuid4
import logging
import re
from six import text_type
from ..equality import EqualityTupleMixin
from ..record import Record, Update
from ..record.geo import GeoCodes
@ -1104,8 +1102,8 @@ class Route53Provider(BaseProvider):
# for equivalence.
# E.g 2001:4860:4860:0:0:0:0:8842 -> 2001:4860:4860::8842
if value:
value = ip_address(text_type(value))
config_ip_address = ip_address(text_type(config['IPAddress']))
value = ip_address(str(value))
config_ip_address = ip_address(str(config['IPAddress']))
else:
# No value so give this a None to match value's
config_ip_address = None
@ -1130,7 +1128,7 @@ class Route53Provider(BaseProvider):
fqdn, record._type, value)
try:
ip_address(text_type(value))
ip_address(str(value))
# We're working with an IP, host is the Host header
healthcheck_host = record.healthcheck_host(value=value)
except (AddressValueError, ValueError):


+ 1
- 1
octodns/provider/yaml.py View File

@ -104,7 +104,7 @@ class YamlProvider(BaseProvider):
'''
SUPPORTS_GEO = True
SUPPORTS_DYNAMIC = True
SUPPORTS_MUTLIVALUE_PTR = True
SUPPORTS_MULTIVALUE_PTR = True
SUPPORTS = set(('A', 'AAAA', 'ALIAS', 'CAA', 'CNAME', 'DNAME', 'LOC', 'MX',
'NAPTR', 'NS', 'PTR', 'SSHFP', 'SPF', 'SRV', 'TXT',
'URLFWD'))


+ 8
- 8
octodns/record/__init__.py View File

@ -9,7 +9,6 @@ from ipaddress import IPv4Address, IPv6Address
from logging import getLogger
import re
from six import string_types, text_type
from fqdn import FQDN
from ..equality import EqualityTupleMixin
@ -83,7 +82,7 @@ class Record(EqualityTupleMixin):
@classmethod
def new(cls, zone, name, data, source=None, lenient=False):
name = text_type(name)
name = str(name)
fqdn = f'{name}.{zone.name}' if name else zone.name
try:
_type = data['type']
@ -153,7 +152,7 @@ class Record(EqualityTupleMixin):
self.__class__.__name__, name)
self.zone = zone
# force everything lower-case just to be safe
self.name = text_type(name).lower() if name else name
self.name = str(name).lower() if name else name
self.source = source
self.ttl = int(data['ttl'])
@ -222,6 +221,7 @@ class Record(EqualityTupleMixin):
def copy(self, zone=None):
data = self.data
data['type'] = self._type
data['octodns'] = self._octodns
return Record.new(
zone if zone else self.zone,
@ -323,7 +323,7 @@ class _ValuesMixin(object):
return ret
def __repr__(self):
values = "', '".join([text_type(v) for v in self.values])
values = "', '".join([str(v) for v in self.values])
klass = self.__class__.__name__
return f"<{klass} {self._type} {self.ttl}, {self.fqdn}, ['{values}']>"
@ -623,7 +623,7 @@ class _DynamicMixin(object):
except KeyError:
geos = []
if not isinstance(pool, string_types):
if not isinstance(pool, str):
reasons.append(f'rule {rule_num} invalid pool "{pool}"')
else:
if pool not in pools:
@ -729,7 +729,7 @@ class _IpList(object):
reasons.append('missing value(s)')
else:
try:
cls._address_type(text_type(value))
cls._address_type(str(value))
except Exception:
addr_name = cls._address_name
reasons.append(f'invalid {addr_name} address "{value}"')
@ -739,10 +739,10 @@ class _IpList(object):
def process(cls, values):
# Translating None into '' so that the list will be sortable in
# python3, get everything to str first
values = [text_type(v) if v is not None else '' for v in values]
values = [str(v) if v is not None else '' for v in values]
# Now round trip all non-'' through the address type and back to a str
# to normalize the address representation.
return [text_type(cls._address_type(v)) if v != '' else ''
return [str(cls._address_type(v)) if v != '' else ''
for v in values]


+ 1
- 2
octodns/source/axfr.py View File

@ -15,7 +15,6 @@ from dns.exception import DNSException
from collections import defaultdict
from os import listdir
from os.path import join
from six import text_type
import logging
from ..record import Record
@ -222,7 +221,7 @@ class ZoneFileSourceNotFound(ZoneFileSourceException):
class ZoneFileSourceLoadFailure(ZoneFileSourceException):
def __init__(self, error):
super(ZoneFileSourceLoadFailure, self).__init__(text_type(error))
super(ZoneFileSourceLoadFailure, self).__init__(str(error))
class ZoneFileSource(AxfrBaseSource):


+ 1
- 1
octodns/source/base.py View File

@ -8,7 +8,7 @@ from __future__ import absolute_import, division, print_function, \
class BaseSource(object):
SUPPORTS_MUTLIVALUE_PTR = False
SUPPORTS_MULTIVALUE_PTR = False
def __init__(self, id):
self.id = id


+ 1
- 3
octodns/zone.py View File

@ -9,8 +9,6 @@ from collections import defaultdict
from logging import getLogger
import re
from six import text_type
from .record import Create, Delete
@ -39,7 +37,7 @@ class Zone(object):
if not name[-1] == '.':
raise Exception(f'Invalid zone name {name}, missing ending dot')
# Force everything to lowercase just to be safe
self.name = text_type(name).lower() if name else name
self.name = str(name).lower() if name else name
self.sub_zones = sub_zones
# We're grouping by node, it allows us to efficiently search for
# duplicates and detect when CNAMEs co-exist with other records


+ 0
- 1
requirements.txt View File

@ -25,4 +25,3 @@ python-dateutil==2.8.1
requests==2.24.0
s3transfer==0.3.3
setuptools==44.1.1
six==1.15.0

+ 31
- 34
tests/test_octodns_manager.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from os import environ
from os.path import dirname, join
from six import text_type
from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager, \
ManagerException
@ -34,79 +33,79 @@ class TestManager(TestCase):
def test_missing_provider_class(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-class.yaml')).sync()
self.assertTrue('missing class' in text_type(ctx.exception))
self.assertTrue('missing class' in str(ctx.exception))
def test_bad_provider_class(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class.yaml')).sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_bad_provider_class_module(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class-module.yaml')) \
.sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_bad_provider_class_no_module(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-provider-class-no-module.yaml')) \
.sync()
self.assertTrue('Unknown provider class' in text_type(ctx.exception))
self.assertTrue('Unknown provider class' in str(ctx.exception))
def test_missing_provider_config(self):
# Missing provider config
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-config.yaml')).sync()
self.assertTrue('provider config' in text_type(ctx.exception))
self.assertTrue('provider config' in str(ctx.exception))
def test_missing_env_config(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-provider-env.yaml')).sync()
self.assertTrue('missing env var' in text_type(ctx.exception))
self.assertTrue('missing env var' in str(ctx.exception))
def test_missing_source(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \
.sync(['missing.sources.'])
self.assertTrue('missing sources' in text_type(ctx.exception))
self.assertTrue('missing sources' in str(ctx.exception))
def test_missing_targets(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \
.sync(['missing.targets.'])
self.assertTrue('missing targets' in text_type(ctx.exception))
self.assertTrue('missing targets' in str(ctx.exception))
def test_unknown_source(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \
.sync(['unknown.source.'])
self.assertTrue('unknown source' in text_type(ctx.exception))
self.assertTrue('unknown source' in str(ctx.exception))
def test_unknown_target(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \
.sync(['unknown.target.'])
self.assertTrue('unknown target' in text_type(ctx.exception))
self.assertTrue('unknown target' in str(ctx.exception))
def test_bad_plan_output_class(self):
with self.assertRaises(ManagerException) as ctx:
name = 'bad-plan-output-missing-class.yaml'
Manager(get_config_filename(name)).sync()
self.assertEquals('plan_output bad is missing class',
text_type(ctx.exception))
str(ctx.exception))
def test_bad_plan_output_config(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('bad-plan-output-config.yaml')).sync()
self.assertEqual('Incorrect plan_output config for bad',
text_type(ctx.exception))
str(ctx.exception))
def test_source_only_as_a_target(self):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('provider-problems.yaml')) \
.sync(['not.targetable.'])
self.assertTrue('does not support targeting' in
text_type(ctx.exception))
str(ctx.exception))
def test_always_dry_run(self):
with TemporaryDirectory() as tmpdir:
@ -184,7 +183,7 @@ class TestManager(TestCase):
.sync()
self.assertEquals('Invalid alias zone alias.tests.: source zone '
'does-not-exists.tests. does not exist',
text_type(ctx.exception))
str(ctx.exception))
# Alias zone that points to another alias zone.
with self.assertRaises(ManagerException) as ctx:
@ -192,7 +191,7 @@ class TestManager(TestCase):
.sync()
self.assertEquals('Invalid alias zone alias-loop.tests.: source '
'zone alias.tests. is an alias zone',
text_type(ctx.exception))
str(ctx.exception))
# Sync an alias without the zone it refers to
with self.assertRaises(ManagerException) as ctx:
@ -200,7 +199,7 @@ class TestManager(TestCase):
.sync(eligible_zones=["alias.tests."])
self.assertEquals('Zone alias.tests. cannot be sync without zone '
'unit.tests. sinced it is aliased',
text_type(ctx.exception))
str(ctx.exception))
def test_compare(self):
with TemporaryDirectory() as tmpdir:
@ -228,7 +227,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx:
manager.compare(['nope'], ['dump'], 'unit.tests.')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
def test_aggregate_target(self):
simple = SimpleProvider()
@ -269,7 +268,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx:
manager.dump('unit.tests.', tmpdir.dirname, False, False,
'nope')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
manager.dump('unit.tests.', tmpdir.dirname, False, False, 'in')
@ -298,7 +297,7 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx:
manager.dump('unit.tests.', tmpdir.dirname, False, True,
'nope')
self.assertEquals('Unknown source: nope', text_type(ctx.exception))
self.assertEquals('Unknown source: nope', str(ctx.exception))
manager.dump('unit.tests.', tmpdir.dirname, False, True, 'in')
@ -314,26 +313,24 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('missing-sources.yaml')) \
.validate_configs()
self.assertTrue('missing sources' in text_type(ctx.exception))
self.assertTrue('missing sources' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-provider.yaml')) \
.validate_configs()
self.assertTrue('unknown source' in text_type(ctx.exception))
self.assertTrue('unknown source' in str(ctx.exception))
# Alias zone using an invalid source zone.
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-source-zone.yaml')) \
.validate_configs()
self.assertTrue('does not exist' in
text_type(ctx.exception))
self.assertTrue('does not exist' in str(ctx.exception))
# Alias zone that points to another alias zone.
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('alias-zone-loop.yaml')) \
.validate_configs()
self.assertTrue('is an alias zone' in
text_type(ctx.exception))
self.assertTrue('is an alias zone' in str(ctx.exception))
# Valid config file using an alias zone.
Manager(get_config_filename('simple-alias-zone.yaml')) \
@ -342,19 +339,19 @@ class TestManager(TestCase):
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('unknown-processor.yaml')) \
.validate_configs()
self.assertTrue('unknown processor' in text_type(ctx.exception))
self.assertTrue('unknown processor' in str(ctx.exception))
def test_get_zone(self):
Manager(get_config_filename('simple.yaml')).get_zone('unit.tests.')
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('simple.yaml')).get_zone('unit.tests')
self.assertTrue('missing ending dot' in text_type(ctx.exception))
self.assertTrue('missing ending dot' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('simple.yaml')) \
.get_zone('unknown-zone.tests.')
self.assertTrue('Unknown zone name' in text_type(ctx.exception))
self.assertTrue('Unknown zone name' in str(ctx.exception))
def test_populate_lenient_fallback(self):
with TemporaryDirectory() as tmpdir:
@ -379,7 +376,7 @@ class TestManager(TestCase):
with self.assertRaises(TypeError) as ctx:
manager._populate_and_plan('unit.tests.', [], [OtherType()],
[])
self.assertEquals('something else', text_type(ctx.exception))
self.assertEquals('something else', str(ctx.exception))
def test_plan_processors_fallback(self):
with TemporaryDirectory() as tmpdir:
@ -405,7 +402,7 @@ class TestManager(TestCase):
with self.assertRaises(TypeError) as ctx:
manager._populate_and_plan('unit.tests.', [], [],
[OtherType()])
self.assertEquals('something else', text_type(ctx.exception))
self.assertEquals('something else', str(ctx.exception))
@patch('octodns.manager.Manager._get_named_class')
def test_sync_passes_file_handle(self, mock):
@ -436,17 +433,17 @@ class TestManager(TestCase):
# This zone specifies a non-existant processor
manager.sync(['bad.unit.tests.'])
self.assertTrue('Zone bad.unit.tests., unknown processor: '
'doesnt-exist' in text_type(ctx.exception))
'doesnt-exist' in str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('processors-missing-class.yaml'))
self.assertTrue('Processor no-class is missing class' in
text_type(ctx.exception))
str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
Manager(get_config_filename('processors-wants-config.yaml'))
self.assertTrue('Incorrect processor config for wants-config' in
text_type(ctx.exception))
str(ctx.exception))
def test_processors(self):
manager = Manager(get_config_filename('simple.yaml'))


+ 2
- 3
tests/test_octodns_plan.py View File

@ -5,8 +5,8 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from io import StringIO
from logging import getLogger
from six import StringIO, text_type
from unittest import TestCase
from octodns.provider.plan import Plan, PlanHtml, PlanLogger, PlanMarkdown
@ -58,8 +58,7 @@ class TestPlanLogger(TestCase):
def test_invalid_level(self):
with self.assertRaises(Exception) as ctx:
PlanLogger('invalid', 'not-a-level')
self.assertEquals('Unsupported level: not-a-level',
text_type(ctx.exception))
self.assertEquals('Unsupported level: not-a-level', str(ctx.exception))
def test_create(self):


+ 6
- 10
tests/test_octodns_provider_azuredns.py View File

@ -19,7 +19,6 @@ from azure.mgmt.trafficmanager.models import Profile, DnsConfig, \
MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem
from msrestazure.azure_exceptions import CloudError
from six import text_type
from unittest import TestCase
from mock import Mock, patch, call
@ -992,9 +991,7 @@ class TestAzureDnsProvider(TestCase):
changes = [Create(unsupported_dynamic)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).endswith(
'must be of type CNAME'
))
self.assertTrue(str(ctx).endswith('must be of type CNAME'))
desired._remove_record(unsupported_dynamic)
# test colliding ATM names throws exception
@ -1015,9 +1012,8 @@ class TestAzureDnsProvider(TestCase):
changes = [Create(record1), Create(record2)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).startswith(
'Collision in Traffic Manager'
))
self.assertTrue(str(ctx)
.startswith('Collision in Traffic Manager'))
@patch(
'octodns.provider.azuredns.AzureProvider._generate_traffic_managers')
@ -1062,7 +1058,7 @@ class TestAzureDnsProvider(TestCase):
)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, changes)
self.assertTrue('duplicate endpoint' in text_type(ctx))
self.assertTrue('duplicate endpoint' in str(ctx))
def test_extra_changes_A_multi_defaults(self):
provider = self._get_provider()
@ -1088,7 +1084,7 @@ class TestAzureDnsProvider(TestCase):
desired.add_record(record)
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, [])
self.assertEqual('single value' in text_type(ctx))
self.assertEqual('single value' in str(ctx))
def test_generate_tm_profile(self):
provider, zone, record = self._get_dynamic_package()
@ -1194,7 +1190,7 @@ class TestAzureDnsProvider(TestCase):
azrecord.type = f'Microsoft.Network/dnszones/{record._type}'
with self.assertRaises(AzureException) as ctx:
provider._populate_record(zone, azrecord)
self.assertTrue(text_type(ctx).startswith(
self.assertTrue(str(ctx).startswith(
'Middle East (GEO-ME) is not supported'
))


+ 21
- 22
tests/test_octodns_provider_base.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from logging import getLogger
from mock import MagicMock, call
from six import text_type
from unittest import TestCase
from octodns.processor.base import BaseProcessor
@ -22,7 +21,7 @@ class HelperProvider(BaseProvider):
log = getLogger('HelperProvider')
SUPPORTS = set(('A', 'PTR'))
SUPPORTS_MUTLIVALUE_PTR = False
SUPPORTS_MULTIVALUE_PTR = False
SUPPORTS_DYNAMIC = False
id = 'test'
@ -79,7 +78,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx:
BaseProvider('base')
self.assertEquals('Abstract base class, log property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasLog(BaseProvider):
log = getLogger('HasLog')
@ -87,7 +86,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx:
HasLog('haslog')
self.assertEquals('Abstract base class, SUPPORTS_GEO property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasSupportsGeo(HasLog):
SUPPORTS_GEO = False
@ -96,14 +95,14 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx:
HasSupportsGeo('hassupportsgeo').populate(zone)
self.assertEquals('Abstract base class, SUPPORTS property missing',
text_type(ctx.exception))
str(ctx.exception))
class HasSupports(HasSupportsGeo):
SUPPORTS = set(('A',))
with self.assertRaises(NotImplementedError) as ctx:
HasSupports('hassupports').populate(zone)
self.assertEquals('Abstract base class, populate method missing',
text_type(ctx.exception))
str(ctx.exception))
# SUPPORTS_DYNAMIC has a default/fallback
self.assertFalse(HasSupports('hassupports').SUPPORTS_DYNAMIC)
@ -149,7 +148,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(NotImplementedError) as ctx:
HasPopulate('haspopulate').apply(plan)
self.assertEquals('Abstract base class, _apply method missing',
text_type(ctx.exception))
str(ctx.exception))
def test_plan(self):
ignored = Zone('unit.tests.', [])
@ -239,8 +238,8 @@ class TestBaseProvider(TestCase):
def test_process_desired_zone(self):
provider = HelperProvider('test')
# SUPPORTS_MUTLIVALUE_PTR
provider.SUPPORTS_MUTLIVALUE_PTR = False
# SUPPORTS_MULTIVALUE_PTR
provider.SUPPORTS_MULTIVALUE_PTR = False
zone1 = Zone('unit.tests.', [])
record1 = Record.new(zone1, 'ptr', {
'type': 'PTR',
@ -253,7 +252,7 @@ class TestBaseProvider(TestCase):
record2 = list(zone2.records)[0]
self.assertEqual(len(record2.values), 1)
provider.SUPPORTS_MUTLIVALUE_PTR = True
provider.SUPPORTS_MULTIVALUE_PTR = True
zone2 = provider._process_desired_zone(zone1.copy())
record2 = list(zone2.records)[0]
from pprint import pprint
@ -321,7 +320,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -353,7 +352,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -366,7 +365,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes, True).raise_if_unsafe()
self.assertTrue('Too many updates' in text_type(ctx.exception))
self.assertTrue('Too many updates' in str(ctx.exception))
def test_safe_updates_min_existing_pcent(self):
# MAX_SAFE_UPDATE_PCENT is safe when more
@ -379,7 +378,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -401,7 +400,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -414,7 +413,7 @@ class TestBaseProvider(TestCase):
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes, True).raise_if_unsafe()
self.assertTrue('Too many deletes' in text_type(ctx.exception))
self.assertTrue('Too many deletes' in str(ctx.exception))
def test_safe_deletes_min_existing_pcent(self):
# MAX_SAFE_DELETE_PCENT is safe when more
@ -427,7 +426,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -450,7 +449,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -464,7 +463,7 @@ class TestBaseProvider(TestCase):
Plan(zone, zone, changes, True,
update_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many updates' in text_type(ctx.exception))
self.assertTrue('Too many updates' in str(ctx.exception))
def test_safe_deletes_min_existing_override(self):
safe_pcent = .4
@ -478,7 +477,7 @@ class TestBaseProvider(TestCase):
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, text_type(i), {
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
@ -492,7 +491,7 @@ class TestBaseProvider(TestCase):
Plan(zone, zone, changes, True,
delete_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many deletes' in text_type(ctx.exception))
self.assertTrue('Too many deletes' in str(ctx.exception))
def test_supports_warn_or_except(self):
class MinimalProvider(BaseProvider):
@ -515,5 +514,5 @@ class TestBaseProvider(TestCase):
# Should log and not expect
with self.assertRaises(SupportsException) as ctx:
strict.supports_warn_or_except('Hello World!', 'Will not see')
self.assertEquals('minimal: Hello World!', text_type(ctx.exception))
self.assertEquals('minimal: Hello World!', str(ctx.exception))
strict.log.warning.assert_not_called()

+ 6
- 7
tests/test_octodns_provider_cloudflare.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record, Update
@ -67,7 +66,7 @@ class TestCloudflareProvider(TestCase):
provider.populate(zone)
self.assertEquals('CloudflareError', type(ctx.exception).__name__)
self.assertEquals('request was invalid', text_type(ctx.exception))
self.assertEquals('request was invalid', str(ctx.exception))
# Bad auth
with requests_mock() as mock:
@ -82,7 +81,7 @@ class TestCloudflareProvider(TestCase):
self.assertEquals('CloudflareAuthenticationError',
type(ctx.exception).__name__)
self.assertEquals('Unknown X-Auth-Key or X-Auth-Email',
text_type(ctx.exception))
str(ctx.exception))
# Bad auth, unknown resp
with requests_mock() as mock:
@ -93,7 +92,7 @@ class TestCloudflareProvider(TestCase):
provider.populate(zone)
self.assertEquals('CloudflareAuthenticationError',
type(ctx.exception).__name__)
self.assertEquals('Cloudflare error', text_type(ctx.exception))
self.assertEquals('Cloudflare error', str(ctx.exception))
# General error
with requests_mock() as mock:
@ -120,7 +119,7 @@ class TestCloudflareProvider(TestCase):
type(ctx.exception).__name__)
self.assertEquals('More than 1200 requests per 300 seconds '
'reached. Please wait and consider throttling '
'your request speed', text_type(ctx.exception))
'your request speed', str(ctx.exception))
# Rate Limit error, unknown resp
with requests_mock() as mock:
@ -132,7 +131,7 @@ class TestCloudflareProvider(TestCase):
self.assertEquals('CloudflareRateLimitError',
type(ctx.exception).__name__)
self.assertEquals('Cloudflare error', text_type(ctx.exception))
self.assertEquals('Cloudflare error', str(ctx.exception))
# Non-existent zone doesn't populate anything
with requests_mock() as mock:
@ -1625,7 +1624,7 @@ class TestCloudflareProvider(TestCase):
]
with self.assertRaises(CloudflareRateLimitError) as ctx:
provider.zone_records(zone)
self.assertEquals('last', text_type(ctx.exception))
self.assertEquals('last', str(ctx.exception))
def test_ttl_mapping(self):
provider = CloudflareProvider('test', 'email', 'token')


+ 2
- 3
tests/test_octodns_provider_constellix.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -156,7 +155,7 @@ class TestConstellixProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request
with requests_mock() as mock:
@ -168,7 +167,7 @@ class TestConstellixProvider(TestCase):
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('\n - "unittests" is not a valid domain name',
text_type(ctx.exception))
str(ctx.exception))
# General error
with requests_mock() as mock:


+ 1
- 2
tests/test_octodns_provider_digitalocean.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -51,7 +50,7 @@ class TestDigitalOceanProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error
with requests_mock() as mock:


+ 1
- 2
tests/test_octodns_provider_dnsimple.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -54,7 +53,7 @@ class TestDnsimpleProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error
with requests_mock() as mock:


+ 2
- 4
tests/test_octodns_provider_dnsmadeeasy.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -60,7 +59,7 @@ class TestDnsMadeEasyProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request
with requests_mock() as mock:
@ -70,8 +69,7 @@ class TestDnsMadeEasyProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('\n - Rate limit exceeded',
text_type(ctx.exception))
self.assertEquals('\n - Rate limit exceeded', str(ctx.exception))
# General error
with requests_mock() as mock:


+ 4
- 5
tests/test_octodns_provider_easydns.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -37,7 +36,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Bad request
with requests_mock() as mock:
@ -48,7 +47,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Bad request', text_type(ctx.exception))
self.assertEquals('Bad request', str(ctx.exception))
# General error
with requests_mock() as mock:
@ -102,7 +101,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx:
provider._client.domain('unit.tests')
self.assertEquals('Not Found', text_type(ctx.exception))
self.assertEquals('Not Found', str(ctx.exception))
def test_apply_not_found(self):
provider = EasyDNSProvider('test', 'token', 'apikey',
@ -137,7 +136,7 @@ class TestEasyDNSProvider(TestCase):
with self.assertRaises(Exception) as ctx:
provider.apply(plan)
self.assertEquals('Not Found', text_type(ctx.exception))
self.assertEquals('Not Found', str(ctx.exception))
def test_domain_create(self):
provider = EasyDNSProvider('test', 'token', 'apikey',


+ 1
- 2
tests/test_octodns_provider_edgedns.py View File

@ -9,7 +9,6 @@ from __future__ import absolute_import, division, print_function, \
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -149,7 +148,7 @@ class TestEdgeDnsProvider(TestCase):
changes = provider.apply(plan)
except NameError as e:
expected = "contractId not specified to create zone"
self.assertEquals(text_type(e), expected)
self.assertEquals(str(e), expected)
class TestDeprecatedAkamaiProvider(TestCase):


+ 5
- 6
tests/test_octodns_provider_gandi.py View File

@ -9,7 +9,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -58,7 +57,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientBadRequest) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"status": "error"', text_type(ctx.exception))
self.assertIn('"status": "error"', str(ctx.exception))
# 401 - Unauthorized.
with requests_mock() as mock:
@ -73,7 +72,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientUnauthorized) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"cause":"Unauthorized"', text_type(ctx.exception))
self.assertIn('"cause":"Unauthorized"', str(ctx.exception))
# 403 - Forbidden.
with requests_mock() as mock:
@ -85,7 +84,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientForbidden) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"cause":"Forbidden"', text_type(ctx.exception))
self.assertIn('"cause":"Forbidden"', str(ctx.exception))
# 404 - Not Found.
with requests_mock() as mock:
@ -97,7 +96,7 @@ class TestGandiProvider(TestCase):
with self.assertRaises(GandiClientNotFound) as ctx:
zone = Zone('unit.tests.', [])
provider._client.zone(zone)
self.assertIn('"cause": "Not Found"', text_type(ctx.exception))
self.assertIn('"cause": "Not Found"', str(ctx.exception))
# General error
with requests_mock() as mock:
@ -175,7 +174,7 @@ class TestGandiProvider(TestCase):
plan = provider.plan(self.expected)
provider.apply(plan)
self.assertIn('This domain is not registered at Gandi.',
text_type(ctx.exception))
str(ctx.exception))
resp = Mock()
resp.json = Mock()


+ 5
- 6
tests/test_octodns_provider_gcore.py View File

@ -12,7 +12,6 @@ from __future__ import (
from mock import Mock, call
from os.path import dirname, join
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record, Update, Delete, Create
@ -52,7 +51,7 @@ class TestGCoreProvider(TestCase):
with self.assertRaises(GCoreClientBadRequest) as ctx:
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertIn('"error":"bad body"', text_type(ctx.exception))
self.assertIn('"error":"bad body"', str(ctx.exception))
# TC: 404 - Not Found.
with requests_mock() as mock:
@ -64,7 +63,7 @@ class TestGCoreProvider(TestCase):
zone = Zone("unit.tests.", [])
provider._client.zone(zone.name)
self.assertIn(
'"error":"zone is not found"', text_type(ctx.exception)
'"error":"zone is not found"', str(ctx.exception)
)
# TC: General error
@ -74,7 +73,7 @@ class TestGCoreProvider(TestCase):
with self.assertRaises(GCoreClientException) as ctx:
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertEqual("Things caught fire", text_type(ctx.exception))
self.assertEqual("Things caught fire", str(ctx.exception))
# TC: No credentials or token error
with requests_mock() as mock:
@ -82,7 +81,7 @@ class TestGCoreProvider(TestCase):
GCoreProvider("test_id")
self.assertEqual(
"either token or login & password must be set",
text_type(ctx.exception),
str(ctx.exception),
)
# TC: Auth with login password
@ -230,7 +229,7 @@ class TestGCoreProvider(TestCase):
provider.apply(plan)
self.assertIn(
"parent zone is already occupied by another client",
text_type(ctx.exception),
str(ctx.exception),
)
resp = Mock()


+ 1
- 2
tests/test_octodns_provider_hetzner.py View File

@ -10,7 +10,6 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -36,7 +35,7 @@ class TestHetznerProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# General error
with requests_mock() as mock:


+ 9
- 16
tests/test_octodns_provider_mythicbeasts.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from os.path import dirname, join
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.provider.mythicbeasts import MythicBeastsProvider, \
@ -32,12 +31,12 @@ class TestMythicBeastsProvider(TestCase):
with self.assertRaises(AssertionError) as err:
add_trailing_dot('unit.tests.')
self.assertEquals('Value already has trailing dot',
text_type(err.exception))
str(err.exception))
with self.assertRaises(AssertionError) as err:
remove_trailing_dot('unit.tests')
self.assertEquals('Value already missing trailing dot',
text_type(err.exception))
str(err.exception))
self.assertEquals(add_trailing_dot('unit.tests'), 'unit.tests.')
self.assertEquals(remove_trailing_dot('unit.tests.'), 'unit.tests')
@ -91,8 +90,7 @@ class TestMythicBeastsProvider(TestCase):
'',
{'raw_values': [{'value': '', 'ttl': 0}]}
)
self.assertEquals('Unable to parse MX data',
text_type(err.exception))
self.assertEquals('Unable to parse MX data', str(err.exception))
def test_data_for_CNAME(self):
test_data = {
@ -129,8 +127,7 @@ class TestMythicBeastsProvider(TestCase):
'',
{'raw_values': [{'value': '', 'ttl': 0}]}
)
self.assertEquals('Unable to parse SRV data',
text_type(err.exception))
self.assertEquals('Unable to parse SRV data', str(err.exception))
def test_data_for_SSHFP(self):
test_data = {
@ -149,8 +146,7 @@ class TestMythicBeastsProvider(TestCase):
'',
{'raw_values': [{'value': '', 'ttl': 0}]}
)
self.assertEquals('Unable to parse SSHFP data',
text_type(err.exception))
self.assertEquals('Unable to parse SSHFP data', str(err.exception))
def test_data_for_CAA(self):
test_data = {
@ -166,8 +162,7 @@ class TestMythicBeastsProvider(TestCase):
'',
{'raw_values': [{'value': '', 'ttl': 0}]}
)
self.assertEquals('Unable to parse CAA data',
text_type(err.exception))
self.assertEquals('Unable to parse CAA data', str(err.exception))
def test_command_generation(self):
zone = Zone('unit.tests.', [])
@ -312,8 +307,7 @@ class TestMythicBeastsProvider(TestCase):
# Null passwords dict
with self.assertRaises(AssertionError) as err:
provider = MythicBeastsProvider('test', None)
self.assertEquals('Passwords must be a dictionary',
text_type(err.exception))
self.assertEquals('Passwords must be a dictionary', str(err.exception))
# Missing password
with requests_mock() as mock:
@ -323,9 +317,8 @@ class TestMythicBeastsProvider(TestCase):
provider = MythicBeastsProvider('test', dict())
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(
'Missing password for domain: unit.tests',
text_type(err.exception))
self.assertEquals('Missing password for domain: unit.tests',
str(err.exception))
# Failed authentication
with requests_mock() as mock:


+ 5
- 8
tests/test_octodns_provider_ns1.py View File

@ -9,7 +9,6 @@ from collections import defaultdict
from mock import call, patch
from ns1.rest.errors import AuthException, RateLimitException, \
ResourceException
from six import text_type
from unittest import TestCase
from octodns.record import Delete, Record, Update
@ -1584,8 +1583,7 @@ class TestNs1ProviderDynamic(TestCase):
}
with self.assertRaises(Ns1Exception) as ctx:
provider._data_for_dynamic('A', ns1_record)
self.assertEquals('Unrecognized advanced record',
text_type(ctx.exception))
self.assertEquals('Unrecognized advanced record', str(ctx.exception))
# empty record turns into empty data
ns1_record = {
@ -2150,8 +2148,7 @@ class TestNs1ProviderDynamic(TestCase):
records_retrieve_mock.side_effect = ns1_zone['records']
with self.assertRaises(Ns1Exception) as ctx:
extra = provider._extra_changes(desired, [])
self.assertTrue('Mixed disabled flag in filters' in
text_type(ctx.exception))
self.assertTrue('Mixed disabled flag in filters' in str(ctx.exception))
DESIRED = Zone('unit.tests.', [])
@ -2231,14 +2228,14 @@ class TestNs1ProviderDynamic(TestCase):
apply_update_mock.reset_mock()
with self.assertRaises(Ns1Exception) as ctx:
provider._apply(dynamic_plan)
self.assertTrue('monitor_regions not set' in text_type(ctx.exception))
self.assertTrue('monitor_regions not set' in str(ctx.exception))
apply_update_mock.assert_not_called()
# Blows up and apply not called even though there's a simple
apply_update_mock.reset_mock()
with self.assertRaises(Ns1Exception) as ctx:
provider._apply(both_plan)
self.assertTrue('monitor_regions not set' in text_type(ctx.exception))
self.assertTrue('monitor_regions not set' in str(ctx.exception))
apply_update_mock.assert_not_called()
# with monitor_regions set
@ -2296,7 +2293,7 @@ class TestNs1Client(TestCase):
]
with self.assertRaises(RateLimitException) as ctx:
client.zones_retrieve('unit.tests')
self.assertEquals('last', text_type(ctx.exception))
self.assertEquals('last', str(ctx.exception))
def test_client_config(self):
with self.assertRaises(TypeError):


+ 3
- 4
tests/test_octodns_provider_powerdns.py View File

@ -9,7 +9,6 @@ from json import loads, dumps
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from unittest import TestCase
from octodns.record import Record
@ -51,7 +50,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx:
provider.powerdns_version
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
# Api not found
with requests_mock() as mock:
@ -59,7 +58,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx:
provider.powerdns_version
self.assertTrue('404' in text_type(ctx.exception))
self.assertTrue('404' in str(ctx.exception))
# Test version detection
with requests_mock() as mock:
@ -150,7 +149,7 @@ class TestPowerDnsProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
# General error
with requests_mock() as mock:


+ 2
- 3
tests/test_octodns_provider_rackspace.py View File

@ -7,8 +7,7 @@ from __future__ import absolute_import, division, print_function, \
import json
import re
from six import text_type
from six.moves.urllib.parse import urlparse
from urllib.parse import urlparse
from unittest import TestCase
from requests import HTTPError
@ -53,7 +52,7 @@ class TestRackspaceProvider(TestCase):
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
self.provider.populate(zone)
self.assertTrue('unauthorized' in text_type(ctx.exception))
self.assertTrue('unauthorized' in str(ctx.exception))
self.assertTrue(mock.called_once)
def test_server_error(self):


+ 1
- 2
tests/test_octodns_provider_route53.py View File

@ -7,7 +7,6 @@ from __future__ import absolute_import, division, print_function, \
from botocore.exceptions import ClientError
from botocore.stub import ANY, Stubber
from six import text_type
from unittest import TestCase
from mock import patch
@ -2530,7 +2529,7 @@ class TestRoute53Provider(TestCase):
provider, plan = self._get_test_plan(1)
with self.assertRaises(Exception) as ctx:
provider.apply(plan)
self.assertTrue('modifications' in text_type(ctx.exception))
self.assertTrue('modifications' in str(ctx.exception))
def test_semicolon_fixup(self):
provider = Route53Provider('test', 'abc', '123')


+ 1
- 2
tests/test_octodns_provider_selectel.py View File

@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function, \
unicode_literals
from unittest import TestCase
from six import text_type
import requests_mock
@ -288,7 +287,7 @@ class TestSelectelProvider(TestCase):
with self.assertRaises(Exception) as ctx:
SelectelProvider(123, 'fail_token')
self.assertEquals(text_type(ctx.exception),
self.assertEquals(str(ctx.exception),
'Authorization failed. Invalid or empty token.')
@requests_mock.Mocker()


+ 5
- 6
tests/test_octodns_provider_ultra.py View File

@ -4,8 +4,7 @@ from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from six import text_type
from six.moves.urllib import parse
from urllib.parse import parse_qs
from unittest import TestCase
from json import load as json_load
@ -45,7 +44,7 @@ class TestUltraProvider(TestCase):
text='{"errorCode": 60001}')
with self.assertRaises(Exception) as ctx:
UltraProvider('test', 'account', 'user', 'wrongpass')
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Good Auth
with requests_mock() as mock:
@ -58,8 +57,8 @@ class TestUltraProvider(TestCase):
self.assertEquals(1, mock.call_count)
expected_payload = "grant_type=password&username=user&"\
"password=rightpass"
self.assertEquals(parse.parse_qs(mock.last_request.text),
parse.parse_qs(expected_payload))
self.assertEquals(parse_qs(mock.last_request.text),
parse_qs(expected_payload))
def test_get_zones(self):
provider = _get_provider()
@ -145,7 +144,7 @@ class TestUltraProvider(TestCase):
headers={'Authorization': 'Bearer 123'}, json={})
with self.assertRaises(Exception) as ctx:
provider._get(path)
self.assertEquals('Unauthorized', text_type(ctx.exception))
self.assertEquals('Unauthorized', str(ctx.exception))
# Test all GET patterns
with requests_mock() as mock:


+ 2
- 3
tests/test_octodns_provider_yaml.py View File

@ -8,7 +8,6 @@ from __future__ import absolute_import, division, print_function, \
from os import makedirs
from os.path import basename, dirname, isdir, isfile, join
from unittest import TestCase
from six import text_type
from yaml import safe_load
from yaml.constructor import ConstructorError
@ -187,7 +186,7 @@ class TestYamlProvider(TestCase):
with self.assertRaises(SubzoneRecordException) as ctx:
source.populate(zone)
self.assertEquals('Record www.sub.unit.tests. is under a managed '
'subzone', text_type(ctx.exception))
'subzone', str(ctx.exception))
class TestSplitYamlProvider(TestCase):
@ -385,7 +384,7 @@ class TestSplitYamlProvider(TestCase):
with self.assertRaises(SubzoneRecordException) as ctx:
source.populate(zone)
self.assertEquals('Record www.sub.unit.tests. is under a managed '
'subzone', text_type(ctx.exception))
'subzone', str(ctx.exception))
class TestOverridingYamlProvider(TestCase):


+ 33
- 3
tests/test_octodns_record.py View File

@ -5,7 +5,6 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from six import text_type
from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \
@ -1013,14 +1012,14 @@ class TestRecord(TestCase):
# Missing type
with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'unknown', {})
self.assertTrue('missing type' in text_type(ctx.exception))
self.assertTrue('missing type' in str(ctx.exception))
# Unknown type
with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'unknown', {
'type': 'XXX',
})
self.assertTrue('Unknown record type' in text_type(ctx.exception))
self.assertTrue('Unknown record type' in str(ctx.exception))
def test_record_copy(self):
a = Record.new(self.zone, 'a', {
@ -1055,6 +1054,37 @@ class TestRecord(TestCase):
d.copy()
self.assertEquals('TXT', d._type)
def test_dynamic_record_copy(self):
a_data = {
'dynamic': {
'pools': {
'one': {
'values': [{
'value': '3.3.3.3',
}],
},
},
'rules': [{
'pool': 'one',
}],
},
'octodns': {
'healthcheck': {
'protocol': 'TCP',
'port': 80,
},
},
'ttl': 60,
'type': 'A',
'values': [
'1.1.1.1',
'2.2.2.2',
],
}
record1 = Record.new(self.zone, 'a', a_data)
record2 = record1.copy()
self.assertEqual(record1._octodns, record2._octodns)
def test_change(self):
existing = Record.new(self.zone, 'txt', {
'ttl': 44,


+ 3
- 4
tests/test_octodns_source_axfr.py View File

@ -11,7 +11,6 @@ from dns.exception import DNSException
from mock import patch
from os.path import exists
from shutil import copyfile
from six import text_type
from unittest import TestCase
from octodns.source.axfr import AxfrSource, AxfrSourceZoneTransferFailed, \
@ -42,7 +41,7 @@ class TestAxfrSource(TestCase):
zone = Zone('unit.tests.', [])
self.source.populate(zone)
self.assertEquals('Unable to Perform Zone Transfer',
text_type(ctx.exception))
str(ctx.exception))
class TestZoneFileSource(TestCase):
@ -99,7 +98,7 @@ class TestZoneFileSource(TestCase):
zone = Zone('invalid.zone.', [])
self.source.populate(zone)
self.assertEquals('The DNS zone has no NS RRset at its origin.',
text_type(ctx.exception))
str(ctx.exception))
# Records are not to RFC (lenient=False)
with self.assertRaises(ValidationError) as ctx:
@ -107,7 +106,7 @@ class TestZoneFileSource(TestCase):
self.source.populate(zone)
self.assertEquals('Invalid record _invalid.invalid.records.\n'
' - invalid name for SRV record',
text_type(ctx.exception))
str(ctx.exception))
# Records are not to RFC, but load anyhow (lenient=True)
invalid = Zone('invalid.records.', [])


+ 1
- 2
tests/test_octodns_source_envvar.py View File

@ -1,5 +1,4 @@
from mock import patch
from six import text_type
from unittest import TestCase
from octodns.source.envvar import EnvVarSource
@ -15,7 +14,7 @@ class TestEnvVarSource(TestCase):
with self.assertRaises(EnvironmentVariableNotFoundException) as ctx:
source._read_variable()
msg = f'Unknown environment variable {envvar}'
self.assertEquals(msg, text_type(ctx.exception))
self.assertEquals(msg, str(ctx.exception))
with patch.dict('os.environ', {envvar: 'testvalue'}):
value = source._read_variable()


+ 1
- 1
tests/test_octodns_yaml.py View File

@ -5,7 +5,7 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from six import StringIO
from io import StringIO
from unittest import TestCase
from yaml.constructor import ConstructorError


+ 5
- 6
tests/test_octodns_zone.py View File

@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function, \
unicode_literals
from unittest import TestCase
from six import text_type
from octodns.record import ARecord, AaaaRecord, Create, Delete, Record, Update
from octodns.zone import DuplicateRecordException, InvalidNodeException, \
@ -48,7 +47,7 @@ class TestZone(TestCase):
with self.assertRaises(DuplicateRecordException) as ctx:
zone.add_record(a)
self.assertEquals('Duplicate record a.unit.tests., type A',
text_type(ctx.exception))
str(ctx.exception))
self.assertEquals(zone.records, set([a]))
# can add duplicate with replace=True
@ -138,7 +137,7 @@ class TestZone(TestCase):
def test_missing_dot(self):
with self.assertRaises(Exception) as ctx:
Zone('not.allowed', [])
self.assertTrue('missing ending dot' in text_type(ctx.exception))
self.assertTrue('missing ending dot' in str(ctx.exception))
def test_sub_zones(self):
@ -161,7 +160,7 @@ class TestZone(TestCase):
})
with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record)
self.assertTrue('not of type NS', text_type(ctx.exception))
self.assertTrue('not of type NS', str(ctx.exception))
# Can add it w/lenient
zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records)
@ -175,7 +174,7 @@ class TestZone(TestCase):
})
with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record)
self.assertTrue('under a managed sub-zone', text_type(ctx.exception))
self.assertTrue('under a managed sub-zone', str(ctx.exception))
# Can add it w/lenient
zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records)
@ -189,7 +188,7 @@ class TestZone(TestCase):
})
with self.assertRaises(SubzoneRecordException) as ctx:
zone.add_record(record)
self.assertTrue('under a managed sub-zone', text_type(ctx.exception))
self.assertTrue('under a managed sub-zone', str(ctx.exception))
# Can add it w/lenient
zone.add_record(record, lenient=True)
self.assertEquals(set([record]), zone.records)


Loading…
Cancel
Save